Solon v3.9.0

flow - 上下文详解(FlowContext)

</> markdown
2026年1月13日 下午9:11:33

FlowContext 是流程执行的核心现场。它不仅承载了业务数据(Model),还负责控制流程的流转状态(中断/停止)、记录执行轨迹(Trace)以及处理实例级别的事件通信。

1、作为参数模型(model)提供输入输出支持

上下文作为一个动态参数容器,支持节点间的数据传递和外部输入输出。在 YAML 脚本中,你可以直接操作这些变量。

对应字段或方法描述
context.model()提供数据输入输出支持(在脚本中,可直接使用字段)
context.put(key, value)model 的快捷方法
context.putIfAbsent(key, value)model 的快捷方法
context.putAll(map)model 的快捷方法
context.computeIfAbsent(key, mappingFunction)model 的快捷方法
context.get(key)model 的快捷方法(返回 Object)
context.getAs(key)model 的快捷方法(返回 T,由接收类型决定)。v3.5.0 后支持
context.getOrDefault(key, def)model 的快捷方法
context.remove(key)model 的快捷方法

作用:

  • 给流处理输入变量
  • 在节点处理之间传递变量(例如:一个节点的处理结果作为变量,后一个节点可使用)

应用示例1:(给流处理输入变量)

#c1.yml
id: c1
layout:
  - task: 'if(a > 0){context.put("result", 1);}'
FlowContext context = FlowContext.of();
context.put("a",2);

flowEngine.eval(c1,context);
System.out.println(context.get("result")); //打印执行结果

应用示例2:(在节点处理之间传递变量)

#c1.yml
id: c1
layout:
  - task: |
      context.put("a", 1); context.put("b", 2);
  - task: |
      context.put("c", (a + b)); //可以直接使用变量,可者通过 (int)context.get("a") 获取

2、中断或停止控制

您可以根据业务逻辑,在任务执行过程中精确控制流程的去向。

对应字段或方法描述
context.interrupt()中断当前分支
context.stop()停止执行(再次执行,即为恢复)
context.isStopped()是否已停止(一般用于用于外部观测)

3、持行跟踪与(快照式)序列化(v3.8.1 后支持)

这是处理长流程的核心能力。通过将上下文序列化,可以实现跨线程、跨时间甚至跨服务器的流程恢复。

对应字段或方法描述
context.toJson()序列化为 json
FlowContext.fromJson(json)从 json 加载上下文
context.trace()状态跟踪
context.lastRecord()根图最后运行的记录
context.lastNodeId()根图最后运行的节点Id

停止与恢复实战:

// --- 第一阶段:执行并因条件不满足而停止 ---
context.put("isReady", false);
flowEngine.eval(graph, context); 

if (context.isStopped()) {
    String snapshot = context.toJson(); // 序列化状态并存入数据库
}

// --- 第二阶段:从快照恢复并继续执行 ---
FlowContext restoredCtx = FlowContext.fromJson(snapshot); 
restoredCtx.put("isReady", true); // 更新关键条件
flowEngine.eval(graph, restoredCtx); // 引擎会自动从上次停止的节点继续流转

4、广播事件(基于大米事件总线 - DamiBus

基于 DamiBus 实现,支持在流程执行中进行异步广播或同步调用,实现组件间的极度解耦。

对应字段或方法描述
context.eventBus()事件总线

流配置中的应用示例:

id: event1
layout:
  - task: '@DemoCom'
  - task: 'context.eventBus().send("demo.topic", "hello");'

代码执行中的应用示例:

public class DemoController {
    ...
    public void test(){
        FlowContext context = FlowContext.of();
        context.eventBus().<String,String>listen("demo.topic", event -> {
            System.out.println(event.getContent());
        });

        flowEngine.eval("event1", context);
    }
}

 @Component
public static class DemoCom implements TaskComponent {

    @Override
    public void run(FlowContext context, Node node) throws Throwable {
        context.eventBus().send("demo.topic", "hello-com");
    }
}

5、FlowContext 接口参考

@Preview("3.0")
public interface FlowContext extends NonSerializable {
    static FlowContext of() {
        return new FlowContextDefault();
    }

    static FlowContext of(String instanceId) {
        return new FlowContextDefault(instanceId);
    }

    /**
     * 从 json 加载(用于持久化)
     *
     * @since 3.8.1
     */
    static FlowContext fromJson(String json) {
        return FlowContextDefault.fromJson(json);
    }


    /// ////////////

    /**
     * 转为 json(用于持久化)
     *
     * @since 3.8.1
     */
    String toJson();

    /// ////////////

    /**
     * 然后(装配自己)
     */
    default FlowContext then(Consumer<FlowContext> consumer) {
        consumer.accept(this);
        return this;
    }


    /// ////////////

    /**
     * 获取事件总线(based damibus)
     */
    DamiBus eventBus();

    /// ////////////
    /**
     * 交换器
     */
    @Internal
    @Nullable
    FlowExchanger exchanger();

    /**
     * 中断当前分支(如果有其它分支,仍会执行)
     *
     * @since 3.7
     */
    void interrupt();

    /**
     * 停止执行(即结束运行)
     *
     * @since 3.7
     */
    void stop();

    /**
     * 是否已停止(用于外部检测)
     *
     * @since 3.8.1
     */
    boolean isStopped();


    /// ////////////

    /**
     * 痕迹
     *
     * @since 3.8.1
     */
    @Preview("3.8.0")
    FlowTrace trace();

    /**
     * 启用痕迹(默认为启用)
     *
     * @since 3.8.1
     */
    @Preview("3.8.0")
    FlowContext enableTrace(boolean enable);

    /**
     * 根图最后运行的节点
     */
    @Preview("3.8.0")
    @Nullable
    NodeRecord lastRecord();

    /**
     * 根图最后运行的节点Id
     *
     * @since 3.8.0
     */
    @Preview("3.8.0")
    @Nullable
    String lastNodeId();

    /// ////////////

    /**
     * 数据模型
     */
    Map<String, Object> model();

    /**
     * 获取流实例id
     */
    default String getInstanceId() {
        return getAs("instanceId");
    }

    /**
     * 临时域变量
     */
    <X extends Throwable> void with(String key, Object value, RunnableTx<X> runnable) throws X;


    /**
     * 推入
     */
    default FlowContext put(String key, Object value) {
        if (value != null) {
            model().put(key, value);
        }
        return this;
    }

    /**
     * 推入
     */
    default FlowContext putIfAbsent(String key, Object value) {
        if (value != null) {
            model().putIfAbsent(key, value);
        }
        return this;
    }

    /**
     * 推入全部
     */
    default FlowContext putAll(Map<String, Object> model) {
        this.model().putAll(model);
        return this;
    }

    /**
     * 尝试完成
     */
    default <T> T computeIfAbsent(String key, Function<String, T> mappingFunction) {
        return (T) model().computeIfAbsent(key, mappingFunction);
    }

    /**
     * 包含 key
     *
     */
    default boolean containsKey(String key) {
        return model().containsKey(key);
    }

    /**
     * 获取
     */
    default Object get(String key) {
        return model().get(key);
    }

    /**
     * 获取
     */
    default <T> T getAs(String key) {
        return (T) model().get(key);
    }

    /**
     * 获取或默认
     */
    default <T> T getOrDefault(String key, T def) {
        return (T) model().getOrDefault(key, def);
    }

    /**
     * 移除
     */
    default void remove(String key) {
        model().remove(key);
    }
}