阿里MQ

本文介绍阿里MQ的使用

这个插件主要实现了以下3个功能:

利用AliMQ实现应用与应用之间的事件收发

这里事件叫做ClusteredEvent,ClusteredEvent的收发是以应用为单位的,如果一个应用以多实例的集群形式部署,那么这个应用的所有实例中只会有一个实例接收到这个事件。ClusteredEvent事件借助MQ消息传播。消息轨迹类似下图。

图中,有两个应用,分别是MACULA_PLUGINS_WEBAPP和WHATEVER_APP。其中MACULA_PLUGINS_WEBAPP有两个实例(他们具有相同的订阅ClusteredEvent Topic的CID),分别运行在进程18840和7144中;WHATEVER_APP有一个实例,运行在进程5060中。由图可见,MACULA_PLUGINS_WEBAPP的两个实例里只用其中一个实例(18840)接收到了事件。

这个图其实是ClusteredEvent应用于UIM登出事件的场景(你也可以用到其他合适的场景中)。当用户登出UIM时,UIM会向MACULA_CLUSTERED_EVENT_DEV Topic发送一个UimLogoutEvent(其中含有登出用户的用户名,继承自ClusteredEvent)。MACULA_PLUGINS_WEBAPP和WHATEVER_APP都订阅了这个Topic。由于MACULA_PLUGINS_WEBAPP用的Macula3, 而Macula3采用的是Spring-Session来进行实例间的会话管理,因此只需要其中一个实例接收这个事件进行处理便可。

ClusteredEvent怎么用?

  1. 在你项目的pom.xml里加入macula-plugins-alimq的依赖。注:这里没写版本号,保持与其他macula-plugins版本号一致便可。
<dependency>
    <groupId>org.macula.plugins</groupId>
    <artifactId>macula-plugins-alimq</artifactId>
    <version>3.0.0.RELEASE</version>
</dependency>
  1. 在applicationContext-root.xml中加入类似如下配置。
<!-- 发布、订阅业务事件 -->
<util:properties id="maculaEventProducerProperties">
        <prop key="ProducerId">PID_MACULA_EVENT_DEV</prop>
        <prop key="AccessKey">b80b0bcf82234957b72531b1670e090a</prop>
        <prop key="SecretKey">o9qjprQL0UwZrmzTmgHZne9fio0=</prop>
        <prop key="ONSAddr">http://mq.server-test.infinitus.com.cn/rocketmq/nsaddr4broker-internal</prop>
        <prop key="SendMsgTimeoutMillis">3000</prop>
</util:properties>

<util:properties id="maculaClusteredEventConsumerProperties" >
        <prop key="ConsumerId">CID_MACULA_PLUGINS_WEBAPP_DEV</prop>
        <prop key="AccessKey">b80b0bcf82234957b72531b1670e090a</prop>
        <prop key="SecretKey">o9qjprQL0UwZrmzTmgHZne9fio0=</prop>
        <prop key="ONSAddr">http://mq.server-test.infinitus.com.cn/rocketmq/nsaddr4broker-internal</prop>
        <prop key="MessageModel">CLUSTERING</prop>
</util:properties>

注意几点:

  • 根据不同的环境调整配置,AccessKey,SecretKey,ONSAddr这些相同的配置可以放到macula.properties中,然后通过#{T(org.macula.Configuration).getProperty()}的方式来配置,避免重复。
  • maculaEventProducerProperties,maculaClusteredEventConsumerProperties名字是固定的,不能改。
  • ProducerId固定是PID_MACULA_EVENT_{环境},如PID_MACULA_EVENT_DEV(开发),PID_MACULA_EVENT_TEST(测试),PID_MACULA_EVENT(生产)等,你要根据环境调整。
  • ConsumerId可以由你根据你的应用而定。
  • 注意MessageModel一定是CLUSTERING,你也可以不指定,默认就是CLUSTERING。
  1. 在阿里MQ控制台,把MACULA_CLUSTERED_EVENT_{环境} 这个Topic授权给你的子账号发布、订阅消息,并把你在上面配置中用到的ConsumerId加到该Topic的订阅者里。
  2. 在你的Java项目里,添加一个类继承ClusteredEvent。不要忘记给这个类添加一个默认的serialVersionUID。
public class UimLogoutEvent extends ClusteredEvent<String> {
    private static final long serialVersionUID = 1L;

    public UimLogoutEvent(String source) {
        super(source)
    }
}
  1. 在需要发送事件的地方,publish event即可,如:
ApplicationContext.publishApplicationEvent(new UimLogoutEvent(loginUsername));
  1. 在对这个事件感兴趣的项目里实现这个事件的Listener,如:
@Component
public class UimLogoutEventListener implements ApplicationListener<UimLogoutEvent>{
	@Autowired
	private RedisOperationsSessionRepository sessionRepository;
	
