Solon v3.0.4.1

pulsar2-solon-plugin [试用]

</> markdown
<!-- Maven coordinates of the plugin. No <version> element is given in this snippet;
     presumably the version is managed by a Solon parent/BOM - confirm for your build. -->
<dependency>
    <groupId>org.noear</groupId>
    <artifactId>pulsar2-solon-plugin</artifactId>
</dependency>

1、基础使用参考

  • 创建相应的消息体Java Bean
/**
 * Simple message body bean used by the producer and consumer examples.
 *
 * <p>The payload is set through the constructor and read through
 * {@link #getData()}; the class exposes no setter.
 */
public class MyMsg {

    /** Text payload; {@code null} when the no-arg constructor is used. */
    private String data;

    /** Creates a message without a payload. */
    public MyMsg() {
        this(null);
    }

    /**
     * Creates a message carrying the given payload.
     *
     * @param data the text payload (stored as-is, may be {@code null})
     */
    public MyMsg(String data) {
        this.data = data;
    }

    /**
     * @return the payload given at construction time, or {@code null} if none
     */
    public String getData() {
        return this.data;
    }
}
  • Java Config 配置生产者
/**
 * Java-config class that declares the producers used by the examples.
 */
@Configuration
public class Config {

    /**
     * Builds the producer factory bean with two producers added:
     * {@code "my-topic"} for {@code MyMsg} payloads and
     * {@code "other-topic"} for {@code String} payloads.
     *
     * @return the configured factory
     */
    @Bean
    public ProducerFactory producerFactory() {
        final ProducerFactory factory = new ProducerFactory()
                .addProducer("my-topic", MyMsg.class)
                .addProducer("other-topic", String.class);
        return factory;
    }
}

该插件已默认注入 PulsarTemplate Bean,可以直接通过 @Inject 注解获取 PulsarTemplate 实例。

@Component
class MyProducer {

    /** Template injected by the container and used to publish messages. */
    @Inject
    private PulsarTemplate producer;

    /**
     * Sends a {@code MyMsg} with the text "Hello world!" to {@code "my-topic"}.
     *
     * @throws PulsarClientException propagated from {@code producer.send}
     */
    void sendHelloWorld() throws PulsarClientException {
        final MyMsg greeting = new MyMsg("Hello world!");
        producer.send("my-topic", greeting);
    }
}

  • Java Config 配置消费者

    @PulsarConsumer 注解只能添加在方法上

@Component
class MyConsumer {

    /**
     * Consumer method for {@code "my-topic"}; prints the payload of the
     * received message to standard output.
     *
     * @param msg the received message body
     */
    @PulsarConsumer(topic = "my-topic", clazz = MyMsg.class)
    void consume(MyMsg msg) {
        // TODO process your message
        final String payload = msg.getData();
        System.out.println(payload);
    }
}
  • Java Config 批量配置消费者

只需将 @PulsarConsumer 注解的 batch 属性设置为 true。

@Component
class MyBatchConsumer {

    /**
     * Batch consumer for {@code "my-topic"} ({@code batch = true}); prints
     * every message of the received batch to standard output.
     *
     * @param msgs the batch of received messages
     */
    @PulsarConsumer(
            topic = "my-topic",
            clazz = MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true)
    public void consumeString(Messages<MyMsg> msgs) {
        msgs.forEach(System.out::println);
    }
}
  • Java Config 配置批量消费:通过方法返回值确认消息

将 @PulsarConsumer 注解的 batch 属性设置为 true,并让方法返回需要确认的消息 ID 列表(List<MessageId>)。

@Component
class MyBatchConsumer {

    /**
     * Batch consumer for {@code "my-topic"} that reports which messages it
     * handled by returning their ids.
     *
     * @param msgs the batch of received messages
     * @return the ids of all messages in the batch, in iteration order
     */
    @PulsarConsumer(
            topic = "my-topic",
            clazz = MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true)
    public List<MessageId> consumeString(Messages<MyMsg> msgs) {
        final List<MessageId> processedIds = new ArrayList<>();
        msgs.forEach(received -> {
            System.out.println(received);
            // Collect the id so it is part of the returned list.
            processedIds.add(received.getMessageId());
        });
        return processedIds;
    }
}
  • Java Config 配置批量消费:手动确认消息

将 @PulsarConsumer 注解的 batch 属性设置为 true,并将 batchAckMode 属性设置为 BatchAckMode.MANUAL。

