Solon

使用分布式事件总线(消息总线)

使用分布式事件进行业务水平扩展(目前适配有:water, rabbitmq, rocketmq, mqtt, kafka)。支持多通道模式,可让不同消息框架并存。此特性可按业务做不同安排,例如:业务消息用 RabbitMQ,IoT消息用 Mqtt。

1、三个特性

  • 可重试守护

消费失败后不断重发确保最终成功。此特性可支持SAGA分布式事务模型,实现最终一致性。事件消费时,注要幂等性控制。

  • 可自动延时

消费失败后会自动延时(目前支持有: water, rabbitmq, rocketmq)

  • 可定时事件

比如,可设定10天后执行

支持情况

适配框架重试守护自动延时定时事件
water支持支持支持
rabbitmq支持支持支持
rocketmq(开源版)支持支持(有自己的策略)半支持(最长2小时)
mqtt支持//
kafka支持//
jedis///

2、失败自动延时策略

失败次数延时
00s
15s
210s
330s
41m(即 1 分钟)
52m
65m
710m
830m
91h(即 1 小时)
n2h

3、示例

发布事件

public class EventController {
    public void onUserRegistered(long user_id) {
        //用户注册完成后,发布一个事件
        //
        CloudClient.event().publish(
                new Event("user.registered", String.format("{\"user_id\":%d}", user_id)));
    }
}

事件订阅与处理

@CloudEvent("user.registered")
public class EventHandlerImpl implements CloudEventHandler {
    @Override
    public boolean handler(Event event) throws Throwable {
        //用户注册完成后,送个金币...
        //
        return true;
    }
}

拦截事件处理(记录消耗时间,日志等...)

@Component
public class EventInterceptorImpl implements CloudEventInterceptor {
    static Logger log = LoggerFactory.getLogger(BaseEventInterceptor.class);

    @Override
    public boolean doIntercept(Event event, CloudEventHandler handler) throws Throwable {
        TagsMDC.tag0("event");
        TagsMDC.tag1(event.topic());

        if (Utils.isNotEmpty(event.tags())) {
            TagsMDC.tag2(event.tags());
        }

        TagsMDC.tag3(event.key());
        Timecount timecount = new Timecount().start();
        long timespan = 0;

        try {
            boolean succeeded = handler.handle(event);
            timespan = timecount.stop().milliseconds();

            if (succeeded) {
                log.info("Event execution succeeded @{}ms", timespan);
                return true;
            } else {
                log.warn("Event execution failed @{}ms", timespan);
                return false;
            }
        } catch (Throwable e) {
            timespan = timecount.stop().milliseconds();

            log.error("Event execution error @{}ms: {}", timespan, e);
            throw e;
        } finally {
            if (timespan > 0) {
                CloudClient.metric().addMeter(Solon.cfg().appName(), "event", event.topic(), timespan);
            }
        }
    }
}

4、增强模式

详见: 《生态 / solon cloud / cloudevent-plus-solon-plugin》

5、代码演示