```xml
<dependency>
    <groupId>org.noear</groupId>
    <artifactId>solon-data-rx-sqlutils</artifactId>
</dependency>
```

对应的同步版本：[solon-data-sqlutils](/article/855)

### 1、描述

数据扩展插件。基于 r2dbc 和 io.projectreactor 适配的 sql 基础使用工具，比较反朴归真。v3.0.5 后支持

* 支持事务管理（暂时，需要使用 r2dbc 进行手动事务管理）
* 支持多数据源
* 支持流式输出
* 支持批量执行
* 支持存储过程

一般用于 SQL 很少的响应式项目；或者对性能要求极高的项目；或者作为引擎用；等...... RxSqlUtils 总体上分为查询操作（query 开发）和更新操作（update 开头）。

### 2、配置示例

配置数据源（具体参考：[《数据源的配置与构建》](/article/794)）。配置构建时注意事项：

* 使用 R2dbcConnectionFactory 做为数据源类型
* 使用 r2dbcUrl 作为连接地址（r2dbc 协议的连接地址）

```yaml
solon.dataSources:
  demo!:
    class: "org.noear.solon.data.datasource.R2dbcConnectionFactory"
    r2dbcUrl: "r2dbc:h2:mem:///test;DB_CLOSE_ON_EXIT=FALSE;MODE=MYSQL;DATABASE_TO_LOWER=TRUE;IGNORECASE=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE"
```

配置数据源后，可按数据源名直接注入（或手动获取）：

```java
//注入
@Component
public class DemoService {
    @Inject //默认数据源
    RxSqlUtils sqlUtils;
    
    @Inject("demo") //demo 数据源
    RxSqlUtils sqlUtils;
}

//或者手动获取
RxSqlUtils sqlUtils = RxSqlUtils.ofName("db1");
```

可以更换默认的行转换器（可选）：

```java
@Component
public class RxRowConverterFactoryImpl implements RxRowConverterFactory<Object> {
    @Override
    public RxRowConverter<Object> create(Class<?> tClass) {
        return new RowConverterImpl(tClass);
    }

    private static class RowConverterImpl implements RxRowConverter<Object> {
        private final Class<?> tClass;

        public RowConverterImpl(Class<?> tClass) {
            this.tClass = tClass;
        }


        @Override
        public Object convert(Row row, RowMetadata metaData) {
            Map<String, Object> map = new LinkedHashMap<>();
            for (int i = 0; i < metaData.getColumnMetadatas().size(); i++) {
                String name = metaData.getColumnMetadata(i).getName();
                Object value = row.get(i);
                map.put(name, value);
            }

            if (Map.class == tClass) {
                return map;
            } else {
                return ONode.load(map).toObject(tClass);
            }
        }
    }
}
```

可添加的命令拦截器（可选。比如：添加 sql 打印，记录执行时间）：

```java
@Component
public class RxSqlCommandInterceptorImpl implements RxSqlCommandInterceptor {
    @Override
    public Publisher doIntercept(RxSqlCommandInvocation inv) {
        System.out.println("sql:" + inv.getCommand().getSql());
        if (inv.getCommand().isBatch()) {
            System.out.println("args:" + inv.getCommand().getArgsColl());
        } else {
            System.out.println("args:" + inv.getCommand().getArgs());
        }
        System.out.println("----------");

        return inv.invoke();
    }
}
```

### 3、初始化数据库操作

准备数据库创建脚本资源文件（`resources/demo.sql`）

```sql
CREATE TABLE `test` (
    `id` int NOT NULL,
    `v1` int DEFAULT NULL,
    `v2` int DEFAULT NULL,
    `v3` varchar(50) DEFAULT NULL ,
    PRIMARY KEY (`id`)
);
```

初始化数据库（即，执行脚本文件）

```java
@Configuration
public class DemoConfig {
    @Bean
    public void init(RxSqlUtils sqlUtils) throws Exception {
        sqlUtils.initDatabase("classpath:demo.sql");
    }
}
```


