flow - 上下文及持久化(FlowContext)
2025年12月29日 下午5:16:22
流上下文(FlowContext),主要为流程执行提供:参数模型、节点跟踪、序列化、广播事件等支持。
1、作为参数模型(model)提供输入输出支持
| 对应字段或方法 | 描述 |
|---|---|
context.model() | 提供数据输入输出支持(在脚本中,可直接使用字段) |
context.put(key, value) | model 的快捷方法 |
context.putIfAbsent(key, value) | model 的快捷方法 |
context.putAll(map) | model 的快捷方法 |
context.computeIfAbsent(key, mappingFunction) | model 的快捷方法 |
context.get(key) | model 的快捷方法(返回 Object) |
context.getAs(key) | model 的快捷方法(返回 T,由接收类型决定)。v3.5.0 后支持 |
context.getOrDefault(key, def) | model 的快捷方法 |
context.remove(key) | model 的快捷方法 |
作用:
- 给流处理输入变量
- 在节点处理之间传递变量(例如:一个节点的处理结果作为变量,后一个节点可使用)
应用示例1:(给流处理输入变量)
#c1.yml
id: c1
layout:
- task: 'if(a > 0){context.put("result", 1);}'
FlowContext context = FlowContext.of();
context.put("a",2);
flowEngine.eval(c1,context);
System.out.println(context.get("result")); //打印执行结果
应用示例2:(在节点处理之间传递变量)
#c1.yml
id: c1
layout:
- task: |
context.put("a", 1); context.put("b", 2);
- task: |
context.put("c", (a + b)); //可以直接使用变量,可者通过 (int)context.get("a") 获取
2、中断与停止
| 对应字段或方法 | 描述 |
|---|---|
context.interrupt() | 中断当前分支 |
context.stop() | 停止执行(再次执行,即为恢复) |
3、序列化,和持久化支持
序列化方法 v3.8.1 后支持。3.8.1 之前可以直接 copy 代码
| 对应字段或方法 | 描述 |
|---|---|
context.toJson() | 序列化为 json |
FlowContext.fromJson(json) | 从 json 加载上下文 |
使用示例:
//恢复上下文
FlowContext context = FlowContext.fromJson(json);
//从恢复上下文开始持行
flowEngine.eval(graphId, context.lastNodeId(), context);
//转为 json(方便持久化)
json = context.toJson();
4、广播事件(基于大米事件总线 - DamiBus)
基于流上下文(FlowContext)实例级的事件总线。可以在流执行中广播事件
| 对应字段或方法 | 描述 |
|---|---|
context.eventBus() | 事件总线 |
流配置中的应用示例:
id: event1
layout:
- task: '@DemoCom'
- task: 'context.eventBus().send("demo.topic", "hello");'
代码执行中的应用示例:
public class DemoController {
...
public void test(){
FlowContext context = FlowContext.of();
context.eventBus().<String,String>listen("demo.topic", event -> {
System.out.println(event.getContent());
});
flowEngine.eval("event1", context);
}
}
@Component
public static class DemoCom implements TaskComponent {
@Override
public void run(FlowContext context, Node node) throws Throwable {
context.eventBus().send("demo.topic", "hello-com");
}
}
5、FlowContext 接口参考
@Preview("3.0")
public interface FlowContext {
static FlowContext of() {
return new FlowContextDefault();
}
static FlowContext of(String instanceId) {
return new FlowContextDefault(instanceId);
}
/**
* 从 json 加载(用于持久化)
*
* @since 3.8.1
*/
static FlowContext fromJson(String json) {
return FlowContextDefault.fromJson(json);
}
/// ////////////
/**
* 转为 json(用于持久化)
*
* @since 3.8.1
*/
String toJson();
/// ////////////
/**
* 获取事件总线(based damibus)
*/
DamiBus eventBus();
/**
* 中断当前分支(如果有其它分支,仍会执行)
*
* @since 3.7
*/
void interrupt();
/**
* 停止执行(即结束运行)
*
* @since 3.7
*/
void stop();
/// ////////////
/**
* 最后运行的节点
*/
@Preview("3.8.0")
@Nullable
NodeTrace lastNode();
/**
* 最后运行的节点Id
*
* @since 3.8.0
*/
@Preview("3.8.0")
@Nullable
default String lastNodeId() {
if (lastNode() != null) {
return lastNode().getId();
}
return null;
}
/// ////////////
/**
* 数据模型
*/
Map<String, Object> model();
/**
* 获取流实例id
*/
default String getInstanceId() {
return getAs("instanceId");
}
/**
* 推入
*/
default FlowContext put(String key, Object value) {
if (value != null) {
model().put(key, value);
}
return this;
}
/**
* 推入
*/
default FlowContext putIfAbsent(String key, Object value) {
if (value != null) {
model().putIfAbsent(key, value);
}
return this;
}
/**
* 推入全部
*/
default FlowContext putAll(Map<String, Object> model) {
this.model().putAll(model);
return this;
}
/**
* 尝试完成
*/
default <T> T computeIfAbsent(String key, Function<String, T> mappingFunction) {
return (T) model().computeIfAbsent(key, mappingFunction);
}
/**
* 获取
*/
default Object get(String key) {
return model().get(key);
}
/**
* 获取
*/
default <T> T getAs(String key) {
return (T) model().get(key);
}
/**
* 获取或默认
*/
default <T> T getOrDefault(String key, T def) {
return (T) model().getOrDefault(key, def);
}
/**
* 移除
*/
default void remove(String key) {
model().remove(key);
}
}