Solon v3.0.4.1

::com.rabbitmq

</> markdown
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>${rabbitmq.version}</version>
</dependency>

1、描述

原始状态的 rabbitmq 集成非常方便,也更适合定制。有些同学,可能对原始接口会比较陌生,会希望有个具体的示例。

完整的集成代码参考:

https://gitee.com/opensolon/solon-examples/tree/main/b.Integration/demoB003-rabbitmq

希望更加简化使用的同学,可以使用:

rabbitmq-solon-cloud-plugin (使用更简单,定制性弱些)

2、配置项示例

添加 yml 配置。并约定(也可按需定义):

  • "solon.rabbitmq",作为配置前缀
  • "properties",作为公共配置
  • "producer",作为生产者专属配置
  • "consumer",作为消费者专属配置

具体的配置属性,参考自:ConnectionFactory,Channel

solon.app:
  name: "demo-app"
  group: "demo"

# The layout below is free-form; it only has to match what the @Bean code reads (shown here as a reference)
solon.rabbitmq:
  properties:  # shared settings (property names follow ConnectionFactory)
    host: "127.0.0.1"
    port: "5672"
    virtualHost: "/"
    username: "root"
    password: "123456"
    automaticRecovery: true
    networkRecoveryInterval: 5000
  producer: # producer-only settings (names follow Channel)
    waitForConfirms: 0
  consumer: # consumer-only settings (names follow Channel)
    basicQos:
      prefetchCount: 10
      prefetchSize: 0
      global: false
    queueDeclare:
      queue: "${solon.app.group}_${solon.app.name}"
      durable: true
      exclusive: false
      autoDelete: false
      arguments: {}

添加 java 配置器

/**
 * Wires the raw RabbitMQ client into the container: builds the shared {@link Channel},
 * enables publisher confirms for the producing side, and declares/binds the queue and
 * starts consuming for the consuming side.
 */
@Configuration
public class RabbitmqConfig {
    /** Name of the direct exchange that messages are published to and the queue is bound to. */
    public static final String EXCHANGE_NAME = "demo-exchange";

    /**
     * Routing key shared by the queue binding and by publishers. A direct exchange only
     * delivers a message to queues whose binding key equals the message's routing key,
     * so both sides must use the same value.
     */
    public static final String ROUTING_KEY = "topic.test";

    /**
     * Creates the connection and channel and declares the direct exchange.
     *
     * @param common properties under {@code solon.rabbitmq.properties}, injected into the
     *               {@link ConnectionFactory}
     * @return the channel used by both the producer and the consumer
     * @throws Exception if the connection, the channel, or the exchange cannot be created
     */
    @Bean
    public Channel client(@Inject("${solon.rabbitmq.properties}") Properties common) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Copy the configured properties onto the factory
        Utils.injectProperties(connectionFactory, common);

        // Create the connection and the channel
        Connection connection = connectionFactory.newConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            return channel;
        } catch (Exception e) {
            // Do not leave an open connection behind when setup fails half-way
            connection.abort();
            throw e;
        }
    }

    /**
     * Prepares the channel for publishing.
     *
     * @param producer settings under {@code solon.rabbitmq.producer}
     * @param channel  the shared channel
     * @throws Exception if confirm mode cannot be enabled
     */
    @Bean
    public void producer(@Inject("${solon.rabbitmq.producer}") Props producer,
                         Channel channel) throws Exception {

        // Turn on publisher confirms (improves delivery reliability)
        channel.confirmSelect();

        // Shows how a producer-specific setting is read; this configurator does not act on it.
        // NOTE(review): presumably meant as the timeout for Channel.waitForConfirms(long)
        // after publishing - confirm and wire it where messages are sent.
        long waitForConfirms = producer.getLong("waitForConfirms", 0L);
    }

    /**
     * Applies QoS, declares the queue, binds it to the exchange and starts consuming
     * with manual acknowledgement.
     *
     * @param consumer        settings under {@code solon.rabbitmq.consumer}
     * @param channel         the shared channel
     * @param messageConsumer callback that receives deliveries
     * @throws IllegalStateException if {@code queueDeclare.queue} is not configured
     * @throws Exception             if any broker operation fails
     */
    @Bean
    public void consumer(@Inject("${solon.rabbitmq.consumer}") Props consumer,
                         Channel channel,
                         Consumer messageConsumer) throws Exception {

        // for basicQos
        int prefetchCount = consumer.getInt("basicQos.prefetchCount", 10);
        int prefetchSize = consumer.getInt("basicQos.prefetchSize", 0);
        boolean global = consumer.getBool("basicQos.global", false);
        channel.basicQos(prefetchSize, prefetchCount, global);

        // for queueDeclare
        String queue = consumer.get("queueDeclare.queue");
        if (queue == null || queue.isEmpty()) {
            throw new IllegalStateException(
                    "Missing configuration: solon.rabbitmq.consumer.queueDeclare.queue");
        }
        boolean durable = consumer.getBool("queueDeclare.durable", true);
        boolean exclusive = consumer.getBool("queueDeclare.exclusive", false);
        boolean autoDelete = consumer.getBool("queueDeclare.autoDelete", false);
        // Copy into the value type queueDeclare expects instead of passing a raw Map
        Map<String, Object> arguments =
                new java.util.HashMap<>(consumer.getMap("queueDeclare.arguments"));

        channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
        // Bind with the same key publishers use; binding with the queue name would leave
        // messages published as ROUTING_KEY unroutable on a direct exchange
        channel.queueBind(queue, EXCHANGE_NAME, ROUTING_KEY);

        // for basicConsume (autoAck = false: the consumer acknowledges explicitly)
        channel.basicConsume(queue, false, messageConsumer);
    }
}

3、代码应用

发送(或生产),这里以控制器为例,由用户请求触发消息发送(仅供参考):

/**
 * Sample HTTP entry point that publishes the request's {@code msg} parameter to RabbitMQ.
 */
@Controller
public class DemoController {
    /** Channel used for publishing. */
    @Inject
    private Channel producer;

    /**
     * Publishes {@code msg} to {@link RabbitmqConfig#EXCHANGE_NAME} with routing key
     * {@code "topic.test"}.
     *
     * @param msg text to send; encoded as UTF-8
     * @throws IOException if publishing fails
     */
    @Mapping("/send")
    public void send(String msg) throws IOException {
        AMQP.BasicProperties msgProperties = new AMQP.BasicProperties();
        // Encode explicitly: the no-arg getBytes() depends on the platform default charset
        byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        producer.basicPublish(RabbitmqConfig.EXCHANGE_NAME, "topic.test", msgProperties, payload);
    }
}

监听(或消费),这里采用订阅回调的方式:(仅供参考)

/**
 * Sample subscriber: prints each delivered message and acknowledges it.
 */
@Component
public class DemoMessageConsumer extends DefaultConsumer implements Consumer {

    /**
     * @param channel channel this consumer is attached to; also used to acknowledge deliveries
     */
    public DemoMessageConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Prints the message text and acknowledges the single delivery.
     *
     * @throws IOException if the acknowledgement cannot be sent
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // Decode the payload; printing the byte[] directly would only show its identity ("[B@...")
        String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(text);

        // Acknowledge only this delivery (multiple = false)
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
}