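1. Problem description:
The version of the Kafka server needs to match the Spring Boot version we use; otherwise, differences between Kafka versions can cause problems when sending messages.
Kafka and Spring Boot version mapping: see the official site: Kafka and Spring Boot version mapping.
[Figure: version mapping for older releases]
In short: the Kafka version installed on the server has to correspond to the Spring Boot version in use. If it does not, things can go wrong: sending a message fails with an error, or the message appears to be sent but never shows up in the topic.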
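2. Solutions:
- Change the Kafka server version to match the Spring Boot version;
- Change the Spring Boot version to match the Kafka version installed on the server;
- In the Spring Boot project, send messages with the native Kafka producer API, and keep using the Spring Boot listener for the consumer;
Options 1 and 2 are costly; in my testing, option 3 was not affected by the version mismatch.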
3. Hands-on:
3.1 yml configuration:
spring:
  kafka:
    bootstrap-servers: 000.000.000.001:9092,000.000.000.002:9092
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
    consumer:
      group-id: test
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 1
      ack-mode: MANUAL
3.2 Producer:
3.2.1 Configuring KafkaProducer:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.Properties;

@Configuration
public class KafkaConfig {

    // Split the comma-separated broker list from the yml into a List
    @Value("#{'${spring.kafka.bootstrap-servers}'.split(',')}")
    private List<String> bootstrapServers;

    @Value("${spring.kafka.producer.retries}")
    private int retries;

    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;

    // buffer.memory is a long-typed setting in Kafka
    @Value("${spring.kafka.producer.buffer-memory}")
    private long bufferMemory;

    @Value("${spring.kafka.producer.acks}")
    private String acks;

    // Native Kafka producer, built directly from the yml values
    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        return new KafkaProducer<>(props);
    }
}
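The SpEL expression above splits the broker list on commas only, so a stray space or a missing port in the yml is passed to the producer unchanged. As a rough sketch (not part of the original setup), the list could be checked before the producer is built. The class name BootstrapServersSketch, the method parse, and the sample addresses are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: trims and validates a comma-separated "host:port" list.
final class BootstrapServersSketch {

    static List<String> parse(String raw) {
        List<String> servers = new ArrayList<>();
        for (String part : raw.split(",")) {
            String server = part.trim();
            if (server.isEmpty()) {
                continue; // ignore empty entries such as a trailing comma
            }
            int colon = server.lastIndexOf(':');
            if (colon <= 0 || colon == server.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + server);
            }
            int port;
            try {
                port = Integer.parseInt(server.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Port is not a number: " + server, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + server);
            }
            servers.add(server);
        }
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("No bootstrap servers configured");
        }
        return servers;
    }

    public static void main(String[] args) {
        // Sample addresses are made up for illustration
        System.out.println(parse("10.0.0.1:9092, 10.0.0.2:9092")); // prints [10.0.0.1:9092, 10.0.0.2:9092]
    }
}
```

The returned list can be put under ProducerConfig.BOOTSTRAP_SERVERS_CONFIG in the same way as the injected field.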
3.2.2 MessageSenderClient:
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Future;

@Slf4j
@Component
public class MessageSenderClient {

    // Gson is thread-safe, so one shared instance is enough
    private static final Gson GSON = new Gson();

    @Resource
    private KafkaProducer<String, String> kafkaProducer;

    /** Sends data as JSON to the topic and lets Kafka choose the partition. */
    public void send(String topic, Object data) {
        send(topic, null, data);
    }

    /** Sends data as JSON to the given partition; a null partition lets Kafka choose. */
    public void send(String topic, Integer partition, Object data) {
        if (StringUtils.isEmpty(topic) || data == null) {
            throw new IllegalStateException("The send message parameter cannot be null");
        }
        try {
            ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, partition, null, GSON.toJson(data));
            Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
            // Block until the broker has acknowledged the record
            RecordMetadata recordMetadata = metadataFuture.get();
            log.info("MessageSenderClient kafka send Produce ok:{}", recordMetadata);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption
            Thread.currentThread().interrupt();
            log.error("MessageSenderClient kafka send interrupted,", e);
        } catch (Exception e) {
            log.error("MessageSenderClient kafka send error,", e);
        }
    }
}
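The send methods above call metadataFuture.get() without a timeout, so the calling thread waits until the producer reports success or failure. One possible variation, sketched here with the standard library only, is to wait with an upper bound and turn each failure mode into a clear exception. The class SendResultWaiter, its await method, and the timeout values are assumptions made for illustration, not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for any Future with an upper bound on the wait time.
final class SendResultWaiter {

    static <T> T await(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt flag set for the caller
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the send result", e);
        } catch (ExecutionException e) {
            // The real failure is wrapped inside the ExecutionException
            throw new IllegalStateException("Send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("No send result within " + timeoutMs + " ms", e);
        }
    }

    public static void main(String[] args) {
        // A completed future stands in for a successful send
        System.out.println(await(CompletableFuture.completedFuture("ok"), 1000)); // prints ok
    }
}
```

Because KafkaProducer.send returns a Future<RecordMetadata>, the same helper could wrap it, for example SendResultWaiter.await(kafkaProducer.send(kafkaMessage), 5000), where 5000 ms is an arbitrary value to be tuned.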
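3.2.3 Example of sending a message:
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
public class KafkaTest {

    @Autowired
    private MessageSenderClient messageSenderClient;

    @Test
    void testKafkaProducer() {
        // Sends one test message to the GroupQueue topic
        messageSenderClient.send("GroupQueue", "payment: sending a new message");
    }
}
Result: the message is sent successfully.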
3.3 Consumer
3.3.1 Consumer example:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumerTest {

    @KafkaListener(topics = {"GroupQueue"})
    void onMessage1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        log.info("kafka consumer: {}-{}-{}", record.topic(), record.partition(), record.value());
        // ack-mode is MANUAL, so the offset is committed only after this call
        acknowledgment.acknowledge();
    }
}
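3.3.2 Testing the consumer listener:
Note: the consumer is configured with auto-offset-reset: latest, so a consumer group that has no committed offset starts from the newest position. When testing, start the consumer first so that it is listening, then have the producer send another message; only then does the consumer receive that new message. Result: OK.
4. Summary:
If anything in this post is inaccurate, corrections are welcome.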
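To make the note above concrete, here is a small model of how the starting position is chosen. It is a simplified illustration, not Kafka code: it ignores the "none" policy and the case where a committed offset is out of range, and the class OffsetResetSketch and its parameters are hypothetical.

```java
import java.util.OptionalLong;

// Simplified model of where a consumer group starts reading one partition.
final class OffsetResetSketch {

    /**
     * @param committed   offset committed by the group, empty if it never committed
     * @param logStart    earliest offset still stored in the partition
     * @param logEnd      offset that the next produced record will receive
     * @param resetPolicy "latest" or "earliest"
     */
    static long startOffset(OptionalLong committed, long logStart, long logEnd, String resetPolicy) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is not consulted
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "latest":
                return logEnd;   // records produced earlier are skipped
            case "earliest":
                return logStart; // records produced earlier are replayed
            default:
                throw new IllegalArgumentException("Unsupported reset policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Five records (offsets 0..4) were produced before the consumer started
        System.out.println(startOffset(OptionalLong.empty(), 0, 5, "latest"));   // prints 5
        System.out.println(startOffset(OptionalLong.empty(), 0, 5, "earliest")); // prints 0
        System.out.println(startOffset(OptionalLong.of(3), 0, 5, "latest"));     // prints 3
    }
}
```

With "latest" and no committed offset, the new group starts at offset 5, so the five earlier records are never delivered; this is why the consumer has to be running before the test message is sent.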