pulsar2-solon-plugin [试用]
<dependency>
<groupId>org.noear</groupId>
<artifactId>pulsar2-solon-plugin</artifactId>
</dependency>
1、基础使用参考
- 创建相应的消息体Java Bean
/**
 * Message payload used by the producer and consumer examples.
 */
public class MyMsg {

    /** Text carried by the message; remains {@code null} when the no-arg constructor is used. */
    private String data;

    /** Creates an empty message, leaving {@code data} unset. */
    public MyMsg() {
    }

    /**
     * Creates a message carrying the given text.
     *
     * @param data the text to carry
     */
    public MyMsg(String data) {
        this.data = data;
    }

    /**
     * @return the text carried by this message
     */
    public String getData() {
        return this.data;
    }
}
- Java Config 配置生产者
/**
 * Declares which topics can be published to, and the payload class used for each.
 */
@Configuration
public class Config {

    /**
     * Builds the producer factory with one producer per topic:
     * "my-topic" carries {@code MyMsg}, "other-topic" carries {@code String}.
     *
     * @return the configured producer factory
     */
    @Bean
    public ProducerFactory producerFactory() {
        final ProducerFactory factory = new ProducerFactory()
                .addProducer("my-topic", MyMsg.class)
                .addProducer("other-topic", String.class);
        return factory;
    }
}
该插件已默认注册 `PulsarTemplate` Bean,可以直接通过 `@Inject` 注解获取 `PulsarTemplate` 实例:
/**
 * Example component that publishes a message through the injected template.
 */
@Component
class MyProducer {

    /** Template used to send messages. */
    @Inject
    private PulsarTemplate producer;

    /**
     * Sends a {@code MyMsg} carrying "Hello world!" to "my-topic".
     *
     * @throws PulsarClientException propagated from {@code producer.send}
     */
    void sendHelloWorld() throws PulsarClientException {
        final MyMsg greeting = new MyMsg("Hello world!");
        producer.send("my-topic", greeting);
    }
}
- Java Config 配置消费者
`@PulsarConsumer` 注解只能添加在方法上。
/**
 * Example component with a single-message consumer method.
 */
@Component
class MyConsumer {

    /**
     * Handles one message from "my-topic" and prints its text.
     *
     * @param msg the received payload
     */
    @PulsarConsumer(topic = "my-topic", clazz = MyMsg.class)
    void consume(MyMsg msg) {
        // TODO process your message
        final String text = msg.getData();
        System.out.println(text);
    }
}
- Java Config 配置批量消费者
只需将 `@PulsarConsumer` 注解的 `batch` 属性设置为 `true`。
/**
 * Example component with a batch consumer method.
 */
@Component
class MyBatchConsumer {

    /**
     * Handles a batch of messages from "my-topic" and prints each message object.
     *
     * @param msgs the received batch
     */
    @PulsarConsumer(
            topic = "my-topic",
            clazz = MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true)
    public void consumeString(Messages<MyMsg> msgs) {
        msgs.forEach(received -> System.out.println(received));
    }
}
- Java Config 配置批量消费,并通过返回值确认消息
将 `@PulsarConsumer` 注解的 `batch` 属性设置为 `true`,并让方法返回需要确认的 `MessageId` 列表。
/**
 * Example component with a batch consumer method that reports which messages it handled.
 */
@Component
class MyBatchConsumer {

    /**
     * Prints every message of the batch and collects its id.
     *
     * @param msgs the received batch
     * @return the ids of the messages that were handled; per the section heading these
     *         are the messages confirmed after the method returns
     */
    @PulsarConsumer(
            topic = "my-topic",
            clazz = MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true)
    public List<MessageId> consumeString(Messages<MyMsg> msgs) {
        final List<MessageId> handledIds = new ArrayList<>();
        msgs.forEach(received -> {
            System.out.println(received);
            handledIds.add(received.getMessageId());
        });
        return handledIds;
    }
}
- Java Config 配置批量消费后需手工确认的消息
将 `@PulsarConsumer` 注解的 `batch` 属性设置为 `true`,并将 `batchAckMode` 属性设置为 `BatchAckMode.MANUAL`。
/**
 * Example component with a batch consumer method that acknowledges messages manually.
 */
@Component
class MyBatchConsumer {