	@Override
	public void onApplicationEvent(UimLogoutEvent event) {
    	Map<String, ?> sessions = sessionRepository.findByIndexNameAndIndexValue(FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME, event.getSource());
		if (sessions != null) {
			for (String sessionId : sessions.keySet()) {
				sessionRepository.delete(sessionId);
			}
		}
		
	}
}
  1. MACULA_CLUSTERED_EVENT_{环境}的Topic框架会在相应环境创建好。你需要做的是在macula.properties里根据环境改变MACULA_CLUSTERED_EVENT的后缀(因为我们的阿里MQ只有一个非生产环境)。生产则不用,因为框架里默认的就是MACULA_CLUSTERED _EVENT。
macula.clusteredEventTopic = MACULA_CLUSTERED_EVENT_DEV

使用时要注意什么?

  1. ClusteredEvent使用的Topic是无序消息Topic,而且ClusteredEnvent在框架里ApplicationContext.publishApplicationEvent的时候也是异步的,因此ClusteredEvent不能保证FIFO。所以你不能用它来处理有严格时间先后顺序的逻辑。

  2. ClusteredEvent 的Topic是无序消息Topic,因此不具有事务特性,未来我们会视乎情况看在框架里增加事务事件是否合适和可行。

  3. 我只想订阅某个或某些应用发出的ClusteredEvent,怎么办? 在ClusteredEvent的消息里,Message Tag是由AppGroup和AppId构成的,如UIM.UIM、GBSS.edealer,你可以在macula.properties指定下面这个属性的值来过滤消息,只接收你感兴趣的应用发出来的ClusteredEvent。默认不配置情况下是*,会接收所有应用发出的ClusteredEvent。建议配置。

macula.clusteredEventSubscribeTag = UIM.UIM||GBSS.edealer
  1. 新创新的CosumerID一定要注意消费点位,按照阿里MQ的手册注意设置,否则新创新的ConsumerID容易把历史消息都消费一遍。

  2. 注意消费逻辑的幂等性,AliMQ不能100%保证MQ消息不出现重复发送。

使用ClusteredEvent实现各个应用的UIM单点登出

现时,当用户登出UIM时,UIM会POST一个HTTP请求到所有接入UIM的应用,应用在接收到这个请求后,就可以在本应用中将该用户登出(主要是通过SignleSingoutFilter来处理),亦即所谓的单点登出。我们发现这个方式不是十分可靠,因此我们增加了ClusteredEvent这种方式。

要用这个功能该怎么做?

Macula 3的项目做了上面功能1中的“ClusteredEvent怎么用”的1、2、3三个步骤,就有这个功能了。当然还需要UIM升级配合。

扩展事件广播方式

利用AliMQ增加了一种同组应用的实例间事件广播机制。各个实例间的事件广播机制,在框架中早已有了redis方式的实现,现在增加多一种AliMQ的方式。

要使用这种方式要怎么做?

  1. 在applicationContext-root.xml中加入类似如下配置。注意MessageModel一定是BROADCASTING,要显式配置,因为默认是CLUSTERING。另外ConsumerId不能与上面订阅ClusteredEvent的ConsumerId相同,因为它们的订阅方式不同,一个是CLUSTERING,一个是BROADCASTING。这是因为AliMQ一个 JVM 中对应的一个 Producer ID/Consumer ID 只能配置一个 Producer/Consumer 实例,还有就是订阅关系一致性的要求。
<util:properties id="maculaBroadcastEventConsumerProperties">
    <prop key="ConsumerId">CID_MACULA_PLUGINS_WEBAPP_DEV2</prop>
    <prop key="AccessKey">b80b0bcf82234957b72531b1670e090a</prop>
    <prop key="SecretKey">o9qjprQL0UwZrmzTmgHZne9fio0=</prop>
    <prop key="ONSAddr">http://mq.server-test.infinitus.com.cn/rocketmq/nsaddr4broker-internal</prop>
    <prop key="MessageModel">BROADCASTING</prop>
</util:properties>
  1. 在macula.properties文件中修改以下属性。
macula.events.transport = alimq
  1. 别忘了在阿里MQ控制台里增加一个无序消息Topic,名称为{AppGroup}BROADCAST_EVENT{环境},如GBSS_BROADCAST_EVENT_DEV,GBSS_BROADCAST_EVENT_TEST,GBSS_BROADCAST_EVENT等,并注意授权给你的子账号订阅、发布。
  2. 把步骤1中配置中的ConsumerId加到步骤3创建的Topic的订阅者中,把ProducerId PID_MACULA_EVENT_{环境}加到发布者中。
  3. 在macula.properties里根据环境改变BROADCAST_EVENT的后缀(因为我们的阿里MQ只有一个非生产环境)。生产则不用,因为默认就是BROADCAST_EVENT。
macula.broadcastEventTopic = BROADCAST_EVENT_DEV