Solon

使用分布式事件(事件总线)

</> markdown

生态 / Solon Cloud Event [传送]

1、情况简介

使用分布式事件(或事件总线)可实现业务水平扩展、分布式事务效果(目前适配有:local, water, rabbitmq, rocketmq, mqtt, kafka,等)。

  • 主要通过 CloudEventServicePlus 接口进行适配
  • 使用 CloudClient.event() 获取适配实例

2、五个特性

  • 可确认(ack)

支持是否成功消费的确认机制

  • 可重试守护(retry)

消费失败后不断重发确保最终成功。此特性可支持SAGA分布式事务模型,实现最终一致性。事件消费时,注要幂等性控制。

  • 可自动延时

消费失败后会自动延时(目前支持有: local, water, rabbitmq, rocketmq, rocketmq5, aliyun-ons)

  • 可定时事件

比如,可设定10天后执行

  • 可多插件共存(多通道模式)

支持多个插件同时存在,按业务做不同安排。例如:业务消息用 RabbitMQ,IoT消息用 Mqtt,日志用 kafka。

支持情况

适配框架确认与重试守护自动延时定时事件
local支持支持支持
water支持支持支持
rabbitmq支持支持支持
rocketmq(开源版)支持支持(有自己的策略)半支持(最长2小时)
rocketmq5(开源版)支持支持支持(有最长时限)
mqtt支持//
kafka支持//
jedis///

3、失败自动延时策略(不同框架会有不同)

失败次数延时
00s
15s
210s
330s
41m(即 1 分钟)
52m
65m
710m
830m
91h(即 1 小时)
n2h

4、示例

发布事件

public class EventController {
    public void onUserRegistered(long user_id) {
        //用户注册完成后,发布一个事件
        //
        Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id))
        CloudClient.event().publish(event);
                
        
        //用户注册完成后,发布一个事件(10天后执行)
        //
        Date eventTime = DateTime.Now().addDay(10);
        Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id))
        CloudClient.event().publish(event.scheduled(eventTime));
    }
}

事件订阅与处理

@CloudEvent("user.registered")
public class EventHandlerImpl implements CloudEventHandler {
    @Override
    public boolean handler(Event event) throws Throwable {
        //用户注册完成后,送个金币...
        //
        return true;
    }
}

拦截事件处理(记录消耗时间,日志等...)

@Component
public class EventInterceptorImpl implements CloudEventInterceptor {
    static Logger log = LoggerFactory.getLogger(BaseEventInterceptor.class);

    @Override
    public boolean doIntercept(Event event, CloudEventHandler handler) throws Throwable {
        TagsMDC.tag0("event");
        TagsMDC.tag1(event.topic());

        if (Utils.isNotEmpty(event.tags())) {
            TagsMDC.tag2(event.tags());
        }

        TagsMDC.tag3(event.key());
        Timecount timecount = new Timecount().start();
        long timespan = 0;

        try {
            boolean succeeded = handler.handle(event);
            timespan = timecount.stop().milliseconds();

            if (succeeded) {
                log.info("Event execution succeeded @{}ms", timespan);
                return true;
            } else {
                log.warn("Event execution failed @{}ms", timespan);
                return false;
            }
        } catch (Throwable e) {
            timespan = timecount.stop().milliseconds();

            log.error("Event execution error @{}ms: {}", timespan, e);
            throw e;
        } finally {
            if (timespan > 0) {
                CloudClient.metric().addMeter(Solon.cfg().appName(), "event", event.topic(), timespan);
            }
        }
    }
}

5、增强模式

详见: 《生态 / solon cloud / cloudevent-plus-solon-plugin》

6、多通道模式

详见: demo9039-event_multi_channel2

7、代码演示