Solon v3.7.2

熟悉 Completable 响应式接口

</> markdown

Solon-Rx(约2Kb)是基于 reactive-streams 封装的 RxJava 极简版(约 2Mb 左右)。目前仅一个接口 Completable,意为:可完成的发布者。

使用场景及接口:

接口说明备注
Completable作为返回类型
Completable::doOnError(err->{...})当出错时
Completable::doOnErrorResume(err->Completable)当出错时,恢复为一个新流v3.7.2 后支持
Completable::doOnComplete(()->{...})当完成时
Completable.complete()构建完成发布者
Completable.error(cause)构建异常发布者
Completable.create(emitter->{...})构建发射器发布者
Completable.then(()->Completable)当完成后(然后),下一个新流
Completable.then(Completable)当完成后(然后),下一个新流
Completable.subscribeOn(executor)订阅于v3.7.2 后支持
Completable.delay(delay, unit)订阅延时v3.7.2 后支持

1、作为返回类型(主要用于过滤器)

@FunctionalInterface
public interface ExFilter {
    /**
     * 过滤
     *
     * @param ctx   交换上下文
     * @param chain 过滤链
     */
    Completable doFilter(ExContext ctx, ExFilterChain chain);
}

2、构建返回对象(即,发布者)

@Component
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        String token = ctx.rawHeader("TOKEN");
        if (token == null) {
            ctx.newResponse().status(401);
            return Completable.complete();
        }
        
        return chain.doFilter(ctx);
    }
}

3、主要事件应用示例

  • doOnError 事件应用

当出错时,记录异常日志。//事件,还会传递给后续的观察者。

@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        return chain.doFilter(ctx).doOnError(e -> {
            log.error("{}", e);
        });
    }
}
  • doOnErrorResume 事件应用

当出错时,调整输出状态。//以新的流,替代旧的流(之前的 OnError 事件,不再传递)

@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        return chain.doFilter(ctx).doOnErrorResume(e -> {
            if (e instanceof StatusException) {
                StatusException se = (StatusException) e;

                ctx.newResponse().status(se.getCode());
            } else {
                ctx.newResponse().status(500);
            }

            return Completable.complete();
        });
    }
}
  • doOnComplete 事件应用

调整响应头和响应体。//事件,还会传递给后续的观察者。

//同步修改
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        return chain.doFilter(ctx).doOnComplete(e -> {
            ctx.newResponse().headerAdd("X-TraceId", "xxx");
            ctx.newResponse().body(Buffer.buffer("no!"));
        });
    }
}

//异步修改,可以再用 `Completable.create` 嵌套下
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
    @Override
    public Completable doFilter(ExContext ctx, ExFilterChain chain) {
        return Completable.create(emitter -> {
            chain.doFilter(ctx)
                    .doOnComplete(() -> {
                        ExBody exBody = ctx.newResponse().getBody();
                        if (exBody instanceof ExBodyOfStream) {
                            ExBodyOfStream streamBody = (ExBodyOfStream) exBody;
                            ((HttpClientResponse) streamBody.getStream()).body().andThen(bodyAr -> {
                                if (bodyAr.succeeded()) {
                                    // 获取响应体内容
                                    String content = bodyAr.result().toString();
                                    ctx.newResponse().header("MD5", Utils.md5(content));
                                    ctx.newResponse().body(Buffer.buffer(content + "#demo"));
                                    emitter.onComplete();
                                } else {
                                    emitter.onError(bodyAr.cause());
                                }
                            });
                        }
                    })
                    .doOnError(err -> {
                        emitter.onError(err);
                    })
                    .subscribe();
        });
    }
}