    /**
     * Prints every message of the batch, then acknowledges the handled ones in a single call.
     * A message whose handling throws is negatively acknowledged individually.
     *
     * @param msgs     the received batch
     * @param consumer the consumer used to acknowledge or negatively acknowledge messages
     */
    @PulsarConsumer(topic = "my-topic",
            clazz = MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true,
            batchAckMode = BatchAckMode.MANUAL)
    public void consumeString(Messages<MyMsg> msgs, Consumer<MyMsg> consumer) {
        List<MessageId> ackList = new ArrayList<>();
        msgs.forEach((msg) -> {
            try {
                System.out.println(msg);
                ackList.add(msg.getMessageId());
            } catch (Exception ex) {
                System.err.println(ex.getMessage());
                consumer.negativeAcknowledge(msg);
            }
        });
        try {
            consumer.acknowledge(ackList);
        } catch (PulsarClientException ex) {
            // acknowledge(...) throws the checked PulsarClientException, so it must be
            // handled here; on failure the ids are negatively acknowledged instead.
            System.err.println(ex.getMessage());
            ackList.forEach(consumer::negativeAcknowledge);
        }
    }
}
- 最小化配置
solon.pulsar2.service-url=pulsar://localhost:6650
- 配置参考
Default configuration:
#PulsarClient
solon.pulsar2.service-url=pulsar://localhost:6650
solon.pulsar2.io-threads=10
solon.pulsar2.listener-threads=10
solon.pulsar2.enable-tcp-no-delay=false
solon.pulsar2.keep-alive-interval-sec=20
solon.pulsar2.connection-timeout-sec=10
solon.pulsar2.operation-timeout-sec=15
solon.pulsar2.starting-backoff-interval-ms=100
solon.pulsar2.max-backoff-interval-sec=10
solon.pulsar2.consumer-name-delimiter=
solon.pulsar2.namespace=default
solon.pulsar2.tenant=public
solon.pulsar2.auto-start=true
solon.pulsar2.allow-interceptor=false
#Consumer
solon.pulsar2.consumer.default.dead-letter-policy-max-redeliver-count=-1
solon.pulsar2.consumer.default.ack-timeout-ms=3000
TLS connection configuration:
# TLS transport: the service URL uses the pulsar+ssl scheme.
solon.pulsar2.service-url=pulsar+ssl://localhost:6651
# NOTE(review): the TLS keys below are camelCase, while every other key in this document
# is kebab-case (e.g. service-url, tls-auth-cert-file-path) - confirm the binder accepts both.
solon.pulsar2.tlsTrustCertsFilePath=/etc/pulsar/tls/ca.crt
solon.pulsar2.tlsCiphers=TLS_DH_RSA_WITH_AES_256_GCM_SHA384,TLS_DH_RSA_WITH_AES_256_CBC_SHA
solon.pulsar2.tlsProtocols=TLSv1.3,TLSv1.2
solon.pulsar2.allowTlsInsecureConnection=false
# NOTE(review): hostname verification is disabled in this sample - presumably for local
# testing only; confirm before copying into a production configuration.
solon.pulsar2.enableTlsHostnameVerification=false
# Sample trust store values; replace the password and path with your own.
solon.pulsar2.tlsTrustStorePassword=brokerpw
solon.pulsar2.tlsTrustStorePath=/var/private/tls/broker.truststore.jks
solon.pulsar2.tlsTrustStoreType=JKS
solon.pulsar2.useKeyStoreTls=false
Pulsar client authentication (Only one of the options can be used)
# TLS
solon.pulsar2.tls-auth-cert-file-path=/etc/pulsar/tls/cert.cert.pem
solon.pulsar2.tls-auth-key-file-path=/etc/pulsar/tls/key.key-pk8.pem
#Token based
solon.pulsar2.token-auth-value=43th4398gh340gf34gj349gh304ghryj34fh
#OAuth2 based
solon.pulsar2.oauth2-issuer-url=https://accounts.google.com
solon.pulsar2.oauth2-credentials-url=file:/path/to/file
solon.pulsar2.oauth2-audience=https://broker.example.com
2、进阶用法
- 响应式(Reactor)支持,待完善
/**
 * Declares the reactive (flux) consumer used by the examples.
 */
