My ConsumerRecords class is different from the teacher's
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumerService {

    @KafkaListener(groupId = "group02", topics = "lc")
    public void onMessage(ConsumerRecords<String, Object> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        log.info("###################################record: " + record.value());
        acknowledgment.acknowledge();
    }
}
Why does my record have no value() method? In addition, starting the consumer microservice fails with the following error:
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload=Hello kafka!9, headers={kafka_offset=81, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4debf120, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=lc, kafka_receivedTimestamp=1584430741297, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = lc, partition = 0, leaderEpoch = 0, offset = 81, CreateTime = 1584430741297, serialized key size = -1, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Hello kafka!9), kafka_groupId=group02}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:906) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:326) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
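
The exception points to a likely cause: the listener parameter is declared as ConsumerRecords (plural). That class is a batch container that implements Iterable<ConsumerRecord<K, V>>, so it has no value() method. A record-mode @KafkaListener does not recognize the type, treats the parameter as the message payload, and tries to convert the String payload into it. That fails with the MessageConversionException shown above. Below is a minimal sketch of the fix. It assumes the listener is meant to handle one record at a time and that a manual ack mode is already configured (the kafka_acknowledgment header in the trace suggests it is). Only the import and the parameter type change, to ConsumerRecord (singular):

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumerService {

    // Record-mode listener: invoked once per message, so the parameter is a
    // single ConsumerRecord, which exposes key(), value(), offset(), and so on.
    @KafkaListener(groupId = "group02", topics = "lc")
    public void onMessage(ConsumerRecord<String, Object> record,
                          Acknowledgment acknowledgment,
                          Consumer<?, ?> consumer) {
        log.info("record: {}", record.value());
        // Commit the offset manually; assumes a manual ack mode is configured.
        acknowledgment.acknowledge();
    }
}
```

ConsumerRecords can still be used if batch consumption is what you want. In that case the container factory has to be configured as a batch listener, and the method would iterate over the batch and call value() on each ConsumerRecord.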