statefulflow - 有状态的流引擎
(v3.1.3 后支持)有状态流引擎(StatefulFlowEngine),是在 FlowEngine 基础上增加状态支持的增强接口。可直接用于:断点操控场景(比如调试),等待介入场景(比如行政审批)。需要引入扩展依赖:
<dependency>
<groupId>org.noear</groupId>
<artifactId>solon-flow</artifactId>
<version>3.1.3-SNAPSHOT</version>
</dependency>
有状态流引擎的主要接口包括:
主要接口 | 描述 |
---|---|
StatefulFlowEngine | 有状态流引擎接口 |
StatefulFlowDriver | 有状态流驱动器接口。是 StatefulFlowEngine 能力的关键支撑 |
StateController | 状态控制器接口。提供状态控制(是否可操作,是否自动前进) |
StateRepository | 状态仓库接口。提供状态保存与获取 |
StateController 框架内提供有(也可按需定制):
实现 | 描述 |
---|---|
BlockStateController | 阻塞状态控制器(所有节点有权操作,类似超级管理员) |
ActorStateController | 参与者状态控制器(节点元信息匹配参与者后有权操作,没有配置的会自动前进) |
StateRepository 框架内提供有(也可按需定制):
实现 | 描述 |
---|---|
InMemoryStateRepository | 内存状态仓库(基于 Map 封装) |
RedisStateRepository | Redis 状态仓库(基于 Redis 封装) |
1、StatefulFlowEngine 兼容有状态与无状态两种场景
是否需要状态处理,通过 FlowContext 是否有实例id进行识别
new FlowContext(); //没有实例id,按无状态处理
new FlowContext("1"); //有实例id,按需要状态处理
2、StatefulFlowEngine 的构建及两组接口
主要是 StatefulSimpleFlowDriver 的构建。按需选择(或定制)合适的 stateController 和 stateRepository。构建的 StatefulFlowEngine 可以替换掉默认的 FlowEngine 实例。
@Configuration
public class DemoCom {
//替换掉默认的引擎实例
@Bean
public StatefulFlowEngine flowEngine() {
return StatefulFlowEngine.newInstance(StatefulSimpleFlowDriver.builder()
.stateController(new BlockStateController())
.stateRepository(new InMemoryStateRepository())
.build());
}
}
一组用于“断点操控”场景的接口:
接口 | 描述 |
---|---|
stepForward(...)->StatefulNode | 单步前进(相当于提交状态为“完成”) |
stepBack(...)->StatefulNode | 单步后退(相当于提交状态为“回退”) |
一组用于“等待介入”场景的接口(中间需要参与者介入):
接口 | 描述 |
---|---|
getActivityNode(...)->StatefulNode | 获取当前参与者的活动节点| |
postActivityState(...) | 提交活动状态(会产生“前进”或“后退”的效果) |
getActivityNodes(...)->Collection<StatefulNode> | 获取下一步所有活动节点 |
postActivityStateIfWaiting(...) | 提交活动状态(会检测是否在等待参与者操作) |
3、状态类型 StateType(用于执行控制,或表达当前状态)
状态 | 代码 | 描述 |
---|---|---|
UNKNOWN | 0 | 未知(也表过无权限操作) |
WAITING | 1001 | 等待(也表过有权限操作) |
COMPLETED | 1002 | 完成(或通过) |
TERMINATED | 1003 | 终止(或否决) |
RETURNED | 1004 | 退回(或撤回) |
RESTART | 1005 | 重新开始 |
4、业务状态记录的定制参考
业务状态记录可以在应用侧实现(推荐)。也可以通过定制 StateRepository 实现:
//可以在 RedisStateRepository 基础上做扩展,或者基于 StateRepository 接口完整实现
public class RedisStateRepositoryEx extends RedisStateRepository {
public RedisStateRepositoryEx(RedisClient client) {
super(client);
}
public RedisStateRepositoryEx(RedisClient client, String statePrefix) {
super(client, statePrefix);
}
/**
* 提交活动状态时(事件处理)
*/
@Override
public void onPostActivityState(FlowContext context, Node node, StateType state) {
String instanceId = context.getInstanceId();
String chainId = node.getChain().getId();
String activityNodeId = node.getId();
String userId = context.get("userId"); //需要什么通过 context 传递
String op = context.get("op"); //获取业务操作说明
long created = System.currentTimeMillis();
//业务保存
}
/**
* 获取活动状态记录
*/
public List getActivityStateRecords(FlowContext context) {
String instanceId = context.getInstanceId();
return null; //业务查询
}
}
构建引擎实例
@Configuration
public class DemoCom {
@Bean
public RedisStateRepositoryEx stateRepository(){
return new RedisStateRepositoryEx(...);
}
@Bean
public StatefulFlowEngine flowEngine(RedisStateRepositoryEx stateRepository) {
return StatefulFlowEngine.newInstance(StatefulSimpleFlowDriver.builder()
.stateController(new ActorStateController("userId", "roleId")) //按需设计参与者的配置属性
.stateRepository(stateRepository)
.build());
}
}
场景代码应用
public void test(StatefulFlowEngine flowEngine, RedisStateRepositoryEx stateRepository) {
FlowContext context;
StatefulNode statefulNode;
context = new FlowContext("instance-1").put("actor", "刘涛"); //使用实例id
statefulNode = flowEngine.getActivityNode("c1", context);
assert "step1".equals(statefulNode.getNode().getId());
assert StateType.WAITING == statefulNode.getState();
//等待当前用户处理
context.put("op", "通过");
flowEngine.postActivityState(context, statefulNode.getNode(), StateType.COMPLETED);
//获取状态记录
List list = stateRepository.getActivityStateRecords(context);
}