It runs fine at first, but after a while it throws an error. What is causing this, and how can I fix it?

https://img1.sycdn.imooc.com//climg/652a123c08e73cc716980733.jpg

Relevant code:

public class TestMain {

    // Exercise LagMonitor
    public static void main(String[] args) {
        try (DIYKafkaConsumer consumer1 = new DIYKafkaConsumer();
             DIYKafkaConsumer consumer2 = new DIYKafkaConsumer()) {
            while (true) {
                LagMonitor.monitorSingleConsumer(consumer1, 100L, "second");
                LagMonitor.monitorSingleConsumer(consumer2, 200L, "second");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Relevant code:

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class LagMonitor {

    /**
     * Checks whether a given consumer looks unhealthy.
     * @param consumer the consumer to monitor
     * @param maxLag   the lag threshold for this consumer
     * @param topics   the topics the consumer consumes
     * @throws InterruptedException
     */
    public static void monitorSingleConsumer(KafkaConsumer<String, String> consumer, Long maxLag, String... topics) throws InterruptedException {
        // Subscribe to the given topics (any data that later arrives on them can then be fetched)
        consumer.subscribe(Arrays.asList(topics));
        consumer.poll(Duration.ofSeconds(1));
        long startLag = 0L;
        // Sleep for one minute
//        TimeUnit.MINUTES.sleep(1);
        long endLag = startLag;

        for (String topic : topics) {
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            long endLagSingleTopic = partitionInfos.stream()
                    .mapToLong(info -> consumer.currentLag(new TopicPartition(topic, info.partition())).orElse(0))
                    .sum();
            endLag += endLagSingleTopic;
        }

        long totalLag = endLag - startLag;
        if (totalLag > maxLag) {
            System.out.println("Consumer " + consumer.groupMetadata().memberId() + " in group " + consumer.groupMetadata().groupId() + " may have a problem");
        }
    }
}
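
The per-partition summation inside monitorSingleConsumer can be looked at in isolation, without a broker. A minimal sketch in plain Java; the helper name totalLag and the Map-based input are made up here, standing in for the OptionalLong values that KafkaConsumer.currentLag returns:

```java
import java.util.Map;
import java.util.OptionalLong;

public class LagSum {
    // Hypothetical helper: sums per-partition lag values, treating a
    // missing lag (empty OptionalLong) as zero, mirroring the
    // orElse(0) call in monitorSingleConsumer.
    static long totalLag(Map<String, OptionalLong> lagByPartition) {
        return lagByPartition.values().stream()
                .mapToLong(lag -> lag.orElse(0L))
                .sum();
    }

    public static void main(String[] args) {
        Map<String, OptionalLong> lags = Map.of(
                "second-0", OptionalLong.of(120L),
                "second-1", OptionalLong.empty());
        System.out.println(totalLag(lags)); // prints 120
    }
}
```

Note that currentLag only returns a value for partitions currently assigned to this consumer; the empty case above corresponds to a partition with no known lag.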

Relevant code:

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DIYKafkaConsumer extends KafkaConsumer<String, String> {
    private static Properties prop = new Properties();

    static {
        // Kafka broker addresses
        prop.put("bootstrap.servers", "bigdata01:9092, bigdata02:9092, bigdata03:9092");
        // Deserializer types for keys and values
        prop.put("key.deserializer", StringDeserializer.class.getName());
        prop.put("value.deserializer", StringDeserializer.class.getName());

        // Consumer group
        prop.put("group.id", "group-diy");
    }

    public DIYKafkaConsumer() {
        super(prop);
    }
}

Relevant code:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Kafka broker addresses
        prop.put("bootstrap.servers", "bigdata01:9092, bigdata02:9092, bigdata03:9092");
        // Serializer types for keys and values
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());

        // Create the Kafka producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            // Target topic
            String topic = "second";
            // Produce data to the topic
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<String, String>(topic, "hello,first"));
            }
        }
    }
}


1 Answer

I looked at your code: it is wrapped in several layers and also relies on things like a static initializer block.

1. First, finish setting up the project's logging configuration so that log output works properly; that will give you more complete error information.

2. At this stage, avoid wrapping things in so many layers; I'd also suggest moving the contents of the static block out of it.

3. Judging from the current error message, the problem is in the consumer-side code, so for now stick to the conventional approach and avoid things like static initializer blocks.
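
As a rough illustration of points 2 and 3, the consumer configuration from DIYKafkaConsumer's static block could be built in plain code instead. This is a sketch only: the helper name consumerProps is made up, while the broker addresses, deserializer class, and group id are copied from the question's code; the resulting Properties would be passed straight to new KafkaConsumer<>(...):

```java
import java.util.Properties;

public class PlainConsumerConfig {
    // Build the consumer configuration in ordinary code rather than
    // a static initializer block, per the teacher's suggestion.
    static Properties consumerProps() {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id", "group-diy");
        return prop;
    }

    public static void main(String[] args) {
        // e.g. new KafkaConsumer<String, String>(consumerProps())
        System.out.println(consumerProps().getProperty("group.id")); // prints group-diy
    }
}
```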

  • 蒋昌魁 (original poster) #1

    https://img1.sycdn.imooc.com//climg/652bcd520898e11212750742.jpg

    Teacher, this approach throws the same error too. With only one consumer there is no error and everything works, but the error appears as soon as there are two consumers.

    Here is the detailed output:

    2023-10-15 19:03:03,047 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka version: 3.5.0

    2023-10-15 19:03:03,047 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka commitId: c97b88d5db4de28d

    2023-10-15 19:03:03,048 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka startTimeMs: 1697367783047

    2023-10-15 19:03:03,050 [main] [xyz.jingri.test.DIYKafkaConsumer] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] Subscribed to topic(s): second

    2023-10-15 19:03:03,051 [main] [xyz.jingri.test.DIYKafkaConsumer] [INFO] - [Consumer clientId=consumer-group-diy-2, groupId=group-diy] Subscribed to topic(s): second

    2023-10-15 19:03:03,785 [main] [org.apache.kafka.clients.Metadata] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] Cluster ID: Mm0FMR4dQDKdW3Xg0cadsA

    2023-10-15 19:03:03,786 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] Discovered group coordinator bigdata03:9092 (id: 2147483645 rack: null)

    2023-10-15 19:03:03,794 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] (Re-)joining group

    2023-10-15 19:03:03,815 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] Request joining group due to: need to re-join with the given member-id: consumer-group-diy-1-c50cff66-3d6f-4c7a-8b28-d24b82f42232

    2023-10-15 19:03:03,816 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)

    2023-10-15 19:03:03,816 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] (Re-)joining group

    2023-10-15 19:03:12,238 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-2, groupId=group-diy] Resetting generation and member id due to: consumer pro-actively leaving the group

    2023-10-15 19:03:12,239 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-2, groupId=group-diy] Request joining group due to: consumer pro-actively leaving the group

    2023-10-15 19:03:12,258 [main] [org.apache.kafka.common.metrics.Metrics] [INFO] - Metrics scheduler closed

    2023-10-15 19:03:12,258 [main] [org.apache.kafka.common.metrics.Metrics] [INFO] - Closing reporter org.apache.kafka.common.metrics.JmxReporter

    2023-10-15 19:03:12,259 [main] [org.apache.kafka.common.metrics.Metrics] [INFO] - Metrics reporters closed

    2023-10-15 19:03:12,292 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - App info kafka.consumer for consumer-group-diy-2 unregistered

    2023-10-15 19:03:41,865 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] Member consumer-group-diy-1-c50cff66-3d6f-4c7a-8b28-d24b82f42232 sending LeaveGroup request to coordinator bigdata03:9092 (id: 2147483645 rack: null) due to the consumer is being closed

    2023-10-15 19:03:41,868 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] Resetting generation and member id due to: consumer pro-actively leaving the group

    2023-10-15 19:03:41,869 [main] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [INFO] - [Consumer clientId=consumer-group-diy-1, groupId=group-diy] Request joining group due to: consumer pro-actively leaving the group

    2023-10-15 19:03:41,878 [main] [org.apache.kafka.common.metrics.Metrics] [INFO] - Metrics scheduler closed

    2023-10-15 19:03:41,879 [main] [org.apache.kafka.common.metrics.Metrics] [INFO] - Closing reporter org.apache.kafka.common.metrics.JmxReporter

    2023-10-15 19:03:41,880 [main] [org.apache.kafka.common.metrics.Metrics] [INFO] - Metrics reporters closed

    2023-10-15 19:03:41,931 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - App info kafka.consumer for consumer-group-diy-1 unregistered

    Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition second-0

    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)

    at org.apache.kafka.clients.consumer.internals.SubscriptionState.partitionLag(SubscriptionState.java:541)

    at org.apache.kafka.clients.consumer.KafkaConsumer.currentLag(KafkaConsumer.java:2292)

    at xyz.jingri.test.LagMonitor.lambda$monitorSingleConsumer$0(LagMonitor.java:40)

    at java.util.stream.ReferencePipeline$5$1.accept(ReferencePipeline.java:227)

    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)

    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)

    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)

    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)

    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)

    at java.util.stream.LongPipeline.reduce(LongPipeline.java:439)

    at java.util.stream.LongPipeline.sum(LongPipeline.java:397)

    at xyz.jingri.test.LagMonitor.monitorSingleConsumer(LagMonitor.java:41)

    at xyz.jingri.test.TestMain.main(TestMain.java:33)

    Disconnected from the target VM, address: '127.0.0.1:62597', transport: 'socket'


    2023-10-15 19:35:19
  • 蒋昌魁 (original poster) #2

    If there is only one consumer but I run it in a loop, it also errors. From my own testing so far, it only runs without errors when there is a single consumer and no loop.

    2023-10-15 19:37:01
  • 徐老师 (teacher), replying to 蒋昌魁 #3
    In your code, the monitoring calls for the two consumers both specify the same topic, right? Don't specify the same topic twice.
    2023-10-15 19:43:07
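
To make the teacher's point concrete: in TestMain both monitorSingleConsumer calls pass "second", so the two consumers in group group-diy are pointed at the same topic. A small sketch of checking for that duplication; the overlap helper is hypothetical, purely for illustration:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopicOverlapCheck {
    // Hypothetical helper: returns the topics that two monitoring
    // calls have in common. A non-empty result means two consumers in
    // the same group are being pointed at the same topic.
    static Set<String> overlap(List<String> topicsA, List<String> topicsB) {
        Set<String> shared = new HashSet<>(topicsA);
        shared.retainAll(new HashSet<>(topicsB));
        return shared;
    }

    public static void main(String[] args) {
        // In the question, both monitor calls pass "second":
        System.out.println(overlap(List.of("second"), List.of("second"))); // prints [second]
        // Distinct topics per consumer would avoid the duplication:
        System.out.println(overlap(List.of("first"), List.of("second")));  // prints []
    }
}
```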