Solon v3.7.2

flow - 事件总线(事件广播、解耦)

</> markdown

1、流事件总线

流事件总线(基于大米事件总线 - DamiBus),是上下文级别的(或实例级别的)。可以为扩展提供“解耦”支持。

支持三种交互接口(具体参考 DamiBus):

  • 发送事件: send
  • 调用事件:call。要求一个答复(监听时,可以答复一次)
  • 流事件:stream。可以有多个答复(监听时,可以答复多次)

支持泛型(因为支持答复,所以可以同时指定 "发" 与 "收" 的类型约束)

context.eventBus()

2、配置应用示例

# flow/f1.chain.yml
id: f1
layout:
  - task: |
      //发送事件
      context.eventBus().send("send.topic1", "hello1");
  - task: |
      //发送事件并做为请求(要求给答复)//使用泛型
      String rst = context.eventBus().<String,String>call("call.topic1", "hello2"); 
      System.out.println(rst);
  - task: |
      //发送事件并做为请求(要求给答复)//使用泛型 //支持备用处理(如果没有订阅)
      String rst = context.eventBus().<String,String>call("call.topic1", "hello2", fallback -> fallback.complete("def")); 
      System.out.println(rst);

在设计主题时,可以使用“事件类型”作为前缀(方便识别)

3、代码应用示例

public class DemoTest {
    @Test
    public void case1() throws Exception {
        FlowEngine flowEngine = FlowEngine.newInstance();
        flowEngine.load("classpath:flow/*.yml");

        FlowContext context = FlowContext.of();
        
        //事件监听(即订阅)
        context.eventBus().<String>listen("send.topic1", (event) -> { //for send
            System.err.println(event.getPayload());
        });
        
        //事件监听(即订阅)
        context.eventBus().<String,String>listen("call.topic1", (event, data, sink) -> { //for call
            System.out.println(data);
            
            sink.complete("ok"); //答复
        });

        flowEngine.eval("f1", context);
    }
}