### 4、查询操作

* 查询并获取值（只查一列）

```java
public void getValue() throws SQLException {
    //获取值
    Mono<Long> val = sqlUtils.sql("select count(*) from appx")
                       .queryValue();
    
    //获取值列表
    Flux<Integer> valList = sqlUtils.sql("select app_id from appx limit 5")
                                    .queryValueList();
}
```

* 查询并获取行

```java
// Entity 形式
public void getRow() throws SQLException {
    //获取行列表
    Flux<Appx> rowList = sqlUtils.sql("select * from appx limit 2")
                                 .queryRowList(Appx.calss);
    //获取行
    Mono<Appx> row = sqlUtils.sql("select * from appx where app_id=?", 11)
                       .queryRow(Appx.calss);
}

// Map 形式
public void getRowMap() throws SQLException {
    //获取行列表
    Flux<Map> rowList = sqlUtils.sql("select * from appx limit 2")
                                 .queryRowList(Map.calss);
    //获取行
    Mono<Map> row = sqlUtils.sql("select * from appx where app_id=?", 11)
                       .queryRow(Map.calss);
}
```



### 5、查询构建器操作



以上几种查询方式，都是一行代码就解决的。复杂的查询怎么办？比如管理后台的条件统计，可以先使用构建器：


```java
public Flux<Appx> findDataStat(int group_id, String channel, int scale) throws SQLException {
    SqlBuilder sqlSpec = new SqlBuilder();
    sqlSpec.append("select group_id, sum(amount) amount from appx ")
           .append("where group_id = ? ", group_id)
           .appendIf(channel != null, "and channel like ? ", channel + "%");

    //可以分离控制
    if(scale > 10){
        sqlSpec.append("and scale = ? ", scale);
    }
    
    sqlSpec.append("group by group_id ");

    return sqlUtils.sql(sqlSpec).queryRowList(Appx.class);
}
```

管理后台常见的分页查询：

```java
public Page<Appx> findDataPage(int group_id, String channel) throws SQLException {
    SqlBuilder sqlSpec = new SqlBuilder()
      .append("from appx  where group_id = ? ", group_id)
      .appendIf(channel != null, "and channel like ? ", channel + "%");
    
    //备份
    sqlSpec.backup();
    sqlSpec.insert("select * ");
    sqlSpec.append("limit ?,? ", 10,10); //分页获取列表
    
    //查询列表
    Flux<Appx> list = sqlUtils.sql(sqlSpec).queryRowList(Appx.class);
    
    //回滚（可以复用备份前的代码构建）
    sqlSpec.restore();
    sqlSpec.insert("select count(*) ");
    
    //查询总数
    Mono<Long> total = sqlUtils.sql(sqlSpec).queryValue(Long.class);
    
    return new Page(list, total);
}
```

构建器支持 `?...` 集合占位符查询：

```java
public Flux<Appx> findDataList() throws SQLException {
    SqlBuilder sqlSpec = new SqlBuilder()
        .append("select * from appx  where app_id in (?...) ", Arrays.asList(1,2,3,4));
    
    //查询列表
    return sqlUtils.sql(sqlSpec).queryRowList(Appx.class);
}
```


### 6、更新操作

* 插入


```java
public void add() throws SQLException {
    sqlUtils.sq("insert test(id,v1,v2) values(?,?,?)", 2, 2, 2).update()
              .then(sqlUtils.sq("insert test(id,v1,v2) values(?,?,?)", 2, 2, 2).update())
              .block(); //.block() = 马上执行
    
    //返回自增主键
    Mono<Long> key = sqlUtils.sql("insert test(id,v1,v2) values(?,?,?)", 2, 2, 2)
                       .updateReturnKey(Long.class);
}
```


* 更新


```java
public void exe() throws SQLException {
    sqlUtils.sql("delete from test where id=?", 2).update().block();
}
```

* 批量执行（插入、或更新、或删除）


