::com.rabbitmq
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
</dependency>
1、描述
原始状态的 rabbitmq 集成非常方便,也更适合定制。有些同学,可能对原始接口会比较陌生,会希望有个具体的示例。
完整的集成代码参考:
https://gitee.com/opensolon/solon-examples/tree/main/b.Integration/demoB003-rabbitmq
希望更加简化使用的同学,可以使用:
rabbitmq-solon-cloud-plugin (使用更简单,定制性弱些)
2、配置项示例
添加 yml 配置。并约定(也可按需定义):
- "solon.rabbitmq",作为配置前缀
- "properties",作为公共配置
- "producer",作为生态者专属配置
- "consumer",作为消费者专属配置
具体的配置属性,参考自:ConnectionFactory,Channel
solon.app:
name: "demo-app"
group: "demo"
# 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考)
solon.rabbitmq:
properties: #公共配置(配置项,参考:ConnectionFactory)
host: "127.0.0.1"
port: "5672"
virtualHost: "/"
username: "root"
password: "123456"
automaticRecovery: true
networkRecoveryInterval: 5000
producer: #生产者专属配置(配置项,参考:Channel)
waitForConfirms: 0
consumer: #消费者专属配置(配置项,参考:Channel)
basicQos:
prefetchCount: 10
prefetchSize: 0
global: false
queueDeclare:
queue: "${solon.app.group}_${solon.app.name}"
durable: true
exclusive: false
autoDelete: false
arguments: {}
添加 java 配置器
@Configuration
public class RabbitmqConfig {
public static final String EXCHANGE_NAME = "demo-exchange";
@Bean
public Channel client(@Inject("${solon.rabbitmq.properties}") Properties common) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
// 注入属性
Utils.injectProperties(connectionFactory, common);
// 创建连接与通道
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 配置交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
return channel;
}
@Bean
public void producer(@Inject("${solon.rabbitmq.producer}") Props producer,
Channel channel) throws Exception {
//申明需要发布确认(以提高可靠性)
channel.confirmSelect();
long waitForConfirms = producer.getLong("waitForConfirms", 0L);
}
@Bean
public void consumer(@Inject("${solon.rabbitmq.consumer}") Props consumer,
Channel channel,
Consumer messageConsumer) throws Exception {
// for basicQos
int prefetchCount = consumer.getInt("basicQos.prefetchCount", 10);
int prefetchSize = consumer.getInt("basicQos.prefetchSize", 0);
boolean global = consumer.getBool("basicQos.global", false);
channel.basicQos(prefetchSize, prefetchCount, global);
// for queueDeclare
String queue = consumer.get("queueDeclare.queue");
boolean durable = consumer.getBool("queueDeclare.durable", true);
boolean exclusive = consumer.getBool("queueDeclare.exclusive", false);
boolean autoDelete = consumer.getBool("queueDeclare.autoDelete", false);
Map arguments = consumer.getMap("queueDeclare.arguments");
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
channel.queueBind(queue, EXCHANGE_NAME, queue);
//for basicConsume
channel.basicConsume(queue, false, messageConsumer);
}
}
3、代码应用
发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller
public class DemoController {
@Inject
private Channel producer;
@Mapping("/send")
public void send(String msg) throws IOException {
//发送
AMQP.BasicProperties msgProperties = new AMQP.BasicProperties();
producer.basicPublish(RabbitmqConfig.EXCHANGE_NAME, "topic.test", msgProperties, msg.getBytes());
}
}
监听(或消费),这里采用订阅回调的方式:(仅供参考)
@Component
public class DemoMessageConsumer extends DefaultConsumer implements Consumer {
public DemoMessageConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(body);
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
}