Solon v3.1.2

statefulflow - 有状态的流引擎

</> markdown

(v3.1.3 后支持)有状态流引擎(StatefulFlowEngine),是在 FlowEngine 基础上增加状态支持的增强接口。可直接用于:断点操控场景(比如调试),等待介入场景(比如行政审批)。需要引入扩展依赖:

<dependency>
    <groupId>org.noear</groupId>
    <artifactId>solon-flow</artifactId>
    <version>3.1.3-SNAPSHOT</version>
</dependency>

有状态流引擎的主要接口包括:

主要接口描述
StatefulFlowEngine有状态流引擎接口
StatefulFlowDriver有状态流驱动器接口。是 StatefulFlowEngine 能力的关键支撑
StateController状态控制器接口。提供状态控制(是否可操作,是否自动前进)
StateRepository状态仓库接口。提供状态保存与获取

StateController 框架内提供有(也可按需定制):

实现描述
BlockStateController阻塞状态控制器(所有节点有权操作,类似超级管理员)
ActorStateController参与者状态控制器(节点元信息匹配参与者后有权操作,没有配置的会自动前进)

StateRepository 框架内提供有(也可按需定制):

实现描述
InMemoryStateRepository内存状态仓库(基于 Map 封装)
RedisStateRepositoryRedis 状态仓库(基于 Redis 封装)

1、StatefulFlowEngine 兼容有状态与无状态两种场景

是否需要状态处理,通过 FlowContext 是否有实例id进行识别

new FlowContext(); //没有实例id,按无状态处理

new FlowContext("1"); //有实例id,按需要状态处理

2、StatefulFlowEngine 的构建及两组接口

主要是 StatefulSimpleFlowDriver 的构建。按需选择(或定制)合适的 stateController 和 stateRepository。构建的 StatefulFlowEngine 可以替换掉默认的 FlowEngine 实例。

@Configuration
public class DemoCom {
    //替换掉默认的引擎实例
    @Bean
    public StatefulFlowEngine flowEngine() {
        return StatefulFlowEngine.newInstance(StatefulSimpleFlowDriver.builder()
                .stateController(new BlockStateController())
                .stateRepository(new InMemoryStateRepository())
                .build());
    }
}

一组用于“断点操控”场景的接口:

接口描述
stepForward(...)->StatefulNode单步前进(相当于提交状态为“完成”)
stepBack(...)->StatefulNode单步后退(相当于提交状态为“回退”)

一组用于“等待介入”场景的接口(中间需要参与者介入):

接口描述
getActivityNode(...)->StatefulNode获取当前参与者的活动节点|
postActivityState(...)提交活动状态(会产生“前进”或“后退”的效果)
getActivityNodes(...)->Collection<StatefulNode>获取下一步所有活动节点
postActivityStateIfWaiting(...)提交活动状态(会检测是否在等待参与者操作)

3、状态类型 StateType(用于执行控制,或表达当前状态)

状态代码描述
UNKNOWN0未知(也表过无权限操作)
WAITING1001等待(也表过有权限操作)
COMPLETED1002完成(或通过)
TERMINATED1003终止(或否决)
RETURNED1004退回(或撤回)
RESTART1005重新开始

4、业务状态记录的定制参考

业务状态记录可以在应用侧实现(推荐)。也可以通过定制 StateRepository 实现:

//可以在 RedisStateRepository 基础上做扩展,或者基于 StateRepository 接口完整实现
public class RedisStateRepositoryEx extends RedisStateRepository {
    public RedisStateRepositoryEx(RedisClient client) {
        super(client);
    }

    public RedisStateRepositoryEx(RedisClient client, String statePrefix) {
        super(client, statePrefix);
    }

    /**
     * 提交活动状态时(事件处理)
     */
    @Override
    public void onPostActivityState(FlowContext context, Node node, StateType state) {
        String instanceId = context.getInstanceId();
        String chainId = node.getChain().getId();
        String activityNodeId = node.getId();
        String userId = context.get("userId"); //需要什么通过 context 传递
        String op = context.get("op"); //获取业务操作说明
        long created = System.currentTimeMillis();

        //业务保存
    }

    /**
     * 获取活动状态记录
     */
    public List getActivityStateRecords(FlowContext context) {
        String instanceId = context.getInstanceId();
        return null; //业务查询
    }
}

构建引擎实例

@Configuration
public class DemoCom {
    @Bean
    public RedisStateRepositoryEx stateRepository(){
        return new RedisStateRepositoryEx(...);
    }
    @Bean
    public StatefulFlowEngine flowEngine(RedisStateRepositoryEx stateRepository) {
        return StatefulFlowEngine.newInstance(StatefulSimpleFlowDriver.builder()
                .stateController(new ActorStateController("userId", "roleId")) //按需设计参与者的配置属性
                .stateRepository(stateRepository)
                .build());
    }
}

场景代码应用

public void test(StatefulFlowEngine flowEngine, RedisStateRepositoryEx stateRepository) {
    FlowContext context;
    StatefulNode statefulNode;

    context = new FlowContext("instance-1").put("actor", "刘涛"); //使用实例id

    statefulNode = flowEngine.getActivityNode("c1", context);
    assert "step1".equals(statefulNode.getNode().getId());
    assert StateType.WAITING == statefulNode.getState();

    //等待当前用户处理
    
    context.put("op", "通过");
    flowEngine.postActivityState(context, statefulNode.getNode(), StateType.COMPLETED);
    
    
    //获取状态记录
    List list = stateRepository.getActivityStateRecords(context);
}