Solon v3.0.3

使用分布式事件 - 生产与消费

</> markdown

特性与策略 | 发产与消费 | 多通道示例 | 生态 / Solon Cloud Event [传送]

1、事件生产

  • 发布事件
@Component
public class UserService {
    public void onUserRegistered(long user_id) {
        //用户注册完成后,发布一个事件
        //
        Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id));
        CloudClient.event().publish(event);
    }
}
  • 发布定时事件
@Component
public class UserService {
    public void onUserRegistered(long user_id) {
        //用户注册完成后,发布唤醒事件(10天后执行)
        //
        Date eventTime = DateTime.Now().addDay(10);
        Event event = new Event("user.reawakened", String.format("{\"user_id\":%d}", user_id));
        CloudClient.event().publish(event.scheduled(eventTime));
    }
}
  • 发布事务事件(v2.8.0 后支持)

一般一种行为,发布一个事件就差不多了。难保有多个事情的情况,为了确保事件的“原子性”,引入了“事件事务”

@Component
public class UserService {
    //手动管理事务
    public void onUserRegistered(long user_id) {
        EventTran eventTran = CloudClient.event().newTran();
        
        try {
            Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id));
            CloudClient.event().publish(event.tran(eventTran));

            Date eventTime = DateTime.Now().addDay(10);
            Event event = new Event("user.reawakened", String.format("{\"user_id\":%d}", user_id));
            CloudClient.event().publish(event.scheduled(eventTime).tran(eventTran));
            
            eventTran.commit();
        } catch (Throwable ex) {
            eventTran.rollback();
        }
    }
    
    //与 JDBC 事务整合
    @Tran
    public void onUserRegistered_demo2(long user_id) {
        EventTran eventTran = CloudClient.event().newTranAndJoin();
        
        Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id));
        CloudClient.event().publish(event.tran(eventTran));

        Date eventTime = DateTime.Now().addDay(10);
        Event event = new Event("user.reawakened", String.format("{\"user_id\":%d}", user_id));
        CloudClient.event().publish(event.scheduled(eventTime).tran(eventTran));
    }
}

2、事件消费

  • 事件订阅与处理,并返回 ACK 结果
@CloudEvent("user.registered")
public class EventHandlerImpl implements CloudEventHandler {
    @Override
    public boolean handle(Event event) throws Throwable {
        //用户注册完成后,送个金币...
        //
        return true;
    }
}

3、事件拦截

可用于记录消耗时间,日志等...

@Component
public class EventInterceptorImpl implements CloudEventInterceptor {
    static Logger log = LoggerFactory.getLogger(BaseEventInterceptor.class);

    @Override
    public boolean doIntercept(Event event, CloudEventHandler handler) throws Throwable {
        TagsMDC.tag0("event");
        TagsMDC.tag1(event.topic());

        if (Utils.isNotEmpty(event.tags())) {
            TagsMDC.tag2(event.tags());
        }

        TagsMDC.tag3(event.key());
        Timecount timecount = new Timecount().start();
        long timespan = 0;

        try {
            boolean succeeded = handler.handle(event);
            timespan = timecount.stop().milliseconds();

            if (succeeded) {
                log.info("Event execution succeeded @{}ms", timespan);
                return true;
            } else {
                log.warn("Event execution failed @{}ms", timespan);
                return false;
            }
        } catch (Throwable e) {
            timespan = timecount.stop().milliseconds();

            log.error("Event execution error @{}ms: {}", timespan, e);
            throw e;
        } finally {
            if (timespan > 0) {
                CloudClient.metric().addMeter(Solon.cfg().appName(), "event", event.topic(), timespan);
            }
        }
    }
}

4、拟模真实的场景应用:

我们设计一个用户注册的场景应用:

  • 持久层添加用户记录
  • 注册后发布一个已注册事件;再发布一个10天后触发的已唤醒事件
  • 在已注册事件里,我们给用户送10个金币;再送手机100元冲值
  • 在已唤醒事件里,我们检查用户的活动行为;如果有,再送100个金币(作为奖励);如果没发推送,告知有抽奖

主服务程序,负责主业务:

@Component
public class UserService {
    @Inject
    UserDao userDao;
    
    //用户注册
    @Tran
    public void userRegister(long userId, String name){
        userDao.addUser(userId, name);
        this.onUserRegistered(userId);
    }
    
    //当用户完成注册时(发布事件)
    private void onUserRegistered(long userId) {
        String eventJson = String.format("{\"userId\":%d}", userId);
        Date  eventTime = DateTime.Now().addDay(10);
        
        EventTran eventTran = CloudClient.event().newTranAndJoin();
        
        //发布用户已注册事件
        CloudClient.event().publish(new Event("user.registered", eventJson).tran(eventTran));
        //发布用户已唤醒事件(用于检查用户在10内,有没有活动行为)
        CloudClient.event().publish(new Event("user.reawakened", eventJson).scheduled(eventTime).tran(eventTran));
    }
}

次服务程序,负责辅助业务(也可以合到主服务程序):

@CloudEvent("user.registered")
public class UserRegisteredEventHandler implements CloudEventHandler {
    @Inject
    UserService userService;
    @Inject
    MobileService mobileSerivce;
    
    @Override
    public boolean handle(Event event) throws Throwable {
        long userId = ONode.load(event.context()).get("userId").getLong();
        
        //送10个金币
        userService.addGold(userId, 10);
        
        //送手机充值100块
        String mobie = userService.getMobile(userId);
        mobileSerivce.recharge(mobile, 100);
        
        return true;
    }
}

@CloudEvent("user.reawakened")
public class UserReawakenedEventHandler implements CloudEventHandler {
    @Inject
    UserService userService;
    @Inject
    PushService pushService
    
    @Override
    public boolean handle(Event event) throws Throwable {
        long userId = ONode.load(event.context()).get("userId").getLong();
        
        if (userService.hasLive(userId, 10)) {
            //再送100个金币
            userService.addGold(userId, 100);
        } else {
            //获取设备id
            String duid = userService.getDuid(userId);
            //发布推送
            pushService.push(duid, "有100个金币等你来拿哟...")
        }
        
        return true;
    }
}