@Component
class MyBatchConsumer {

    /**
     * Batch consumer for {@code "my-topic"} with manual acknowledgement
     * ({@code batchAckMode = BatchAckMode.MANUAL}).
     *
     * <p>Each message is printed; messages whose processing throws are
     * negatively acknowledged, all others are acknowledged together once the
     * batch has been iterated.
     *
     * @param msgs     the batch of received messages
     * @param consumer the consumer used to (negatively) acknowledge messages
     * @throws PulsarClientException if acknowledging the processed ids fails;
     *         {@code Consumer.acknowledge(List)} declares this checked
     *         exception, so it must be declared (or caught) for the example
     *         to compile
     */
    @PulsarConsumer(topic = "my-topic",
            clazz = MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true,
            batchAckMode = BatchAckMode.MANUAL)
    public void consumeString(Messages<MyMsg> msgs, Consumer<MyMsg> consumer)
            throws PulsarClientException {
        List<MessageId> ackList = new ArrayList<>();
        msgs.forEach((msg) -> {
            try {
                System.out.println(msg);
                // Only reached when processing succeeded.
                ackList.add(msg.getMessageId());
            } catch (Exception ex) {
                System.err.println(ex.getMessage());
                // Negatively acknowledge the message whose processing failed.
                consumer.negativeAcknowledge(msg);
            }
        });
        consumer.acknowledge(ackList);
    }
}
  • 最小化配置

solon.pulsar2.service-url=pulsar://localhost:6650

  • 配置参考

Default configuration:

#PulsarClient
solon.pulsar2.service-url=pulsar://localhost:6650
solon.pulsar2.io-threads=10
solon.pulsar2.listener-threads=10
solon.pulsar2.enable-tcp-no-delay=false
solon.pulsar2.keep-alive-interval-sec=20
solon.pulsar2.connection-timeout-sec=10
solon.pulsar2.operation-timeout-sec=15
solon.pulsar2.starting-backoff-interval-ms=100
solon.pulsar2.max-backoff-interval-sec=10
solon.pulsar2.consumer-name-delimiter=
solon.pulsar2.namespace=default
solon.pulsar2.tenant=public
solon.pulsar2.auto-start=true
solon.pulsar2.allow-interceptor=false

#Consumer
solon.pulsar2.consumer.default.dead-letter-policy-max-redeliver-count=-1
solon.pulsar2.consumer.default.ack-timeout-ms=3000

TLS connection configuration:

solon.pulsar2.service-url=pulsar+ssl://localhost:6651
solon.pulsar2.tlsTrustCertsFilePath=/etc/pulsar/tls/ca.crt
solon.pulsar2.tlsCiphers=TLS_DH_RSA_WITH_AES_256_GCM_SHA384,TLS_DH_RSA_WITH_AES_256_CBC_SHA
solon.pulsar2.tlsProtocols=TLSv1.3,TLSv1.2
solon.pulsar2.allowTlsInsecureConnection=false
solon.pulsar2.enableTlsHostnameVerification=false

solon.pulsar2.tlsTrustStorePassword=brokerpw
solon.pulsar2.tlsTrustStorePath=/var/private/tls/broker.truststore.jks
solon.pulsar2.tlsTrustStoreType=JKS

solon.pulsar2.useKeyStoreTls=false

Pulsar client authentication (Only one of the options can be used)

# TLS
solon.pulsar2.tls-auth-cert-file-path=/etc/pulsar/tls/cert.cert.pem
solon.pulsar2.tls-auth-key-file-path=/etc/pulsar/tls/key.key-pk8.pem

#Token based
solon.pulsar2.token-auth-value=43th4398gh340gf34gj349gh304ghryj34fh

#OAuth2 based
solon.pulsar2.oauth2-issuer-url=https://accounts.google.com
solon.pulsar2.oauth2-credentials-url=file:/path/to/file
solon.pulsar2.oauth2-audience=https://broker.example.com

2、进阶用法

  • 响应式(Reactor)支持,待完善
/**
 * Java-config class that declares the Reactor-based consumer bean.
 */
@Configuration
public class MyFluxConsumers {

