::org.apache.kafka
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
1、描述
原始状态的 kafka 集成非常方便,也更适合定制。有些同学,可能对原始接口会比较陌生,会希望有个具体的示例。
完整的集成代码参考:
https://gitee.com/opensolon/solon-examples/tree/main/b.Integration/demoB001-kafka
希望更加简化使用的同学,可以使用:
kafka-solon-cloud-plugin (使用更简单,定制性弱些)
2、配置项示例
添加 yml 配置。并约定(也可按需定义):
- "solon.kafka",作为配置前缀
- "properties",作为公共配置
- "producer",作为生态者专属配置
- "consumer",作为消费者专属配置
具体的配置属性,参考自:ProducerConfig,ConsumerConfig
solon.app:
name: "demo-app"
group: "demo"
# 配置前缀,可以自由定义,与 @Bean 代码对应起来即可(以下为参考)
solon.kafka:
properties: #公共配置(配置项,参考:ProducerConfig,ConsumerConfig 的公用部分)
bootstrap:
servers: "127.0.0.1:9092"
key:
serializer: "org.apache.kafka.common.serialization.StringSerializer"
deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
value:
serializer: "org.apache.kafka.common.serialization.StringSerializer"
deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
producer: #生产者专属配置(配置项,参考:ProducerConfig)
acks: "all"
consumer: #消费者专属配置(配置项,参考:ConsumerConfig)
enable:
auto:
commit: "false"
isolation:
level: "read_committed"
group:
id: "${solon.app.group}:${solon.app.name}"
添加 java 配置器
@Configuration
public class KafkaConfig {
@Bean
public KafkaProducer<String, String> producer(@Inject("${solon.kafka.properties}") Properties common,
@Inject("${solon.kafka.producer}") Properties producer) {
Properties props = new Properties();
props.putAll(common);
props.putAll(producer);
return new KafkaProducer<>(props);
}
@Bean
public KafkaConsumer<String, String> consumer(@Inject("${solon.kafka.properties}") Properties common,
@Inject("${solon.kafka.consumer}") Properties consumer) {
Properties props = new Properties();
props.putAll(common);
props.putAll(consumer);
return new KafkaConsumer<>(props);
}
}
3、代码应用
发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller
public class DemoController {
@Inject
private KafkaProducer<String, String> producer;
@Mapping("/send")
public void send(String msg) {
//发送
producer.send(new ProducerRecord<>("topic.test", msg));
}
}
拉取(或消费),这里采用定时拦取方式:(仅供参考)
@Component
public class DemoJob {
@Inject
private KafkaConsumer<String, String> consumer;
@Init
public void init() {
//订阅
consumer.subscribe(Arrays.asList("topic.test"));
}
@Scheduled(fixedDelay = 10_000L, initialDelay = 10_000L)
public void job() throws Exception {
//拉取
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
//确认
consumer.commitSync();
}
}
}