Macula Boot Starter Kafka
概述
该模块依赖spring-kafka接入kafka服务。
组件坐标
<dependency>
<groupId>dev.macula.boot</groupId>
<artifactId>macula-boot-starter-kafka</artifactId>
<version>${macula.version}</version>
</dependency>
使用配置
spring:
kafka:
bootstrap-servers: 197.168.25.196:9092 # 指定kafka server的地址,集群配多个,中间,逗号隔开
producer:
batch-size: 1000 # 批量发送的消息数量
buffer-memory: 33554432 # 32MB的批处理缓冲区
retries: 3 # 重试次数
consumer:
group-id: crm-user-service # 默认消费者组
auto-offset-reset: earliest # 最早未被消费的offset
max-poll-records: 4000 # 批量一次最大拉取数据量
enable-auto-commit: true # 是否自动提交
auto-commit-interval: 1000 # 自动提交时间间隔,单位ms
核心功能
发送数据
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
public void testSend(){
for (int i = 0; i < 5000; i++) {
Map<String, Object> map = new LinkedHashMap<>();
map.put("datekey", 20210610);
map.put("userid", i);
map.put("salaryAmount", i);
//向kafka的big_data_topic主题推送数据
kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
}
}
}
消费数据
@Component
public class BigDataTopicListener {
private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
/**
* 监听kafka数据
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"})
public void consumer(ConsumerRecord<?, ?> consumerRecord) {
log.info("收到bigData推送的数据'{}'", consumerRecord.toString());
//...
//db.save(consumerRecord);//插入或者更新数据
}
}
批量消费模式
增加如下配置:
spring:
kafka:
listener:
type: BATCH
concurrency: 3 # 批消费并发量,小于或等于Topic的分区数
依赖引入
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- optional - only needed when using kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
</dependencies>
版权说明
- spring-kafka:https://github.com/spring-projects/spring-kafka/blob/main/LICENCE.txt