flow - 上下文详解(FlowContext)
2026年1月13日 下午9:11:33
FlowContext 是流程执行的核心现场。它不仅承载了业务数据(Model),还负责控制流程的流转状态(中断/停止)、记录执行轨迹(Trace)以及处理实例级别的事件通信。
1、作为参数模型(model)提供输入输出支持
上下文作为一个动态参数容器,支持节点间的数据传递和外部输入输出。在 YAML 脚本中,你可以直接操作这些变量。
| 对应字段或方法 | 描述 |
|---|---|
context.model() | 提供数据输入输出支持(在脚本中,可直接使用字段) |
context.put(key, value) | model 的快捷方法 |
context.putIfAbsent(key, value) | model 的快捷方法 |
context.putAll(map) | model 的快捷方法 |
context.computeIfAbsent(key, mappingFunction) | model 的快捷方法 |
context.get(key) | model 的快捷方法(返回 Object) |
context.getAs(key) | model 的快捷方法(返回 T,由接收类型决定)。v3.5.0 后支持 |
context.getOrDefault(key, def) | model 的快捷方法 |
context.remove(key) | model 的快捷方法 |
作用:
- 给流处理输入变量
- 在节点处理之间传递变量(例如:一个节点的处理结果作为变量,后一个节点可使用)
应用示例1:(给流处理输入变量)
#c1.yml
id: c1
layout:
- task: 'if(a > 0){context.put("result", 1);}'
FlowContext context = FlowContext.of();
context.put("a",2);
flowEngine.eval(c1,context);
System.out.println(context.get("result")); //打印执行结果
应用示例2:(在节点处理之间传递变量)
#c1.yml
id: c1
layout:
- task: |
context.put("a", 1); context.put("b", 2);
- task: |
context.put("c", (a + b)); //可以直接使用变量,可者通过 (int)context.get("a") 获取
2、中断或停止控制
您可以根据业务逻辑,在任务执行过程中精确控制流程的去向。
| 对应字段或方法 | 描述 |
|---|---|
context.interrupt() | 中断当前分支 |
context.stop() | 停止执行(再次执行,即为恢复) |
context.isStopped() | 是否已停止(一般用于用于外部观测) |
3、持行跟踪与(快照式)序列化(v3.8.1 后支持)
这是处理长流程的核心能力。通过将上下文序列化,可以实现跨线程、跨时间甚至跨服务器的流程恢复。
| 对应字段或方法 | 描述 |
|---|---|
context.toJson() | 序列化为 json |
FlowContext.fromJson(json) | 从 json 加载上下文 |
context.trace() | 状态跟踪 |
context.lastRecord() | 根图最后运行的记录 |
context.lastNodeId() | 根图最后运行的节点Id |
停止与恢复实战:
// --- 第一阶段:执行并因条件不满足而停止 ---
context.put("isReady", false);
flowEngine.eval(graph, context);
if (context.isStopped()) {
String snapshot = context.toJson(); // 序列化状态并存入数据库
}
// --- 第二阶段:从快照恢复并继续执行 ---
FlowContext restoredCtx = FlowContext.fromJson(snapshot);
restoredCtx.put("isReady", true); // 更新关键条件
flowEngine.eval(graph, restoredCtx); // 引擎会自动从上次停止的节点继续流转
4、广播事件(基于大米事件总线 - DamiBus)
基于 DamiBus 实现,支持在流程执行中进行异步广播或同步调用,实现组件间的极度解耦。
| 对应字段或方法 | 描述 |
|---|---|
context.eventBus() | 事件总线 |
流配置中的应用示例:
id: event1
layout:
- task: '@DemoCom'
- task: 'context.eventBus().send("demo.topic", "hello");'
代码执行中的应用示例:
public class DemoController {
...
public void test(){
FlowContext context = FlowContext.of();
context.eventBus().<String,String>listen("demo.topic", event -> {
System.out.println(event.getContent());
});
flowEngine.eval("event1", context);
}
}
@Component
public static class DemoCom implements TaskComponent {
@Override
public void run(FlowContext context, Node node) throws Throwable {
context.eventBus().send("demo.topic", "hello-com");
}
}
5、FlowContext 接口参考
@Preview("3.0")
public interface FlowContext extends NonSerializable {
static FlowContext of() {
return new FlowContextDefault();
}
static FlowContext of(String instanceId) {
return new FlowContextDefault(instanceId);
}
/**
* 从 json 加载(用于持久化)
*
* @since 3.8.1
*/
static FlowContext fromJson(String json) {
return FlowContextDefault.fromJson(json);
}
/// ////////////
/**
* 转为 json(用于持久化)
*
* @since 3.8.1
*/
String toJson();
/// ////////////
/**
* 然后(装配自己)
*/
default FlowContext then(Consumer<FlowContext> consumer) {
consumer.accept(this);
return this;
}
/// ////////////
/**
* 获取事件总线(based damibus)
*/
DamiBus eventBus();
/// ////////////
/**
* 交换器
*/
@Internal
@Nullable
FlowExchanger exchanger();
/**
* 中断当前分支(如果有其它分支,仍会执行)
*
* @since 3.7
*/
void interrupt();
/**
* 停止执行(即结束运行)
*
* @since 3.7
*/
void stop();
/**
* 是否已停止(用于外部检测)
*
* @since 3.8.1
*/
boolean isStopped();
/// ////////////
/**
* 痕迹
*
* @since 3.8.1
*/
@Preview("3.8.0")
FlowTrace trace();
/**
* 启用痕迹(默认为启用)
*
* @since 3.8.1
*/
@Preview("3.8.0")
FlowContext enableTrace(boolean enable);
/**
* 根图最后运行的节点
*/
@Preview("3.8.0")
@Nullable
NodeRecord lastRecord();
/**
* 根图最后运行的节点Id
*
* @since 3.8.0
*/
@Preview("3.8.0")
@Nullable
String lastNodeId();
/// ////////////
/**
* 数据模型
*/
Map<String, Object> model();
/**
* 获取流实例id
*/
default String getInstanceId() {
return getAs("instanceId");
}
/**
* 临时域变量
*/
<X extends Throwable> void with(String key, Object value, RunnableTx<X> runnable) throws X;
/**
* 推入
*/
default FlowContext put(String key, Object value) {
if (value != null) {
model().put(key, value);
}
return this;
}
/**
* 推入
*/
default FlowContext putIfAbsent(String key, Object value) {
if (value != null) {
model().putIfAbsent(key, value);
}
return this;
}
/**
* 推入全部
*/
default FlowContext putAll(Map<String, Object> model) {
this.model().putAll(model);
return this;
}
/**
* 尝试完成
*/
default <T> T computeIfAbsent(String key, Function<String, T> mappingFunction) {
return (T) model().computeIfAbsent(key, mappingFunction);
}
/**
* 包含 key
*
*/
default boolean containsKey(String key) {
return model().containsKey(key);
}
/**
* 获取
*/
default Object get(String key) {
return model().get(key);
}
/**
* 获取
*/
default <T> T getAs(String key) {
return (T) model().get(key);
}
/**
* 获取或默认
*/
default <T> T getOrDefault(String key, T def) {
return (T) model().getOrDefault(key, def);
}
/**
* 移除
*/
default void remove(String key) {
model().remove(key);
}
}