After the job is submitted it initially shows as running normally, but every so often the running job errors out, reconnects, and then errors again
Teacher, after the job is submitted successfully, the table's schema file is created in HDFS as expected, but the running job intermittently throws the error below. After producing data into Kafka, no data files are generated on HDFS either. I searched online for possible causes, but there is very little material; I looked up all three exceptions and still cannot tell where the problem actually is.
2023-12-20 21:29:14
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition paimon_word-1 could be determined
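For what it's worth, the 60000 ms in the last Caused by matches the Kafka consumer's default.api.timeout.ms default, which bounds exactly the position lookup named in the message, so the consumer most likely got no answer from the broker leading paimon_word-1 within a minute. A minimal probe sketch, assuming only the plain Kafka client API on the classpath (the object name PositionProbe is made up for illustration), that performs the same lookup outside Flink:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object PositionProbe {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092")
    props.put("group.id", "gid-paimon-1")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    try {
      // Assign exactly the partition from the stack trace and ask for its position;
      // this is the same lookup that timed out inside the Flink source.
      val tp = new TopicPartition("paimon_word", 1)
      consumer.assign(java.util.Collections.singletonList(tp))
      println(s"position of $tp = ${consumer.position(tp, Duration.ofSeconds(60))}")
    } finally {
      consumer.close()
    }
  }
}

If this probe also hangs, the problem sits between the TaskManager hosts and the broker leading paimon_word-1 (DNS, firewall, or a leaderless/under-replicated partition), not in Flink or Paimon; if it returns quickly, the timeout can still be raised in the source DDL through the connector's properties.* passthrough, e.g. 'properties.default.api.timeout.ms' = '120000' (an illustrative value, not a verified fix).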
Job code:
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkSqlWriteToPaimonForBucket_1 {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    // Set global parallelism to 5: the source topic has 5 partitions and the sink
    // table has 5 buckets, so a one-to-one mapping gives the best efficiency
    env.setParallelism(5)
    // Note: in streaming mode, checkpointing must be enabled when working with Paimon tables
    env.enableCheckpointing(5000)
    // Get the checkpoint configuration object
    val cpConfig = env.getCheckpointConfig
    // Retain previously generated checkpoint data both on job failure and on manual cancellation
    cpConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Set the storage location for checkpointed state data
    cpConfig.setCheckpointStorage("hdfs://bigdata01:9000/flink-chk/word_filter")
    val tEnv = StreamTableEnvironment.create(env)

    // Create the source table (a regular table)
    // Note: this table is created in Flink SQL's default catalog
    tEnv.executeSql(
      """
        |CREATE TABLE word_source(
        | id BIGINT,
        | word STRING
        |)WITH(
        | 'connector' = 'kafka',
        | 'topic' = 'paimon_word',
        | 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        | 'properties.group.id' = 'gid-paimon-1',
        | 'scan.startup.mode' = 'group-offsets',
        | 'properties.auto.offset.reset' = 'latest',
        | 'format' = 'json',
        | 'json.fail-on-missing-field' = 'false',
        | 'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin)

    // Create a Paimon catalog
    tEnv.executeSql(
      """
        |CREATE CATALOG paimon_catalog WITH(
        | 'type'='paimon',
        | 'warehouse'='hdfs://bigdata01:9000/paimon'
        |)
        |""".stripMargin)
    tEnv.executeSql("USE CATALOG paimon_catalog")

    // Create the destination table (a Paimon table)
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS word_filter(
        | id BIGINT,
        | word STRING,
        | dt STRING,
        | PRIMARY KEY (id, dt) NOT ENFORCED
        |)PARTITIONED BY (dt) WITH (
        | 'bucket' = '5'
        |)
        |""".stripMargin)

    // Write data into the destination table
    tEnv.executeSql(
      """
        |INSERT INTO `paimon_catalog`.`default`.`word_filter`
        |SELECT
        | id,
        | word,
        | '20230101' AS dt
        |FROM `default_catalog`.`default_database`.`word_source`
        |-- != cannot be used here; use <> instead
        |WHERE word <> 'hello11'
        |""".stripMargin)
  }
}
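To rule out the producing side, here is a minimal producer sketch, again assuming the plain Kafka client API (the object name WordProducer and the generated values are made up for illustration), that writes JSON rows matching the word_source schema into paimon_word:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object WordProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      // Field names match the word_source schema: id BIGINT, word STRING
      for (i <- 1L to 10L) {
        val json = s"""{"id": $i, "word": "word$i"}"""
        // Block on the ack so a broker-side problem surfaces here
        // instead of being silently dropped when the JVM exits
        producer.send(new ProducerRecord[String, String]("paimon_word", json)).get()
      }
    } finally {
      producer.close()
    }
  }
}

Note that the Paimon sink commits files only on checkpoint, so with the 5-second checkpoint interval configured above, data files should appear under hdfs://bigdata01:9000/paimon within a few checkpoint cycles once records flow; if this producer succeeds but files still never appear, the failure is on the read/commit path rather than on the producing side.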