The job runs normally at first after submission, but running tasks intermittently fail, reconnect, and then fail again

Hi teacher, after the job is submitted successfully, the table's schema file is created in HDFS as expected, but the running tasks intermittently throw the error below. When I produce data into Kafka, no data files appear on HDFS either. I searched online for possible causes but found little material; I looked up all three exceptions in the chain and still can't tell where the problem actually is.

2023-12-20 21:29:14

java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition paimon_word-1 could be determined

Job code:

object FlinkSqlWriteToPaimonForBucket_1 {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    // Set the global parallelism to 5: the source topic has 5 partitions and the sink table has 5 buckets (a 1:1 mapping is the most efficient)
    env.setParallelism(5)
    // Note: in streaming mode, checkpointing must be enabled when writing to Paimon tables.
    env.enableCheckpointing(5000)
    // Get the checkpoint configuration object
    val cpConfig = env.getCheckpointConfig
    // Retain previously generated checkpoint data both on job failure and on manual cancellation
    cpConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Set the storage location for checkpoint state data
    cpConfig.setCheckpointStorage("hdfs://bigdata01:9000/flink-chk/word_filter")
    val tEnv = StreamTableEnvironment.create(env)

    // Create the source table (a regular table)
    // Note: this table is created in Flink SQL's default catalog
    tEnv.executeSql(
      """
        |CREATE TABLE word_source(
        |    id BIGINT,
        |    word STRING
        |)WITH(
        |    'connector' = 'kafka',
        |    'topic' = 'paimon_word',
        |    'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |    'properties.group.id' = 'gid-paimon-1',
        |    'scan.startup.mode' = 'group-offsets',
        |    'properties.auto.offset.reset' = 'latest',
        |    'format' = 'json',
        |    'json.fail-on-missing-field' = 'false',
        |    'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin)

    // Create a Paimon catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH(
        |    'type'='paimon',
        |    'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    // Create the sink table (a Paimon table)
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS word_filter(
        |    id BIGINT,
        |    word STRING,
        |    dt STRING,
        |    PRIMARY KEY (id, dt) NOT ENFORCED
        |)PARTITIONED BY (dt) WITH (
        |    'bucket' = '5'
        |)
        |""".stripMargin)

    // Write data into the sink table
    tEnv.executeSql(
      """
        |INSERT INTO `paimon_catalog`.`default`.`word_filter`
        |SELECT
        |    id,
        |    word,
        |    '20230101' AS dt
        |FROM `default_catalog`.`default_database`.`word_source`
        |-- != cannot be used here; use <> instead
        |WHERE word <> 'hello11'
        |""".stripMargin)
  }
}

1 Answer
Asker 蒋昌魁 2023-12-20 22:36:12

After puzzling over this for most of the day, I changed the parameter to earliest and ran it locally, which revealed the cause: 2 of the 3 nodes in the Kafka cluster had gone down at some point, so with no coordinator available the job could not run normally. After restarting the Kafka cluster, the job ran through.

https://img1.sycdn.imooc.com/climg/6582fbb309a6b76c17060777.jpg
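For anyone hitting the same symptom, broker liveness can be confirmed from code before digging into Flink. Below is a minimal sketch using Kafka's AdminClient, assuming the same bootstrap servers as in the job above (the KafkaClusterCheck object name is made up for illustration):

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

// Minimal sketch: list the brokers that are actually reachable.
// If fewer nodes come back than expected (3 in this cluster), the
// "position for partition ... could be determined" timeout is most
// likely a broker/coordinator availability problem, not a Flink issue.
object KafkaClusterCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
      "bigdata01:9092,bigdata02:9092,bigdata03:9092")
    val admin = AdminClient.create(props)
    try {
      // describeCluster() only answers if at least one broker is reachable
      val nodes = admin.describeCluster().nodes().get()
      nodes.forEach(n => println(s"Live broker: ${n.host}:${n.port}"))
    } finally {
      admin.close()
    }
  }
}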

  • The key error is this line:

    Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition paimon_word-1 could be determined


    This error indicates that reading data from Kafka timed out. (See the timeout configuration sketch after this thread.)

    2023-12-21 09:54:15
  • Asker 蒋昌魁 replying to 徐老师 #2

    Teacher, this problem isn't fully solved yet. Locally it's fine now; it ran through last night. But after submitting to the cluster, tasks keep dying and restarting, still with this same kind of error. I checked the local and cluster environments: I packaged the jar exactly as in the video, marked the relevant dependencies as provided, and added a suffix to the kafka connector jars under Flink's lib directory. I repeated this many times, and the cluster run always hits this exception, while the local run is completely fine.

    2023-12-21 11:31:43

    java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition paimon_word-4 could be determined



    2023-12-21 11:37:57
  • Asker 蒋昌魁 replying to 徐老师 #3

    The submission command is as follows:

    /bin/flink run -m yarn-cluster -c xyz.jingri.paimon.rescalebucket.FlinkSqlWriteToPaimonForBucket_1 -yjm 1024 -ytm 1024 /root/db_paimon-1.0-SNAPSHOT.jar

    Each VM node in the cluster has 1 GB of memory, and the node where Flink runs has 2 GB. The FlinkSqlWriteToPaimonForBucket_1 class was copied from your materials and runs fine locally, so the code itself can probably be ruled out; most likely it's an issue with the cluster environment, but the exception message is quite unspecific.


    2023-12-21 11:42:26
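One more note on the 60000ms in the stack trace: that figure matches the Kafka consumer's default.api.timeout.ms, which expires while the consumer tries to resolve the partition's start position. When brokers are down, the partition has no reachable leader or coordinator, so raising the timeout won't help; restoring the brokers, as described above, is the real fix. On a healthy-but-slow cluster, though, the Flink Kafka connector passes any 'properties.*' option through to the underlying consumer, so the timeout could be raised. A minimal sketch of the source table DDL from the job with that one extra option (120000 is an illustrative value, not a recommendation):

    // The source table from the job above, with the consumer's API
    // timeout raised via the connector's 'properties.*' pass-through.
    // default.api.timeout.ms (default 60000) is the timeout that
    // expired in the stack trace.
    tEnv.executeSql(
      """
        |CREATE TABLE word_source(
        |    id BIGINT,
        |    word STRING
        |)WITH(
        |    'connector' = 'kafka',
        |    'topic' = 'paimon_word',
        |    'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |    'properties.group.id' = 'gid-paimon-1',
        |    'scan.startup.mode' = 'group-offsets',
        |    'properties.default.api.timeout.ms' = '120000',
        |    'format' = 'json'
        |)
        |""".stripMargin)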