Solon v3.1.2

flow - 节点任务异步控制参考

</> markdown

solon-flow 默认是不支持异步的,但它支持 “阻断”, 和 “下一节点” 调用。从而形成异步模式

1、设计异步任务(仅供参考)

定义任务组件增强版基类,支持通过元数据 meta:{async:true} 配置启用异步(同时支持同步或异步)

public abstract class TaskComponentPlus implements TaskComponent {
    @Override
    public void run(FlowContext context, Node node) throws Throwable {
        //通过 meta:{async:true} 来启用
        boolean async = node.metaOrDefault("async", false);

        if (async) {
            //异步时要阻断自动流转(如果要手动下一就,也要阻断)
            context.interrupt();
        
            RunUtil.async(() -> {
                try {
                    doRun(context, node, async);
                } catch (Throwable err) {
                    onError(context, node, err);
                }
            });
        } else {
            doRun(context, node, async);
        }
    }

    protected void onError(FlowContext context, Node node, Throwable err) {
        context.stop();
    }

    protected abstract void doRun(FlowContext context, Node node, boolean async) throws Throwable;
}

应用示例

@Component("a")
public class TaskComponentImpl extends TaskComponentPlus {
    @Override
    protected void doRun(FlowContext context, Node node, boolean async) throws Throwable {
        System.out.println("do...: " + node.id());

        if (async) {
            context.manualNext(node); //手动下一步
        }
    }
}

建议封装个基类出来。

2、链配置(极简模式)

#flow/async_case1.chain.yml 
id: c1
layout:
  - task: "@a"
  - task: "@a"
  - task: "@a"

3、执行测试

@SolonTest
public class AsyncTest {
    @Inject
    FlowEngine flowEngine;

    @Test
    public void case1() throws Throwable {
        flowEngine.eval("c1"); //会分别打印现三个节点的 id
    }
}

4、非 solon 环境参考(以原生 java 环境为例)

public class AsyncTest {
    @Test
    public void case1() throws Throwable {
        MapContainer container = new MapContainer();
        //注册组件
        container.putComponent("a", new TaskComponentImpl());
        
    
        FlowEngine engine = FlowEngine.newInstance();
        //切换驱动器
        engine.register(new SimpleFlowDriver(container));
        
       
        //加载链
        engine.load("classpath:demo/async/async_case1.chain.yml");
        
        //执行 
        engine.eval("c1"); //会分别打印现三个节点的 id

        //因为是异步,所以要阻一下
        System.in.read();
    }
}