```java
public void exeBatch() throws SQLException {
    List<Object[]> argsList = new ArrayList<>();
    argsList.add(new Object[]{1, 1, 1});
    argsList.add(new Object[]{2, 2, 2});
    argsList.add(new Object[]{3, 3, 3});
    argsList.add(new Object[]{4, 4, 4});
    argsList.add(new Object[]{5, 5, 5});

    Flux<Long> rows = sqlUtils.sql("insert test(id,v1,v2) values(?,?,?)")
                         .params(argsList)
                         .updateBatch();
}
```


### 7、接口说明

RxSqlUtils（Sql 响应式工具类）

```java
public interface RxSqlUtils {
    static RxSqlUtils of(ConnectionFactory ds) {
        assert ds != null;
        return new SimpleRxSqlUtils(ds);
    }

    static RxSqlUtils ofName(String dsName) {
        return of(Solon.context().getBean(dsName));
    }

    /**
     * 初始化数据库
     *
     * @param scriptUri 示例：`classpath:demo.sql` 或 `file:demo.sql`
     */
    default void initDatabase(String scriptUri) throws IOException, SQLException {
        String sql = ResourceUtil.findResourceAsString(scriptUri);

        for (String s1 : sql.split(";")) {
            if (s1.trim().length() > 10) {
                this.sql(s1).update().block();
            }
        }
    }

    /**
     * 执行代码
     *
     * @param sql  代码
     * @param args 参数
     */
    RxSqlQuerier sql(String sql, Object... args);

    /**
     * 执行代码
     *
     * @param sqlSpec 代码声明
     */
    default RxSqlQuerier sql(SqlSpec sqlSpec) {
        return sql(sqlSpec.getSql(), sqlSpec.getArgs());
    }
}
```

RxSqlQuerier （Sql 响应式查询器）

```java
public interface RxSqlQuerier {
    /**
     * 绑定参数
     */
    RxSqlQuerier params(Object... args);

    /**
     * 绑定参数
     */
    <S> RxSqlQuerier params(S args, RxStatementBinder<S> binder);

    /**
     * 绑定参数（用于批处理）
     */
    RxSqlQuerier params(Collection<Object[]> argsList);

    /**
     * 绑定参数（用于批处理）
     */
    <S> RxSqlQuerier params(Collection<S> argsList, Supplier<RxStatementBinder<S>> binderSupplier);

    /// /////////////////////////////

    /**
     * 查询并获取值
     *
     * @return 值
     */
    @Nullable
    <T> Mono<T> queryValue(Class<T> tClass);

    /**
     * 查询并获取值列表
     *
     * @return 值列表
     */
    @Nullable
    <T> Flux<T> queryValueList(Class<T> tClass);

    /**
     * 查询并获取行
     *
     * @param tClass Map.class or T.class
     * @return 值
     */
    @Nullable
    <T> Mono<T> queryRow(Class<T> tClass);

    /**
     * 查询并获取行
     *
     * @return 值
     */
    @Nullable
    <T> Mono<T> queryRow(RxRowConverter<T> converter);

    /**
     * 查询并获取行列表
     *
     * @param tClass Map.class or T.class
     * @return 值列表
     */
    @Nullable
    <T> Flux<T> queryRowList(Class<T> tClass);

    /**
     * 查询并获取行列表
     *
     * @return 值列表
     */
    @Nullable
    <T> Flux<T> queryRowList(RxRowConverter<T> converter);

    /**
     * 更新（插入、或更新、或删除）
     *
     * @return 受影响行数
     */
    Mono<Long> update();

    /**
     * 更新并返回主键
     *
     * @return 主键
     */
    @Nullable
    <T> Mono<T> updateReturnKey(Class<T> tClass);

    /**
     * 批量更新（插入、或更新、或删除）
     *
     * @return 受影响行数组
     */
    Flux<Long> updateBatch();
}
```


### 配套示例：

https://gitee.com/opensolon/solon-examples/tree/main/4.Solon-Data/demo4010-sqlutils_rx
