Solon v3.9.0

flow - 事件总线(事件广播、解耦)

</> markdown
2026年1月13日 下午8:45:25

1、流事件总线

Solon-Flow 内置了基于大米总线(DamiBus)的事件机制。它是 上下文级别(实例级别) 的,主要用于在流程执行过程中实现逻辑解耦、外部交互及复杂扩展。

支持三种交互接口(具体参考 DamiBus):

模式方法说明
发送模式send纯粹的异步/同步广播,不要求接收方答复。
请求模式call发送事件并要求一个答复(类似 RPC),支持超时与备用处理。
流模式stream发送事件并支持多个答复(持续接收数据流)。

支持泛型(因为支持答复,所以可以同时指定 "发" 与 "收" 的类型约束)

context.eventBus()

为什么使用事件总线?

  • 逻辑解耦:流程定义只管“发信号”,具体的业务处理(如发邮件、存数据库)由外部代码监听处理。
  • 动态扩展:可以在不修改流程 YAML 配置的情况下,通过增加监听器来改变流程的行为。
  • 类型安全:借助 DamiBus 的强类型支持,避免了传统事件总线中常见的 Object 类型转换风险。

2、编排配置示例 (YAML)

在流程定义中,可以通过脚本直接触发事件,将繁重的业务逻辑交给外部订阅者处理。

# flow/f1.yml
id: f1
layout:
  - task: |
      //发送事件
      context.eventBus().send("send.topic1", "hello1");
  - task: |
      //发送事件并做为请求(要求给答复)//使用泛型
      String rst = context.eventBus().<String, String>call("call.topic1", "hello2").get(); 
      System.out.println(rst);
  - task: |
      //发送事件并做为请求(要求给答复)//使用泛型 //支持备用处理(如果没有订阅)
      String rst = context.eventBus().<String, String>call("call.topic1", "hello2", fallback -> fallback.complete("def")).get(); 
      System.out.println(rst);

3、宿主代码订阅示例 (Java)

在启动流程前,可以通过上下文监听感兴趣的主题,从而实现流程对外部环境的“回调”。

public class DemoTest {
    @Test
    public void case1() throws Exception {
        FlowEngine flowEngine = FlowEngine.newInstance();
        flowEngine.load("classpath:flow/*.yml");

        FlowContext context = FlowContext.of();
        
        //事件监听(即订阅)
        context.eventBus().<String>listen("send.topic1", (event) -> { //for send
            System.err.println(event.getPayload());
        });
        
        //事件监听(即订阅)
        context.eventBus().<String,String>listen("call.topic1", (event, data, sink) -> { //for call
            System.out.println(data);
            
            sink.complete("ok"); //答复
        });

        flowEngine.eval("f1", context);
    }
}