::org.apache.rocketmq
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq5.version}</version>
</dependency>
1、描述
原始状态的 rocketmq5 集成非常方便,也更适合定制。有些同学,可能对原始接口会比较陌生,会希望有个具体的示例。
完整的集成代码参考:
https://gitee.com/opensolon/solon-examples/tree/main/b.Integration/demoB002-rocketmq5
希望更加简化使用的同学,可以使用:
rocketmq5-solon-cloud-plugin (使用更简单,定制性弱些)
2、配置项示例
添加 yml 配置。并约定(也可按需定义):
- "solon.rocketmq",作为配置前缀
- "properties",作为公共配置
- "producer",作为生态者专属配置
- "consumer",作为消费者专属配置
具体的配置属性,参考自:ClientConfigurationBuilder,ProducerBuilder, PushConsumerBuilder
solon.app:
name: "demo-app"
group: "demo"
# 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考)
solon.rocketmq:
properties: #公共配置(配置项,参考:ClientConfigurationBuilder)
endpoints: "127.0.0.1:8081"
sessionCredentialsProvider:
"@type": "demoB002.SessionCredentialsProviderImpl" # solon 支持 "@type" 类型申明当前实例数据
accessKey: "xxx"
accessSecret: "xxx"
securityToken: "xxx"
requestTimeout: "10s"
producer: #生产者专属配置(配置项,参考:ProducerBuilder)
maxAttempts: 3
consumer: #消费者专属配置(配置项,参考:PushConsumerBuilder)
consumerGroup: "${solon.app.group}_${solon.app.name}"
consumptionThreadCount: 2
maxCacheMessageCount: 1
maxCacheMessageSizeInBytes: 1
添加 java 配置器
@Configuration
public class RocketmqConfig {
private ClientServiceProvider clientProvider = ClientServiceProvider.loadService();
@Bean
public ClientConfiguration client(@Inject("${solon.rocketmq.properties}") Properties common){
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder();
//注入属性
Utils.injectProperties(builder, common);
return builder.build();
}
@Bean
public Producer producer(@Inject("${solon.rocketmq.producer}") Properties producer,
ClientConfiguration clientConfiguration) throws ClientException {
ProducerBuilder producerBuilder = clientProvider.newProducerBuilder();
//注入属性
if (producer.size() > 0) {
Utils.injectProperties(producerBuilder, producer);
}
producerBuilder.setClientConfiguration(clientConfiguration);
return producerBuilder.build();
}
@Bean
public PushConsumer consumer(@Inject("${solon.rocketmq.consumer}") Properties consumer,
ClientConfiguration clientConfiguration,
MessageListener messageListener) throws ClientException{
//按需选择 PushConsumerBuilder 或 SimpleConsumerBuilder
PushConsumerBuilder consumerBuilder = clientProvider.newPushConsumerBuilder();
//注入属性
Utils.injectProperties(consumerBuilder, consumer);
Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();
subscriptionExpressions.put("topic.test", new FilterExpression("*"));
consumerBuilder.setSubscriptionExpressions(subscriptionExpressions);
consumerBuilder.setClientConfiguration(clientConfiguration);
consumerBuilder.setMessageListener(messageListener);
return consumerBuilder.build();
}
}
//这个实现类,(相对于 StaticSessionCredentialsProvider)方便配置自动注入
public class SessionCredentialsProviderImpl implements SessionCredentialsProvider {
private String accessKey;
private String accessSecret;
private String securityToken;
private SessionCredentials sessionCredentials;
@Override
public SessionCredentials getSessionCredentials() {
if (sessionCredentials == null) {
if (securityToken == null) {
sessionCredentials = new SessionCredentials(accessKey, accessSecret);
} else {
sessionCredentials = new SessionCredentials(accessKey, accessSecret, securityToken);
}
}
return sessionCredentials;
}
}
3、代码应用
发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller
public class DemoController {
@Inject
private Producer producer;
@Mapping("/send")
public void send(String msg) throws ClientException {
//发送
producer.send(new MessageBuilderImpl()
.setTopic("topic.test")
.setBody(msg.getBytes())
.build());
}
}
监听(或消费),这里采用订阅回调的方式:(仅供参考)
@Component
public class DemoMessageListener implements MessageListener {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
return ConsumeResult.SUCCESS;
}
}