使用分布式事件(事件总线)
1、情况简介
使用分布式事件(或事件总线)可实现业务水平扩展、分布式事务效果(目前适配有:local, water, rabbitmq, rocketmq, mqtt, kafka,等)。
- 主要通过 CloudEventServicePlus 接口进行适配
- 使用 CloudClient.event() 获取适配实例
2、五个特性
- 可确认(ack)
支持是否成功消费的确认机制
- 可重试守护(retry)
消费失败后不断重发确保最终成功。此特性可支持SAGA分布式事务模型,实现最终一致性。事件消费时,注要幂等性控制。
- 可自动延时
消费失败后会自动延时(目前支持有: local, water, rabbitmq, rocketmq, rocketmq5, aliyun-ons)
- 可定时事件
比如,可设定10天后执行
- 可多插件共存(多通道模式)
支持多个插件同时存在,按业务做不同安排。例如:业务消息用 RabbitMQ,IoT消息用 Mqtt,日志用 kafka。
支持情况
适配框架 | 确认与重试守护 | 自动延时 | 定时事件 |
---|---|---|---|
local | 支持 | 支持 | 支持 |
water | 支持 | 支持 | 支持 |
rabbitmq | 支持 | 支持 | 支持 |
rocketmq(开源版) | 支持 | 支持(有自己的策略) | 半支持(最长2小时) |
rocketmq5(开源版) | 支持 | 支持 | 支持(有最长时限) |
mqtt | 支持 | / | / |
kafka | 支持 | / | / |
jedis | / | / | / |
3、失败自动延时策略(不同框架会有不同)
失败次数 | 延时 |
---|---|
0 | 0s |
1 | 5s |
2 | 10s |
3 | 30s |
4 | 1m(即 1 分钟) |
5 | 2m |
6 | 5m |
7 | 10m |
8 | 30m |
9 | 1h(即 1 小时) |
n | 2h |
4、示例
发布事件
public class EventController {
public void onUserRegistered(long user_id) {
//用户注册完成后,发布一个事件
//
Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id))
CloudClient.event().publish(event);
//用户注册完成后,发布一个事件(10天后执行)
//
Date eventTime = DateTime.Now().addDay(10);
Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id))
CloudClient.event().publish(event.scheduled(eventTime));
}
}
事件订阅与处理
@CloudEvent("user.registered")
public class EventHandlerImpl implements CloudEventHandler {
@Override
public boolean handler(Event event) throws Throwable {
//用户注册完成后,送个金币...
//
return true;
}
}
拦截事件处理(记录消耗时间,日志等...)
@Component
public class EventInterceptorImpl implements CloudEventInterceptor {
static Logger log = LoggerFactory.getLogger(BaseEventInterceptor.class);
@Override
public boolean doIntercept(Event event, CloudEventHandler handler) throws Throwable {
TagsMDC.tag0("event");
TagsMDC.tag1(event.topic());
if (Utils.isNotEmpty(event.tags())) {
TagsMDC.tag2(event.tags());
}
TagsMDC.tag3(event.key());
Timecount timecount = new Timecount().start();
long timespan = 0;
try {
boolean succeeded = handler.handle(event);
timespan = timecount.stop().milliseconds();
if (succeeded) {
log.info("Event execution succeeded @{}ms", timespan);
return true;
} else {
log.warn("Event execution failed @{}ms", timespan);
return false;
}
} catch (Throwable e) {
timespan = timecount.stop().milliseconds();
log.error("Event execution error @{}ms: {}", timespan, e);
throw e;
} finally {
if (timespan > 0) {
CloudClient.metric().addMeter(Solon.cfg().appName(), "event", event.topic(), timespan);
}
}
}
}
5、增强模式
详见: 《生态 / solon cloud / cloudevent-plus-solon-plugin》
6、多通道模式
详见: demo9039-event_multi_channel2