熟悉 Completable 响应式接口
Solon-Rx(约2Kb)是基于 reactive-streams 封装的 RxJava 极简版(约 2Mb 左右)。目前仅一个接口 Completable,意为:可完成的发布者。
使用场景及接口:
| 接口 | 说明 | 备注 |
|---|---|---|
Completable | 作为返回类型 | |
Completable::doOnError(err->{...}) | 当出错时 | |
Completable::doOnErrorResume(err->Completable) | 当出错时,恢复为一个新流 | v3.7.2 后支持 |
Completable::doOnComplete(()->{...}) | 当完成时 | |
Completable.complete() | 构建完成发布者 | |
Completable.error(cause) | 构建异常发布者 | |
Completable.create(emitter->{...}) | 构建发射器发布者 | |
Completable.then(()->Completable) | 当完成后(然后),下一个新流 | |
Completable.then(Completable) | 当完成后(然后),下一个新流 | |
Completable.subscribeOn(executor) | 订阅于 | v3.7.2 后支持 |
Completable.delay(delay, unit) | 订阅延时 | v3.7.2 后支持 |
1、作为返回类型(主要用于过滤器)
@FunctionalInterface
public interface ExFilter {
/**
* 过滤
*
* @param ctx 交换上下文
* @param chain 过滤链
*/
Completable doFilter(ExContext ctx, ExFilterChain chain);
}
2、构建返回对象(即,发布者)
@Component
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
@Override
public Completable doFilter(ExContext ctx, ExFilterChain chain) {
String token = ctx.rawHeader("TOKEN");
if (token == null) {
ctx.newResponse().status(401);
return Completable.complete();
}
return chain.doFilter(ctx);
}
}
3、主要事件应用示例
- doOnError 事件应用
当出错时,记录异常日志。//事件,还会传递给后续的观察者。
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
@Override
public Completable doFilter(ExContext ctx, ExFilterChain chain) {
return chain.doFilter(ctx).doOnError(e -> {
log.error("{}", e);
});
}
}
- doOnErrorResume 事件应用
当出错时,调整输出状态。//以新的流,替代旧的流(之前的 OnError 事件,不再传递)
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
@Override
public Completable doFilter(ExContext ctx, ExFilterChain chain) {
return chain.doFilter(ctx).doOnErrorResume(e -> {
if (e instanceof StatusException) {
StatusException se = (StatusException) e;
ctx.newResponse().status(se.getCode());
} else {
ctx.newResponse().status(500);
}
return Completable.complete();
});
}
}
- doOnComplete 事件应用
调整响应头和响应体。//事件,还会传递给后续的观察者。
//同步修改
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
@Override
public Completable doFilter(ExContext ctx, ExFilterChain chain) {
return chain.doFilter(ctx).doOnComplete(e -> {
ctx.newResponse().headerAdd("X-TraceId", "xxx");
ctx.newResponse().body(Buffer.buffer("no!"));
});
}
}
//异步修改,可以再用 `Completable.create` 嵌套下
@Component(index = -99)
public class CloudGatewayFilterImpl implements CloudGatewayFilter {
@Override
public Completable doFilter(ExContext ctx, ExFilterChain chain) {
return Completable.create(emitter -> {
chain.doFilter(ctx)
.doOnComplete(() -> {
ExBody exBody = ctx.newResponse().getBody();
if (exBody instanceof ExBodyOfStream) {
ExBodyOfStream streamBody = (ExBodyOfStream) exBody;
((HttpClientResponse) streamBody.getStream()).body().andThen(bodyAr -> {
if (bodyAr.succeeded()) {
// 获取响应体内容
String content = bodyAr.result().toString();
ctx.newResponse().header("MD5", Utils.md5(content));
ctx.newResponse().body(Buffer.buffer(content + "#demo"));
emitter.onComplete();
} else {
emitter.onError(bodyAr.cause());
}
});
}
})
.doOnError(err -> {
emitter.onError(err);
})
.subscribe();
});
}
}