@Configuration
public class MyFluxConsumers {

    /**
     * Creates a flux consumer for "flux-topic" that receives {@code MyMsg} payloads.
     *
     * @param fluxConsumerFactory factory used to create the consumer
     * @return the created flux consumer
     */
    @Bean
    public FluxConsumer myFluxConsumer(FluxConsumerFactory fluxConsumerFactory) {
        final FluxConsumer created = fluxConsumerFactory.newConsumer(
                PulsarFluxConsumer.builder()
                        .setTopic("flux-topic")
                        .setConsumerName("flux-consumer")
                        .setSubscriptionName("flux-subscription")
                        .setMessageClass(MyMsg.class)
                        .build());
        return created;
    }
}
/**
 * Example component that subscribes to the flux consumer and prints each payload.
 */
@Component
public class MyFluxConsumerService {

    // NOTE(review): the lambda below calls getData(), so the flux must emit MyMsg;
    // if FluxConsumer is generic, this field presumably needs a type argument - confirm.
    @Inject
    private FluxConsumer myFluxConsumer;

    /**
     * Subscribes to the simple flux and prints the text of every received message.
     */
    public void subscribe() {
        myFluxConsumer
                .asSimpleFlux()
                .subscribe(msg -> {
                    System.out.println(msg.getData());
                });
    }
}
- (可选) 如果您希望手动确认消息,则可以以不同的方式配置您的消费者.
PulsarFluxConsumer.builder()
.setTopic("flux-topic")
.setConsumerName("flux-consumer")
.setSubscriptionName("flux-subscription")
.setMessageClass(MyMsg.class)
.setSimple(false) // This is your required change in bean configuration class
.build());
/**
 * Example component that subscribes to the flux consumer and acknowledges messages manually.
 */
@Component
public class MyFluxConsumerService {

    @Inject
    private FluxConsumer myFluxConsumer;

    /**
     * Subscribes to the flux, prints the text of every received message and
     * acknowledges it; a failed acknowledgement is answered with a negative one.
     */
    public void subscribe() {
        myFluxConsumer.asFlux().subscribe(holder -> {
            try {
                final Object value = holder.getMessage().getValue();
                final MyMsg payload = (MyMsg) value;
                System.out.println(payload.getData());
                // you need to acknowledge the message manually on finished job
                holder.getConsumer().acknowledge(holder.getMessage());
            } catch (PulsarClientException e) {
                // you need to negatively acknowledge the message manually on failures
                holder.getConsumer().negativeAcknowledge(holder.getMessage());
            }
        });
    }
}
3、调试模式
默认以 `DEBUG` 级别输出到日志文件。
solon.pulsar2.allow-interceptor=true
默认注入 `DefaultConsumerInterceptor` 的实例,也可以自定义:
消费者 Consumer Interceptor 示例:
/**
 * Custom consumer interceptor that extends the default one.
 */
@Component
public class PulsarConsumerInterceptor extends DefaultConsumerInterceptor<Object> {

    /**
     * Prints a marker and then delegates to the default implementation.
     * All types are fully parameterized: mixing {@code Consumer<Object>} with a raw
     * {@code Message} gives a signature that does not override the generic super method.
     *
     * @param consumer the consumer receiving the message
     * @param message  the message about to be consumed
     * @return the message returned by the default implementation
     */
    @Override
    public Message<Object> beforeConsume(Consumer<Object> consumer, Message<Object> message) {
        System.out.println("do something");
        return super.beforeConsume(consumer, message);
    }
}
生产者 Producer Interceptor 示例:
/**
 * Custom producer interceptor that extends the default one.
 */
@Component
public class PulsarProducerInterceptor extends DefaultProducerInterceptor {

    /**
     * Delegates the acknowledgement callback to the default implementation unchanged.
     */
    @Override
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
        super.onSendAcknowledgement(producer, message, msgId, exception);
    }

    /**
     * Calls the default implementation, prints a marker and returns the message
     * that was passed in (the value returned by the default implementation is not used).
     *
     * @param producer the producer sending the message
     * @param message  the message about to be sent
     * @return the {@code message} argument
     */
    @Override
    public Message beforeSend(Producer producer, Message message) {
        super.beforeSend(producer, message);
        System.out.println("do something");
        return message;
    }
}