Solon v3.7.2

dami2 - bus 之流事件(stream)

</> markdown

1、发送(流事件)与监听(流事件)

public class DemoApp {
    static String topic = "demo.hello";

    public static void main(String[] args) {
        //监听流事件
        Dami.bus().<String, String>listen(topic, (event, att, data, sink) -> {
            System.err.println(data);
            sink.onNext("hi");
            sink.onComplete();
        });

        //发送流事件
        Flux.from(Dami.bus().<String, String>stream(topic, "hello")).doOnNext(item -> {
            System.err.println(item);
        }).subscribe();
    }
}

提醒:事件监听的参数差别(开发工具通过参数个数,推断为不同的类型):

事件监听 lambda 参数对应监听器接口
send(event)->{}EventListener
call(event, data, sink)->{}CallEventListener
stream(event, att, data, sink)->{}StreamEventListener

2、流事件的内部处理

流事件的内部同样是“通用事件”(仅是一种体验简化)。改成“通用事件”如下:

public class DemoApp {
    static String topic = "demo.hello";

    public static void main(String[] args) {
        //监听流事件
        Dami.bus().<StreamPayload<String, ? super String>>listen(topic, event -> {
            System.err.println(event.getPayload().getData());
            event.getPayload().getSink().onNext("hi");
            event.getPayload().getSink().onComplete();
        });

        //发送流事件
        Flux.<String>from(subscriber -> {
            Dami.bus().<StreamPayload<String, ? super String>>send(topic, new StreamPayload<>("hello", subscriber));
        }).doOnNext(item -> {
            System.err.println(item);
        }).subscribe();
    }
}

如果要用通用事件监听所有事件(通过 payload 类型进行识别,并对应处理):

public class UniEventListener implements EventListener<Object> {
    @Override
    public void onEvent(Event<Object> event) throws Throwable {
        if (event.getPayload() instanceof CallPayload) {
            //is call
        } else if (event.getPayload() instanceof StreamPayload) {
            //is stream
            System.err.println(event.<StreamPayload<String,String>>getPayloadAs().getData());
            event.<StreamPayload<String,String>>getPayloadAs().getSink().onNext("hi");
            event.<StreamPayload<String,String>>getPayloadAs().getSink().onComplete();
        } else {
            //is send
        }
    }
}

//或者 lambda 方式:
public class DemoApp {
    static String topic = "demo.hello";
    
    public static void main(String[] args) {
        Dami.bus().listen(topic, event -> {
            if (event.getPayload() instanceof CallPayload) {
                //is call
            } else if (event.getPayload() instanceof StreamPayload) {
                //is stream
                System.err.println(event.<StreamPayload<String,String>>getPayloadAs().getData());
                event.<StreamPayload<String,String>>getPayloadAs().getSink().onNext("hi");
                event.<StreamPayload<String,String>>getPayloadAs().getSink().onComplete();
            } else {
                //is send
            }
        });
    }
}