```xml
<dependency>
    <groupId>org.noear</groupId>
    <artifactId>pulsar2-solon-plugin</artifactId>
</dependency>
```


#### 1. Basic usage

* Create a Java Bean for the message body

```java
public class MyMsg {
    private String data;
    
    public MyMsg() {}
    public MyMsg(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}
```

* Java Config: configure producers


```java
@Configuration
public class Config {

    @Bean
    public ProducerFactory producerFactory() {
        return new ProducerFactory()
            .addProducer("my-topic", MyMsg.class)
            .addProducer("other-topic", String.class);
    }
}

```
The plugin registers a `PulsarTemplate` bean by default, so you can obtain the instance directly with the `@Inject` annotation:

```java
@Component
class MyProducer {

	@Inject
	private PulsarTemplate producer;

	void sendHelloWorld() throws PulsarClientException {
		producer.send("my-topic", new MyMsg("Hello world!"));
	}
}

```

* Java Config: configure a consumer

The `@PulsarConsumer` annotation can only be placed on methods.

```java
@Component
class MyConsumer {
    
    @PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
    void consume(MyMsg msg) {
        // TODO process your message
        System.out.println(msg.getData());
    }
}
```

* Java Config: configure a batch consumer

Simply set the `batch` attribute of the `@PulsarConsumer` annotation to `true`.

```java
@Component
class MyBatchConsumer {
    @PulsarConsumer(topic = "my-topic",
            clazz=MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true)
    public void consumeString(Messages<MyMsg> msgs) {
        msgs.forEach((msg) -> {
            System.out.println(msg);
        });
    }
}
```

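* Java Config: batch consumer that acknowledges the messages returned by the method

Set the `batch` attribute of the `@PulsarConsumer` annotation to `true` and return the IDs of the messages to acknowledge, as in the example.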

```java
@Component
class MyBatchConsumer {
    @PulsarConsumer(topic = "my-topic",
            clazz=MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true)
    public List<MessageId> consumeString(Messages<MyMsg> msgs) {
        // Collect the IDs of the messages that should be acknowledged
        List<MessageId> ackList = new ArrayList<>();
        msgs.forEach((msg) -> {
            System.out.println(msg);
            ackList.add(msg.getMessageId());
        });
        return ackList;
    }
}
```


* Java Config: batch consumer with manual acknowledgement

Set the `batch` attribute of the `@PulsarConsumer` annotation to `true` and the `batchAckMode` attribute to `BatchAckMode.MANUAL`.

```java
@Component
class MyBatchConsumer {
    
    @PulsarConsumer(topic = "my-topic",
            clazz=MyMsg.class,
            consumerName = "my-consumer",
            subscriptionName = "my-subscription",
            batch = true,
            batchAckMode = BatchAckMode.MANUAL)
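    public void consumeString(Messages<MyMsg> msgs, Consumer<MyMsg> consumer)
            throws PulsarClientException {
        List<MessageId> ackList = new ArrayList<>();
        msgs.forEach((msg) -> {
            try {
                System.out.println(msg);
                ackList.add(msg.getMessageId());
            } catch (Exception ex) {
                System.err.println(ex.getMessage());
                // Ask the broker to redeliver this message
                consumer.negativeAcknowledge(msg);
            }
        });
        // Acknowledge all successfully processed messages in one call;
        // acknowledge(List<MessageId>) throws the checked PulsarClientException
        consumer.acknowledge(ackList);
    }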
}
```


* Minimal configuration

```properties

solon.pulsar2.service-url=pulsar://localhost:6650

```

* Configuration reference

Default configuration:

```properties
#PulsarClient
solon.pulsar2.service-url=pulsar://localhost:6650
solon.pulsar2.io-threads=10
solon.pulsar2.listener-threads=10
solon.pulsar2.enable-tcp-no-delay=false
solon.pulsar2.keep-alive-interval-sec=20
solon.pulsar2.connection-timeout-sec=10
solon.pulsar2.operation-timeout-sec=15
solon.pulsar2.starting-backoff-interval-ms=100
solon.pulsar2.max-backoff-interval-sec=10
solon.pulsar2.consumer-name-delimiter=
solon.pulsar2.namespace=default
solon.pulsar2.tenant=public
solon.pulsar2.auto-start=true
solon.pulsar2.allow-interceptor=false

#Consumer
solon.pulsar2.consumer.default.dead-letter-policy-max-redeliver-count=-1
solon.pulsar2.consumer.default.ack-timeout-ms=3000
```
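
The `tenant` and `namespace` settings matter because Pulsar identifies a topic by a fully qualified name of the form `persistent://tenant/namespace/topic`. The sketch below shows how a short name such as `my-topic` would expand under the defaults listed above. It assumes the plugin prefixes short topic names this way; `TopicNames` and `qualify` are hypothetical helpers written for illustration and are not part of the plugin.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of pulsar2-solon-plugin. */
final class TopicNames {

    private TopicNames() {}

    /**
     * Expands a short topic name to Pulsar's fully qualified form,
     * persistent://tenant/namespace/topic. Names that already contain
     * a scheme are returned unchanged.
     */
    static String qualify(Properties props, String topic) {
        if (topic.contains("://")) {
            return topic;
        }
        // Fall back to the defaults from the configuration reference.
        String tenant = props.getProperty("solon.pulsar2.tenant", "public");
        String namespace = props.getProperty("solon.pulsar2.namespace", "default");
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // With no overrides this prints persistent://public/default/my-topic
        System.out.println(qualify(props, "my-topic"));

        props.setProperty("solon.pulsar2.tenant", "acme");
        // With the tenant overridden this prints persistent://acme/default/my-topic
        System.out.println(qualify(props, "my-topic"));
    }
}
```

If a topic is already given in fully qualified form, the sketch leaves it untouched, so both styles can be mixed.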

TLS connection configuration:

```properties
solon.pulsar2.service-url=pulsar+ssl://localhost:6651
solon.pulsar2.tlsTrustCertsFilePath=/etc/pulsar/tls/ca.crt
solon.pulsar2.tlsCiphers=TLS_DH_RSA_WITH_AES_256_GCM_SHA384,TLS_DH_RSA_WITH_AES_256_CBC_SHA
solon.pulsar2.tlsProtocols=TLSv1.3,TLSv1.2
solon.pulsar2.allowTlsInsecureConnection=false
solon.pulsar2.enableTlsHostnameVerification=false

solon.pulsar2.tlsTrustStorePassword=brokerpw
solon.pulsar2.tlsTrustStorePath=/var/private/tls/broker.truststore.jks
solon.pulsar2.tlsTrustStoreType=JKS

solon.pulsar2.useKeyStoreTls=false
```
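
Note that the TLS example switches the service URL scheme from `pulsar://` to `pulsar+ssl://`. As a minimal sketch, the check below reads the scheme with `java.net.URI` and reports whether it implies TLS, which can catch a TLS trust configuration paired with a plain-text URL. `ServiceUrls` and `usesTls` are hypothetical names, and treating `https` as a TLS scheme as well is an assumption about how you might point the client at an HTTP lookup endpoint.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not part of pulsar2-solon-plugin. */
final class ServiceUrls {

    private ServiceUrls() {}

    /** Returns true when the scheme of the service URL implies a TLS connection. */
    static boolean usesTls(String serviceUrl) {
        // URI.create throws IllegalArgumentException for a malformed URL.
        String scheme = URI.create(serviceUrl).getScheme();
        return "pulsar+ssl".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        System.out.println(usesTls("pulsar://localhost:6650"));     // plain-text binary protocol
        System.out.println(usesTls("pulsar+ssl://localhost:6651")); // TLS binary protocol
    }
}
```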

Pulsar client authentication (only one of the options can be used):

```properties
# TLS
solon.pulsar2.tls-auth-cert-file-path=/etc/pulsar/tls/cert.cert.pem
solon.pulsar2.tls-auth-key-file-path=/etc/pulsar/tls/key.key-pk8.pem

#Token based
solon.pulsar2.token-auth-value=43th4398gh340gf34gj349gh304ghryj34fh

#OAuth2 based
solon.pulsar2.oauth2-issuer-url=https://accounts.google.com
solon.pulsar2.oauth2-credentials-url=file:/path/to/file
solon.pulsar2.oauth2-audience=https://broker.example.com
```
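
Because only one authentication option may be configured at a time, it can be useful to fail fast when a properties file mixes several. The following is a minimal sketch using only `java.util.Properties`. `AuthConfigCheck` is a hypothetical helper, and grouping the keys by the prefixes `tls-auth-`, `token-auth-` and `oauth2-` is an assumption derived from the property names listed above, not the plugin's own validation logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper for illustration; not part of pulsar2-solon-plugin. */
final class AuthConfigCheck {

    private static final String PREFIX = "solon.pulsar2.";
    // One key prefix per authentication option, taken from the listing above.
    private static final String[] AUTH_PREFIXES = {"tls-auth-", "token-auth-", "oauth2-"};

    private AuthConfigCheck() {}

    /** Returns the prefixes of all authentication options with a non-empty value. */
    static List<String> configuredAuthOptions(Properties props) {
        List<String> found = new ArrayList<>();
        for (String authPrefix : AUTH_PREFIXES) {
            for (String key : props.stringPropertyNames()) {
                boolean matches = key.startsWith(PREFIX + authPrefix);
                if (matches && !props.getProperty(key).trim().isEmpty()) {
                    found.add(authPrefix);
                    break; // count each option once
                }
            }
        }
        return found;
    }

    /** Throws IllegalStateException when more than one option is configured. */
    static void requireAtMostOne(Properties props) {
        List<String> found = configuredAuthOptions(props);
        if (found.size() > 1) {
            throw new IllegalStateException(
                "Multiple authentication options configured: " + found);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("solon.pulsar2.token-auth-value", "my-token");
        requireAtMostOne(props); // passes: a single option is set
        System.out.println(configuredAuthOptions(props));
    }
}
```

Calling `requireAtMostOne` once at startup, before the client is built, keeps a misconfiguration from surfacing later as an opaque connection error.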

#### 2. Advanced usage

* Reactive (Reactor) support, work in progress


```java
@Configuration
public class MyFluxConsumers {

    @Bean
    public FluxConsumer myFluxConsumer(FluxConsumerFactory fluxConsumerFactory) {
        return fluxConsumerFactory.newConsumer(
            PulsarFluxConsumer.builder()
                .setTopic("flux-topic")
                .setConsumerName("flux-consumer")
                .setSubscriptionName("flux-subscription")
                .setMessageClass(MyMsg.class)
                .build());
    }
}
```

```java
@Component
public class MyFluxConsumerService {
    
    @Inject
    private FluxConsumer myFluxConsumer;

    public void subscribe() {
        myFluxConsumer
            .asSimpleFlux()
            .subscribe(msg -> System.out.println(msg.getData()));
    }
}
```

* (Optional) If you want to acknowledge messages manually, configure your consumer differently.

```java
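@Bean
public FluxConsumer myFluxConsumer(FluxConsumerFactory fluxConsumerFactory) {
    return fluxConsumerFactory.newConsumer(
        PulsarFluxConsumer.builder()
            .setTopic("flux-topic")
            .setConsumerName("flux-consumer")
            .setSubscriptionName("flux-subscription")
            .setMessageClass(MyMsg.class)
            .setSimple(false) // the only change needed in the bean configuration class
            .build());
}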
```

```java
@Component
public class MyFluxConsumerService {
    
    @Inject
    private FluxConsumer myFluxConsumer;

    public void subscribe() {
        myFluxConsumer.asFlux()
            .subscribe(msg -> {
                try {
                    final MyMsg myMsg = (MyMsg) msg.getMessage().getValue();

                    System.out.println(myMsg.getData());

                    // you need to acknowledge the message manually on finished job
                    msg.getConsumer().acknowledge(msg.getMessage());
                } catch (PulsarClientException e) {
                    // you need to negatively acknowledge the message manually on failures
                    msg.getConsumer().negativeAcknowledge(msg.getMessage());
                }
            });
    }
}
```

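#### 3. Debug mode

Debug output is written to the log at `DEBUG` level. Interceptors are disabled in the default configuration (`solon.pulsar2.allow-interceptor=false`); enable them with: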

```properties
solon.pulsar2.allow-interceptor=true
```

An instance of `DefaultConsumerInterceptor` is injected by default. Alternatively, you can define your own interceptors:

*Consumer interceptor example:*
```java
@Component
public class PulsarConsumerInterceptor extends DefaultConsumerInterceptor<Object> {
    @Override
    public Message beforeConsume(Consumer<Object> consumer, Message message) {
        System.out.println("do something");
        return super.beforeConsume(consumer, message);
    }
}
```

*Producer interceptor example:*

```java
@Component
public class PulsarProducerInterceptor extends DefaultProducerInterceptor {

    @Override
    public Message beforeSend(Producer producer, Message message) {
        super.beforeSend(producer, message);
        System.out.println("do something");
        return message;
    }

    @Override
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
        super.onSendAcknowledgement(producer, message, msgId, exception);
    }
}
```