    /**
     * Creates the flux consumer for {@code "flux-topic"} with consumer name
     * {@code "flux-consumer"}, subscription {@code "flux-subscription"} and
     * message class {@code MyMsg}.
     *
     * @param fluxConsumerFactory factory used to create the consumer
     * @return the consumer returned by {@code fluxConsumerFactory.newConsumer}
     */
    @Bean
    public FluxConsumer myFluxConsumer(FluxConsumerFactory fluxConsumerFactory) {
        final FluxConsumer consumer = fluxConsumerFactory.newConsumer(
                PulsarFluxConsumer.builder()
                        .setTopic("flux-topic")
                        .setConsumerName("flux-consumer")
                        .setSubscriptionName("flux-subscription")
                        .setMessageClass(MyMsg.class)
                        .build());
        return consumer;
    }
}
@Component
public class MyFluxConsumerService {

    /** Flux consumer bean injected by the container. */
    @Inject
    private FluxConsumer myFluxConsumer;

    /**
     * Subscribes to the consumer's simple flux and prints the payload of
     * every emitted message to standard output.
     */
    public void subscribe() {
        myFluxConsumer
                .asSimpleFlux()
                .subscribe(msg -> {
                    System.out.println(msg.getData());
                });
    }
}
  • (可选) 如果您希望手动确认消息,则可以以不同的方式配置您的消费者.
// Fragment only: a builder expression meant to replace the one in the bean configuration.
// NOTE(review): presumably it is the argument of fluxConsumerFactory.newConsumer(...),
// which would explain the extra ')' on the last line - confirm.
PulsarFluxConsumer.builder()
    .setTopic("flux-topic")
    .setConsumerName("flux-consumer")
    .setSubscriptionName("flux-subscription")
    .setMessageClass(MyMsg.class)
    .setSimple(false) // This is your required change in bean configuration class
    .build());
@Component
public class MyFluxConsumerService {

    /** Flux consumer bean injected by the container. */
    @Inject
    private FluxConsumer myFluxConsumer;

    /**
     * Subscribes to the consumer's flux and acknowledges each message by hand:
     * a message is acknowledged after its payload has been printed, and
     * negatively acknowledged when a {@code PulsarClientException} is thrown.
     */
    public void subscribe() {
        myFluxConsumer.asFlux()
                .subscribe(msg -> {
                    try {
                        final MyMsg payload = (MyMsg) msg.getMessage().getValue();

                        System.out.println(payload.getData());

                        // Work is done: acknowledge the message manually.
                        msg.getConsumer().acknowledge(msg.getMessage());
                    } catch (PulsarClientException e) {
                        // Failure path: negatively acknowledge the message manually.
                        msg.getConsumer().negativeAcknowledge(msg.getMessage());
                    }
                });
    }
}

3、调试模式

启用下面的配置后,默认的拦截器会在日志中输出 DEBUG 级别的信息:

solon.pulsar2.allow-interceptor=true

默认注入 DefaultConsumerInterceptor 的实例,也可以自定义:

消费者Consumer Interceptor Example:

/**
 * Example of a custom consumer interceptor built on top of
 * {@code DefaultConsumerInterceptor}.
 */
@Component
public class PulsarConsumerInterceptor extends DefaultConsumerInterceptor<Object> {

    /**
     * Prints "do something" and then delegates to the parent implementation.
     *
     * @param consumer the consumer passed in by the caller
     * @param message  the message passed in by the caller
     * @return whatever the parent {@code beforeConsume} returns
     */
    @Override
    public Message beforeConsume(Consumer<Object> consumer, Message message) {
        System.out.println("do something");
        final Message intercepted = super.beforeConsume(consumer, message);
        return intercepted;
    }
}

生产者Producer Interceptor Example:

/**
 * Example of a custom producer interceptor built on top of
 * {@code DefaultProducerInterceptor}.
 */
@Component
public class PulsarProducerInterceptor extends DefaultProducerInterceptor {

    /**
     * Runs the parent hook first (its return value is not used), prints
     * "do something", and returns the message that was passed in.
     *
     * @param producer the producer passed in by the caller
     * @param message  the message passed in by the caller
     * @return the same {@code message} instance that was passed in
     */
    @Override
    public Message beforeSend(Producer producer, Message message) {
        super.beforeSend(producer, message);
        System.out.println("do something");
        return message;
    }

    /**
     * Delegates unchanged to the parent implementation.
     */
    @Override
    public void onSendAcknowledgement(
            Producer producer, Message message, MessageId msgId, Throwable exception) {
        super.onSendAcknowledgement(producer, message, msgId, exception);
    }
}