dami2 - bus 之流事件(stream)
1、发送(流事件)与监听(流事件)
public class DemoApp {
static String topic = "demo.hello";
public static void main(String[] args) {
//监听流事件
Dami.bus().<String, String>listen(topic, (event, att, data, sink) -> {
System.err.println(data);
sink.onNext("hi");
sink.onComplete();
});
//发送流事件
Flux.from(Dami.bus().<String, String>stream(topic, "hello")).doOnNext(item -> {
System.err.println(item);
}).subscribe();
}
}
提醒:事件监听的参数差别(开发工具通过参数个数,推断为不同的类型):
| 事件 | 监听 lambda 参数 | 对应监听器接口 |
|---|---|---|
| send | (event)->{} | EventListener |
| call | (event, data, sink)->{} | CallEventListener |
| stream | (event, att, data, sink)->{} | StreamEventListener |
2、流事件的内部处理
流事件的内部同样是“通用事件”(仅是一种体验简化)。改成“通用事件”如下:
public class DemoApp {
static String topic = "demo.hello";
public static void main(String[] args) {
//监听流事件
Dami.bus().<StreamPayload<String, ? super String>>listen(topic, event -> {
System.err.println(event.getPayload().getData());
event.getPayload().getSink().onNext("hi");
event.getPayload().getSink().onComplete();
});
//发送流事件
Flux.<String>from(subscriber -> {
Dami.bus().<StreamPayload<String, ? super String>>send(topic, new StreamPayload<>("hello", subscriber));
}).doOnNext(item -> {
System.err.println(item);
}).subscribe();
}
}
如果要用通用事件监听所有事件(通过 payload 类型进行识别,并对应处理):
public class UniEventListener implements EventListener<Object> {
@Override
public void onEvent(Event<Object> event) throws Throwable {
if (event.getPayload() instanceof CallPayload) {
//is call
} else if (event.getPayload() instanceof StreamPayload) {
//is stream
System.err.println(event.<StreamPayload<String,String>>getPayloadAs().getData());
event.<StreamPayload<String,String>>getPayloadAs().getSink().onNext("hi");
event.<StreamPayload<String,String>>getPayloadAs().getSink().onComplete();
} else {
//is send
}
}
}
//或者 lambda 方式:
public class DemoApp {
static String topic = "demo.hello";
public static void main(String[] args) {
Dami.bus().listen(topic, event -> {
if (event.getPayload() instanceof CallPayload) {
//is call
} else if (event.getPayload() instanceof StreamPayload) {
//is stream
System.err.println(event.<StreamPayload<String,String>>getPayloadAs().getData());
event.<StreamPayload<String,String>>getPayloadAs().getSink().onNext("hi");
event.<StreamPayload<String,String>>getPayloadAs().getSink().onComplete();
} else {
//is send
}
});
}
}