刚开始运行是正常的,但是过了一会儿报错,这个是什么原因造成的?该如何解决呢?
相关代码如下:
相关代码:
public class TestMain { //测试LagMonitor public static void main(String[] args) { try(DIYKafkaConsumer consumer1 = new DIYKafkaConsumer(); DIYKafkaConsumer consumer2 = new DIYKafkaConsumer();){ while (true){ LagMonitor.monitorSingleConsumer(consumer1, 100l, "second"); LagMonitor.monitorSingleConsumer(consumer2, 200l, "second"); } }catch (InterruptedException e) { e.printStackTrace(); } } }
相关代码:
public class LagMonitor { /** * 监听某个consumer是否异常 * @param consumer 需要指定的消费者 * @param maxLag 指定消费者的lag阈值 * @param topics 消费者所要消费的主题 * @throws InterruptedException */ public static void monitorSingleConsumer(KafkaConsumer<String, String> consumer, Long maxLag, String... topics) throws InterruptedException{ //订阅指定的topic(指定的topic中后续如果接收到数据,就会被取出) consumer.subscribe(Arrays.asList(topics)); consumer.poll(Duration.ofSeconds(1)); long startLag = 0l; //休眠1分钟 // TimeUnit.MINUTES.sleep(1); long endLag = startLag; for (String topic : topics) { List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); long endLagSingleTopic = partitionInfos.stream() .mapToLong(info -> consumer.currentLag(new TopicPartition(topic, info.partition())).orElse(0)) .sum(); endLag += endLagSingleTopic; } long totalLag = endLag - startLag; if (totalLag > maxLag) { System.out.println("消费者组:" + consumer.groupMetadata().groupId() + "中的消费者:" + consumer.groupMetadata().memberId() + ",可能出问题了"); } } }
相关代码:
public class DIYKafkaConsumer extends KafkaConsumer { private static Properties prop = new Properties(); static { //指定kafka的broker地址 prop.put("bootstrap.servers", "bigdata01:9092, bigdata02:9092, bigdata03:9092"); //指定key-value的反序列化类型 prop.put("key.deserializer", StringDeserializer.class.getName()); prop.put("value.deserializer", StringDeserializer.class.getName()); //指定消费者组 prop.put("group.id", "group-diy"); } public DIYKafkaConsumer() { super(prop); } }
相关代码:
public class ProducerDemo { public static void main(String[] args) { Properties prop = new Properties(); //指定kafka的broker地址 prop.put("bootstrap.servers", "bigdata01:9092, bigdata02:9092, bigdata03:9092"); //指定key-value数据的序列化格式 prop.put("key.serializer", StringSerializer.class.getName()); prop.put("value.serializer", StringSerializer.class.getName()); try( //创建kafka生产者 KafkaProducer<String, String> producer = new KafkaProducer<>(prop); ){ //指定topic String topic = "second"; //向topic中产生数据 for (int i = 0; i < 1000; i++) { producer.send(new ProducerRecord<String, String>(topic, "hello,first")); } } } }
6
收起
正在回答
1回答
我看了下你的代码封装了好几层,并且里面还有静态代码块之类的东西。
1.先完善一下项目中日志的相关配置,保证可以正常输出日志信息,这样报错信息会更完整
2.前期先不要封装这么多层,静态代码块里面的东西也建议先移到外面
3.从目前的报错信息上来看属于消费这块的代码问题,所以建议使用常规的操作,先避免静态代码块之类的东西
大数据工程师 2024 版
- 参与学习 1151 人
- 提交作业 5960 份
- 解答问题 1144 个
不用Java初级内容充数!不用与大数据岗位无关内容占课时!我们做的就是“精华版”大数据课程
了解课程
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星