flow - 节点任务异步控制参考
solon-flow 默认是不支持异步的,但它支持 “阻断”, 和 “下一节点” 调用。从而形成异步模式
1、设计异步任务(仅供参考)
定义任务组件增强版基类,支持通过元数据 meta:{async:true}
配置启用异步(同时支持同步或异步)
public abstract class TaskComponentPlus implements TaskComponent {
@Override
public void run(FlowContext context, Node node) throws Throwable {
//通过 meta:{async:true} 来启用
boolean async = node.metaOrDefault("async", false);
if (async) {
//异步时要阻断自动流转(如果要手动下一就,也要阻断)
context.interrupt();
RunUtil.async(() -> {
try {
doRun(context, node, async);
} catch (Throwable err) {
onError(context, node, err);
}
});
} else {
doRun(context, node, async);
}
}
protected void onError(FlowContext context, Node node, Throwable err) {
context.stop();
}
protected abstract void doRun(FlowContext context, Node node, boolean async) throws Throwable;
}
应用示例
@Component("a")
public class TaskComponentImpl extends TaskComponentPlus {
@Override
protected void doRun(FlowContext context, Node node, boolean async) throws Throwable {
System.out.println("do...: " + node.id());
if (async) {
context.manualNext(node); //手动下一步
}
}
}
建议封装个基类出来。
2、链配置(极简模式)
#flow/async_case1.chain.yml
id: c1
layout:
- task: "@a"
- task: "@a"
- task: "@a"
3、执行测试
@SolonTest
public class AsyncTest {
@Inject
FlowEngine flowEngine;
@Test
public void case1() throws Throwable {
flowEngine.eval("c1"); //会分别打印现三个节点的 id
}
}
4、非 solon 环境参考(以原生 java 环境为例)
public class AsyncTest {
@Test
public void case1() throws Throwable {
MapContainer container = new MapContainer();
//注册组件
container.putComponent("a", new TaskComponentImpl());
FlowEngine engine = FlowEngine.newInstance();
//切换驱动器
engine.register(new SimpleFlowDriver(container));
//加载链
engine.load("classpath:demo/async/async_case1.chain.yml");
//执行
engine.eval("c1"); //会分别打印现三个节点的 id
//因为是异步,所以要阻一下
System.in.read();
}
}