flow - 事件总线(事件广播、解耦)
2026年1月13日 下午8:45:25
1、流事件总线
Solon-Flow 内置了基于大米总线(DamiBus)的事件机制。它是 上下文级别(实例级别) 的,主要用于在流程执行过程中实现逻辑解耦、外部交互及复杂扩展。
支持三种交互接口(具体参考 DamiBus):
| 模式 | 方法 | 说明 |
|---|---|---|
| 发送模式 | send | 纯粹的异步/同步广播,不要求接收方答复。 |
| 请求模式 | call | 发送事件并要求一个答复(类似 RPC),支持超时与备用处理。 |
| 流模式 | stream | 发送事件并支持多个答复(持续接收数据流)。 |
支持泛型(因为支持答复,所以可以同时指定 "发" 与 "收" 的类型约束)
context.eventBus()
为什么使用事件总线?
- 逻辑解耦:流程定义只管“发信号”,具体的业务处理(如发邮件、存数据库)由外部代码监听处理。
- 动态扩展:可以在不修改流程 YAML 配置的情况下,通过增加监听器来改变流程的行为。
- 类型安全:借助 DamiBus 的强类型支持,避免了传统事件总线中常见的 Object 类型转换风险。
2、编排配置示例 (YAML)
在流程定义中,可以通过脚本直接触发事件,将繁重的业务逻辑交给外部订阅者处理。
# flow/f1.yml
id: f1
layout:
- task: |
//发送事件
context.eventBus().send("send.topic1", "hello1");
- task: |
//发送事件并做为请求(要求给答复)//使用泛型
String rst = context.eventBus().<String, String>call("call.topic1", "hello2").get();
System.out.println(rst);
- task: |
//发送事件并做为请求(要求给答复)//使用泛型 //支持备用处理(如果没有订阅)
String rst = context.eventBus().<String, String>call("call.topic1", "hello2", fallback -> fallback.complete("def")).get();
System.out.println(rst);
3、宿主代码订阅示例 (Java)
在启动流程前,可以通过上下文监听感兴趣的主题,从而实现流程对外部环境的“回调”。
public class DemoTest {
@Test
public void case1() throws Exception {
FlowEngine flowEngine = FlowEngine.newInstance();
flowEngine.load("classpath:flow/*.yml");
FlowContext context = FlowContext.of();
//事件监听(即订阅)
context.eventBus().<String>listen("send.topic1", (event) -> { //for send
System.err.println(event.getPayload());
});
//事件监听(即订阅)
context.eventBus().<String,String>listen("call.topic1", (event, data, sink) -> { //for call
System.out.println(data);
sink.complete("ok"); //答复
});
flowEngine.eval("f1", context);
}
}