刚开始运行是正常的,但是过了一会儿报错,这个是什么原因造成的?该如何解决呢?

相关代码如下:
相关代码:
public class TestMain {
//测试LagMonitor
public static void main(String[] args) {
try(DIYKafkaConsumer consumer1 = new DIYKafkaConsumer();
DIYKafkaConsumer consumer2 = new DIYKafkaConsumer();){
while (true){
LagMonitor.monitorSingleConsumer(consumer1, 100l, "second");
LagMonitor.monitorSingleConsumer(consumer2, 200l, "second");
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}相关代码:
public class LagMonitor {
/**
* 监听某个consumer是否异常
* @param consumer 需要指定的消费者
* @param maxLag 指定消费者的lag阈值
* @param topics 消费者所要消费的主题
* @throws InterruptedException
*/
public static void monitorSingleConsumer(KafkaConsumer<String, String> consumer, Long maxLag, String... topics) throws InterruptedException{
//订阅指定的topic(指定的topic中后续如果接收到数据,就会被取出)
consumer.subscribe(Arrays.asList(topics));
consumer.poll(Duration.ofSeconds(1));
long startLag = 0l;
//休眠1分钟
// TimeUnit.MINUTES.sleep(1);
long endLag = startLag;
for (String topic : topics) {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
long endLagSingleTopic = partitionInfos.stream()
.mapToLong(info -> consumer.currentLag(new TopicPartition(topic, info.partition())).orElse(0))
.sum();
endLag += endLagSingleTopic;
}
long totalLag = endLag - startLag;
if (totalLag > maxLag) {
System.out.println("消费者组:" + consumer.groupMetadata().groupId() + "中的消费者:" + consumer.groupMetadata().memberId() + ",可能出问题了");
}
}
}相关代码:
public class DIYKafkaConsumer extends KafkaConsumer {
private static Properties prop = new Properties();
static {
//指定kafka的broker地址
prop.put("bootstrap.servers", "bigdata01:9092, bigdata02:9092, bigdata03:9092");
//指定key-value的反序列化类型
prop.put("key.deserializer", StringDeserializer.class.getName());
prop.put("value.deserializer", StringDeserializer.class.getName());
//指定消费者组
prop.put("group.id", "group-diy");
}
public DIYKafkaConsumer() {
super(prop);
}
}相关代码:
public class ProducerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "bigdata01:9092, bigdata02:9092, bigdata03:9092");
//指定key-value数据的序列化格式
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
try(
//创建kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
){
//指定topic
String topic = "second";
//向topic中产生数据
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<String, String>(topic, "hello,first"));
}
}
}
}6
收起
正在回答
1回答
我看了下你的代码封装了好几层,并且里面还有静态代码块之类的东西。
1.先完善一下项目中日志的相关配置,保证可以正常输出日志信息,这样报错信息会更完整
2.前期先不要封装这么多层,静态代码块里面的东西也建议先移到外面
3.从目前的报错信息上来看属于消费这块的代码问题,所以建议使用常规的操作,先避免静态代码块之类的东西

恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星