flow - 事件总线(事件广播、解耦)
1、流事件总线
流事件总线(基于大米事件总线 - DamiBus),是上下文级别的(或实例级别的)。可以为扩展提供“解耦”支持。
支持三种交互接口(具体参考 DamiBus):
- 发送事件: send
- 调用事件:call。要求一个答复(监听时,可以答复一次)
- 流事件:stream。可以有多个答复(监听时,可以答复多次)
支持泛型(因为支持答复,所以可以同时指定 "发" 与 "收" 的类型约束)
context.eventBus()
2、配置应用示例
# flow/f1.chain.yml
id: f1
layout:
- task: |
//发送事件
context.eventBus().send("send.topic1", "hello1");
- task: |
//发送事件并做为请求(要求给答复)//使用泛型
String rst = context.eventBus().<String,String>call("call.topic1", "hello2");
System.out.println(rst);
- task: |
//发送事件并做为请求(要求给答复)//使用泛型 //支持备用处理(如果没有订阅)
String rst = context.eventBus().<String,String>call("call.topic1", "hello2", fallback -> fallback.complete("def"));
System.out.println(rst);
在设计主题时,可以使用“事件类型”作为前缀(方便识别)
3、代码应用示例
public class DemoTest {
@Test
public void case1() throws Exception {
FlowEngine flowEngine = FlowEngine.newInstance();
flowEngine.load("classpath:flow/*.yml");
FlowContext context = FlowContext.of();
//事件监听(即订阅)
context.eventBus().<String>listen("send.topic1", (event) -> { //for send
System.err.println(event.getPayload());
});
//事件监听(即订阅)
context.eventBus().<String,String>listen("call.topic1", (event, data, sink) -> { //for call
System.out.println(data);
sink.complete("ok"); //答复
});
flowEngine.eval("f1", context);
}
}