使用分布式事件 - 生产与消费
特性与策略 | 发产与消费 | 多通道示例 | 生态 / Solon Cloud Event [传送]
1、事件生产
- 发布事件
@Component
public class UserService {
public void onUserRegistered(long user_id) {
//用户注册完成后,发布一个事件
//
Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id));
CloudClient.event().publish(event);
}
}
- 发布定时事件
@Component
public class UserService {
public void onUserRegistered(long user_id) {
//用户注册完成后,发布唤醒事件(10天后执行)
//
Date eventTime = DateTime.Now().addDay(10);
Event event = new Event("user.reawakened", String.format("{\"user_id\":%d}", user_id));
CloudClient.event().publish(event.scheduled(eventTime));
}
}
- 发布事务事件(v2.8.0 后支持)
一般一种行为,发布一个事件就差不多了。难保有多个事情的情况,为了确保事件的“原子性”,引入了“事件事务”
@Component
public class UserService {
//手动管理事务
public void onUserRegistered(long user_id) {
EventTran eventTran = CloudClient.event().newTran();
try {
Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id));
CloudClient.event().publish(event.tran(eventTran));
Date eventTime = DateTime.Now().addDay(10);
Event event = new Event("user.reawakened", String.format("{\"user_id\":%d}", user_id));
CloudClient.event().publish(event.scheduled(eventTime).tran(eventTran));
eventTran.commit();
} catch (Throwable ex) {
eventTran.rollback();
}
}
//与 JDBC 事务整合
@Tran
public void onUserRegistered_demo2(long user_id) {
EventTran eventTran = CloudClient.event().newTranAndJoin();
Event event = new Event("user.registered", String.format("{\"user_id\":%d}", user_id));
CloudClient.event().publish(event.tran(eventTran));
Date eventTime = DateTime.Now().addDay(10);
Event event = new Event("user.reawakened", String.format("{\"user_id\":%d}", user_id));
CloudClient.event().publish(event.scheduled(eventTime).tran(eventTran));
}
}
2、事件消费
- 事件订阅与处理,并返回 ACK 结果
@CloudEvent("user.registered")
public class EventHandlerImpl implements CloudEventHandler {
@Override
public boolean handle(Event event) throws Throwable {
//用户注册完成后,送个金币...
//
return true;
}
}
3、事件拦截
可用于记录消耗时间,日志等...
@Component
public class EventInterceptorImpl implements CloudEventInterceptor {
static Logger log = LoggerFactory.getLogger(BaseEventInterceptor.class);
@Override
public boolean doIntercept(Event event, CloudEventHandler handler) throws Throwable {
TagsMDC.tag0("event");
TagsMDC.tag1(event.topic());
if (Utils.isNotEmpty(event.tags())) {
TagsMDC.tag2(event.tags());
}
TagsMDC.tag3(event.key());
Timecount timecount = new Timecount().start();
long timespan = 0;
try {
boolean succeeded = handler.handle(event);
timespan = timecount.stop().milliseconds();
if (succeeded) {
log.info("Event execution succeeded @{}ms", timespan);
return true;
} else {
log.warn("Event execution failed @{}ms", timespan);
return false;
}
} catch (Throwable e) {
timespan = timecount.stop().milliseconds();
log.error("Event execution error @{}ms: {}", timespan, e);
throw e;
} finally {
if (timespan > 0) {
CloudClient.metric().addMeter(Solon.cfg().appName(), "event", event.topic(), timespan);
}
}
}
}
4、拟模真实的场景应用:
我们设计一个用户注册的场景应用:
- 持久层添加用户记录
- 注册后发布一个已注册事件;再发布一个10天后触发的已唤醒事件
- 在已注册事件里,我们给用户送10个金币;再送手机100元冲值
- 在已唤醒事件里,我们检查用户的活动行为;如果有,再送100个金币(作为奖励);如果没发推送,告知有抽奖
主服务程序,负责主业务:
@Component
public class UserService {
@Inject
UserDao userDao;
//用户注册
@Tran
public void userRegister(long userId, String name){
userDao.addUser(userId, name);
this.onUserRegistered(userId);
}
//当用户完成注册时(发布事件)
private void onUserRegistered(long userId) {
String eventJson = String.format("{\"userId\":%d}", userId);
Date eventTime = DateTime.Now().addDay(10);
EventTran eventTran = CloudClient.event().newTranAndJoin();
//发布用户已注册事件
CloudClient.event().publish(new Event("user.registered", eventJson).tran(eventTran));
//发布用户已唤醒事件(用于检查用户在10内,有没有活动行为)
CloudClient.event().publish(new Event("user.reawakened", eventJson).scheduled(eventTime).tran(eventTran));
}
}
次服务程序,负责辅助业务(也可以合到主服务程序):
@CloudEvent("user.registered")
public class UserRegisteredEventHandler implements CloudEventHandler {
@Inject
UserService userService;
@Inject
MobileService mobileSerivce;
@Override
public boolean handle(Event event) throws Throwable {
long userId = ONode.load(event.context()).get("userId").getLong();
//送10个金币
userService.addGold(userId, 10);
//送手机充值100块
String mobie = userService.getMobile(userId);
mobileSerivce.recharge(mobile, 100);
return true;
}
}
@CloudEvent("user.reawakened")
public class UserReawakenedEventHandler implements CloudEventHandler {
@Inject
UserService userService;
@Inject
PushService pushService
@Override
public boolean handle(Event event) throws Throwable {
long userId = ONode.load(event.context()).get("userId").getLong();
if (userService.hasLive(userId, 10)) {
//再送100个金币
userService.addGold(userId, 100);
} else {
//获取设备id
String duid = userService.getDuid(userId);
//发布推送
pushService.push(duid, "有100个金币等你来拿哟...")
}
return true;
}
}