阿里MQ
这个插件主要实现了以下3个功能:
利用AliMQ实现应用与应用之间的事件收发
这里事件叫做ClusteredEvent,ClusteredEvent的收发是以应用为单位的,如果一个应用以多实例的集群形式部署,那么这个应用的所有实例中只会有一个实例接收到这个事件。ClusteredEvent事件借助MQ消息传播。消息轨迹类似下图。
图中,有两个应用,分别是MACULA_PLUGINS_WEBAPP和WHATEVER_APP。其中MACULA_PLUGINS_WEBAPP有两个实例(他们具有相同的订阅ClusteredEvent Topic的CID),分别运行在进程18840和7144中;WHATEVER_APP有一个实例,运行在进程5060中。由图可见,MACULA_PLUGINS_WEBAPP的两个实例里只用其中一个实例(18840)接收到了事件。
这个图其实是ClusteredEvent应用于UIM登出事件的场景(你也可以用到其他合适的场景中)。当用户登出UIM时,UIM会向MACULA_CLUSTERED_EVENT_DEV Topic发送一个UimLogoutEvent(其中含有登出用户的用户名,继承自ClusteredEvent)。MACULA_PLUGINS_WEBAPP和WHATEVER_APP都订阅了这个Topic。由于MACULA_PLUGINS_WEBAPP用的Macula3, 而Macula3采用的是Spring-Session来进行实例间的会话管理,因此只需要其中一个实例接收这个事件进行处理便可。
ClusteredEvent怎么用?
- 在你项目的pom.xml里加入macula-plugins-alimq的依赖。注:这里没写版本号,保持与其他macula-plugins版本号一致便可。
<dependency>
<groupId>org.macula.plugins</groupId>
<artifactId>macula-plugins-alimq</artifactId>
<version>3.0.0.RELEASE</version>
</dependency>
- 在applicationContext-root.xml中加入类似如下配置。
<!-- 发布、订阅业务事件 -->
<util:properties id="maculaEventProducerProperties">
<prop key="ProducerId">PID_MACULA_EVENT_DEV</prop>
<prop key="AccessKey">b80b0bcf82234957b72531b1670e090a</prop>
<prop key="SecretKey">o9qjprQL0UwZrmzTmgHZne9fio0=</prop>
<prop key="ONSAddr">http://mq.server-test.infinitus.com.cn/rocketmq/nsaddr4broker-internal</prop>
<prop key="SendMsgTimeoutMillis">3000</prop>
</util:properties>
<util:properties id="maculaClusteredEventConsumerProperties" >
<prop key="ConsumerId">CID_MACULA_PLUGINS_WEBAPP_DEV</prop>
<prop key="AccessKey">b80b0bcf82234957b72531b1670e090a</prop>
<prop key="SecretKey">o9qjprQL0UwZrmzTmgHZne9fio0=</prop>
<prop key="ONSAddr">http://mq.server-test.infinitus.com.cn/rocketmq/nsaddr4broker-internal</prop>
<prop key="MessageModel">CLUSTERING</prop>
</util:properties>
注意几点:
- 根据不同的环境调整配置,AccessKey,SecretKey,ONSAddr这些相同的配置可以放到macula.properties中,然后通过#{T(org.macula.Configuration).getProperty()}的方式来配置,避免重复。
- maculaEventProducerProperties,maculaClusteredEventConsumerProperties名字是固定的,不能改。
- ProducerId固定是PID_MACULA_EVENT_{环境},如PID_MACULA_EVENT_DEV(开发),PID_MACULA_EVENT_TEST(测试),PID_MACULA_EVENT(生产)等,你要根据环境调整。
- ConsumerId可以由你根据你的应用而定。
- 注意MessageModel一定是CLUSTERING,你也可以不指定,默认就是CLUSTERING。
- 在阿里MQ控制台,把MACULA_CLUSTERED_EVENT_{环境} 这个Topic授权给你的子账号发布、订阅消息,并把你在上面配置中用到的ConsumerId加到该Topic的订阅者里。
- 在你的Java项目里,添加一个类继承ClusteredEvent。不要忘记给这个类添加一个默认的serialVersionUID。
public class UimLogoutEvent extends ClusteredEvent<String> {
private static final long serialVersionUID = 1L;
public UimLogoutEvent(String source) {
super(source)
}
}
- 在需要发送事件的地方,publish event即可,如:
ApplicationContext.publishApplicationEvent(new UimLogoutEvent(loginUsername));
- 在对这个事件感兴趣的项目里实现这个事件的Listener,如:
@Component
public class UimLogoutEventListener implements ApplicationListener<UimLogoutEvent>{
@Autowired
private RedisOperationsSessionRepository sessionRepository;
@Override
public void onApplicationEvent(UimLogoutEvent event) {
Map<String, ?> sessions = sessionRepository.findByIndexNameAndIndexValue(FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME, event.getSource());
if (sessions != null) {
for (String sessionId : sessions.keySet()) {
sessionRepository.delete(sessionId);
}
}
}
}
- MACULA_CLUSTERED_EVENT_{环境}的Topic框架会在相应环境创建好。你需要做的是在macula.properties里根据环境改变MACULA_CLUSTERED_EVENT的后缀(因为我们的阿里MQ只有一个非生产环境)。生产则不用,因为框架里默认的就是MACULA_CLUSTERED _EVENT。
macula.clusteredEventTopic = MACULA_CLUSTERED_EVENT_DEV
使用时要注意什么?
ClusteredEvent使用的Topic是无序消息Topic,而且ClusteredEnvent在框架里ApplicationContext.publishApplicationEvent的时候也是异步的,因此ClusteredEvent不能保证FIFO。所以你不能用它来处理有严格时间先后顺序的逻辑。
ClusteredEvent 的Topic是无序消息Topic,因此不具有事务特性,未来我们会视乎情况看在框架里增加事务事件是否合适和可行。
我只想订阅某个或某些应用发出的ClusteredEvent,怎么办? 在ClusteredEvent的消息里,Message Tag是由AppGroup和AppId构成的,如UIM.UIM、GBSS.edealer,你可以在macula.properties指定下面这个属性的值来过滤消息,只接收你感兴趣的应用发出来的ClusteredEvent。默认不配置情况下是*,会接收所有应用发出的ClusteredEvent。建议配置。
macula.clusteredEventSubscribeTag = UIM.UIM||GBSS.edealer
新创新的CosumerID一定要注意消费点位,按照阿里MQ的手册注意设置,否则新创新的ConsumerID容易把历史消息都消费一遍。
注意消费逻辑的幂等性,AliMQ不能100%保证MQ消息不出现重复发送。
使用ClusteredEvent实现各个应用的UIM单点登出
现时,当用户登出UIM时,UIM会POST一个HTTP请求到所有接入UIM的应用,应用在接收到这个请求后,就可以在本应用中将该用户登出(主要是通过SignleSingoutFilter来处理),亦即所谓的单点登出。我们发现这个方式不是十分可靠,因此我们增加了ClusteredEvent这种方式。
要用这个功能该怎么做?
Macula 3的项目做了上面功能1中的“ClusteredEvent怎么用”的1、2、3三个步骤,就有这个功能了。当然还需要UIM升级配合。
扩展事件广播方式
利用AliMQ增加了一种同组应用的实例间事件广播机制。各个实例间的事件广播机制,在框架中早已有了redis方式的实现,现在增加多一种AliMQ的方式。
要使用这种方式要怎么做?
- 在applicationContext-root.xml中加入类似如下配置。注意MessageModel一定是BROADCASTING,要显式配置,因为默认是CLUSTERING。另外ConsumerId不能与上面订阅ClusteredEvent的ConsumerId相同,因为它们的订阅方式不同,一个是CLUSTERING,一个是BROADCASTING。这是因为AliMQ一个 JVM 中对应的一个 Producer ID/Consumer ID 只能配置一个 Producer/Consumer 实例,还有就是订阅关系一致性的要求。
<util:properties id="maculaBroadcastEventConsumerProperties">
<prop key="ConsumerId">CID_MACULA_PLUGINS_WEBAPP_DEV2</prop>
<prop key="AccessKey">b80b0bcf82234957b72531b1670e090a</prop>
<prop key="SecretKey">o9qjprQL0UwZrmzTmgHZne9fio0=</prop>
<prop key="ONSAddr">http://mq.server-test.infinitus.com.cn/rocketmq/nsaddr4broker-internal</prop>
<prop key="MessageModel">BROADCASTING</prop>
</util:properties>
- 在macula.properties文件中修改以下属性。
macula.events.transport = alimq
- 别忘了在阿里MQ控制台里增加一个无序消息Topic,名称为{AppGroup}BROADCAST_EVENT{环境},如GBSS_BROADCAST_EVENT_DEV,GBSS_BROADCAST_EVENT_TEST,GBSS_BROADCAST_EVENT等,并注意授权给你的子账号订阅、发布。
- 把步骤1中配置中的ConsumerId加到步骤3创建的Topic的订阅者中,把ProducerId PID_MACULA_EVENT_{环境}加到发布者中。
- 在macula.properties里根据环境改变BROADCAST_EVENT的后缀(因为我们的阿里MQ只有一个非生产环境)。生产则不用,因为默认就是BROADCAST_EVENT。
macula.broadcastEventTopic = BROADCAST_EVENT_DEV