Solon v3.8.0

flow - 上下文及持久化(FlowContext)

</> markdown
2025年12月29日 下午5:16:22

流上下文(FlowContext),主要为流程执行提供:参数模型、节点跟踪、序列化、广播事件等支持。

1、作为参数模型(model)提供输入输出支持

对应字段或方法描述
context.model()提供数据输入输出支持(在脚本中,可直接使用字段)
context.put(key, value)model 的快捷方法
context.putIfAbsent(key, value)model 的快捷方法
context.putAll(map)model 的快捷方法
context.computeIfAbsent(key, mappingFunction)model 的快捷方法
context.get(key)model 的快捷方法(返回 Object)
context.getAs(key)model 的快捷方法(返回 T,由接收类型决定)。v3.5.0 后支持
context.getOrDefault(key, def)model 的快捷方法
context.remove(key)model 的快捷方法

作用:

  • 给流处理输入变量
  • 在节点处理之间传递变量(例如:一个节点的处理结果作为变量,后一个节点可使用)

应用示例1:(给流处理输入变量)

#c1.yml
id: c1
layout:
  - task: 'if(a > 0){context.put("result", 1);}'
FlowContext context = FlowContext.of();
context.put("a",2);

flowEngine.eval(c1,context);
System.out.println(context.get("result")); //打印执行结果

应用示例2:(在节点处理之间传递变量)

#c1.yml
id: c1
layout:
  - task: |
      context.put("a", 1); context.put("b", 2);
  - task: |
      context.put("c", (a + b)); //可以直接使用变量,可者通过 (int)context.get("a") 获取

2、中断与停止

对应字段或方法描述
context.interrupt()中断当前分支
context.stop()停止执行(再次执行,即为恢复)

3、序列化,和持久化支持

序列化方法 v3.8.1 后支持。3.8.1 之前可以直接 copy 代码

对应字段或方法描述
context.toJson()序列化为 json
FlowContext.fromJson(json)从 json 加载上下文

使用示例:

//恢复上下文
FlowContext context = FlowContext.fromJson(json);

//从恢复上下文开始持行
flowEngine.eval(graphId, context.lastNodeId(), context);

//转为 json(方便持久化)
json = context.toJson();

4、广播事件(基于大米事件总线 - DamiBus

基于流上下文(FlowContext)实例级的事件总线。可以在流执行中广播事件

对应字段或方法描述
context.eventBus()事件总线

流配置中的应用示例:

id: event1
layout:
  - task: '@DemoCom'
  - task: 'context.eventBus().send("demo.topic", "hello");'

代码执行中的应用示例:

public class DemoController {
    ...
    public void test(){
        FlowContext context = FlowContext.of();
        context.eventBus().<String,String>listen("demo.topic", event -> {
            System.out.println(event.getContent());
        });

        flowEngine.eval("event1", context);
    }
}

 @Component
public static class DemoCom implements TaskComponent {

    @Override
    public void run(FlowContext context, Node node) throws Throwable {
        context.eventBus().send("demo.topic", "hello-com");
    }
}

5、FlowContext 接口参考

@Preview("3.0")
public interface FlowContext {
    static FlowContext of() {
        return new FlowContextDefault();
    }

    static FlowContext of(String instanceId) {
        return new FlowContextDefault(instanceId);
    }

    /**
     * 从 json 加载(用于持久化)
     *
     * @since 3.8.1
     */
    static FlowContext fromJson(String json) {
        return FlowContextDefault.fromJson(json);
    }

    /// ////////////

    /**
     * 转为 json(用于持久化)
     *
     * @since 3.8.1
     */
    String toJson();


    /// ////////////

    /**
     * 获取事件总线(based damibus)
     */
    DamiBus eventBus();

    /**
     * 中断当前分支(如果有其它分支,仍会执行)
     *
     * @since 3.7
     */
    void interrupt();

    /**
     * 停止执行(即结束运行)
     *
     * @since 3.7
     */
    void stop();

    /// ////////////

    /**
     * 最后运行的节点
     */
    @Preview("3.8.0")
    @Nullable
    NodeTrace lastNode();

    /**
     * 最后运行的节点Id
     *
     * @since 3.8.0
     */
    @Preview("3.8.0")
    @Nullable
    default String lastNodeId() {
        if (lastNode() != null) {
            return lastNode().getId();
        }

        return null;
    }

    /// ////////////

    /**
     * 数据模型
     */
    Map<String, Object> model();

    /**
     * 获取流实例id
     */
    default String getInstanceId() {
        return getAs("instanceId");
    }


    /**
     * 推入
     */
    default FlowContext put(String key, Object value) {
        if (value != null) {
            model().put(key, value);
        }
        return this;
    }

    /**
     * 推入
     */
    default FlowContext putIfAbsent(String key, Object value) {
        if (value != null) {
            model().putIfAbsent(key, value);
        }
        return this;
    }

    /**
     * 推入全部
     */
    default FlowContext putAll(Map<String, Object> model) {
        this.model().putAll(model);
        return this;
    }

    /**
     * 尝试完成
     */
    default <T> T computeIfAbsent(String key, Function<String, T> mappingFunction) {
        return (T) model().computeIfAbsent(key, mappingFunction);
    }

    /**
     * 获取
     */
    default Object get(String key) {
        return model().get(key);
    }

    /**
     * 获取
     */
    default <T> T getAs(String key) {
        return (T) model().get(key);
    }

    /**
     * 获取或默认
     */
    default <T> T getOrDefault(String key, T def) {
        return (T) model().getOrDefault(key, def);
    }

    /**
     * 移除
     */
    default void remove(String key) {
        model().remove(key);
    }
}