Flink批处理无法产生结果

Flink批处理无法产生结果

问题描述:

程序运行后,一直处于运行中,无法得出结果。


尝试过的解决方式:

  1. 之前一直是akka超时错误,增加配置后不出现akka超时错误,但是无法出结果。

  2. 增加程序运行配置:-Xms1024m -Xmx1024m 还是无法出结果

相关截图:

https://img1.sycdn.imooc.com//climg/61dc391509caeea318190436.jpg


相关代码:

package com.bigdata.fink.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

import scala.collection.mutable.ListBuffer

/**
 * mapPartition算子使用:一次处理一个分区
 */
object BatchMapPartitionScala {
  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    //设置超时时间;akka是flink实现client,jobManager,TaskManager之间的网络通信.
    //单位 毫秒
    env.getJavaEnv.getConfiguration.setInteger("akka.ask.timeout",3000000)
    //单位:毫秒
    env.getJavaEnv.getConfiguration.setInteger("web.timeout",3000000)
//    conf.setInteger("web.timeout",300000)

    import org.apache.flink.api.scala._
    val text = env.fromCollection(Array("hello you","hello me","hello the"))

    text.map(s=>s)
      .setParallelism(2)
      .mapPartition(it=>{
      var list = ListBuffer[String]()
      //可以在此处创建数据库连接,建议把这块代码放到try-catch
      //注意:此时是每个分区获取一个数据库连接,不需要每处理一条数据就获取一次连接,性能较高
      it.foreach(line=>{
        val words = line.split(" ")
        for(word <- words){
          list.append(word)
        }
      })
      list
      //关闭数据库连接
    }).print()
    //注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。
//    env.execute("BatchMapPartitionScala")
  }

}


正在回答 回答被采纳积分+1

登陆购买课程后可参与讨论,去登陆

1回答
徐老师 2022-01-10 22:12:11

这个代码其实没有什么特殊的,可能和你本地电脑的环境有关系,你看一下本地运行离线的wordcount代码是否正常。不需要在代码中添加额外的配置。
还有就是确认一下你的flink版本是否和课程中的一样。

  • 然后再把课程中我提供的代码拿到你本地idea中直接运行一下试试
    2022-01-10 22:13:20
  • 提问者 迪拜trash #2

    wordcount也是不行的。

    由于之前用的scala2.11,所以把POM中2.12统一修改为2.11了,但是刚才把这个修改回去重新跑了一遍还是不行

    2022-01-10 22:21:38
  • 提问者 迪拜trash 回复 徐老师 #3

    还是不行。

    POM中配置:

    org.apache.flinkflink-java1.11.1org.apache.flinkflink-streaming-java_2.121.11.1org.apache.flinkflink-scala_2.121.11.1org.apache.flinkflink-streaming-scala_2.121.11.1org.apache.flinkflink-clients_2.121.11.1org.slf4jslf4j.log4j12


    2022-01-10 22:24:22
问题已解决,确定采纳
还有疑问,暂不采纳

恭喜解决一个难题,获得1积分~

来为老师/同学的回答评分吧

0 星
请稍等 ...
意见反馈 帮助中心 APP下载
官方微信

在线咨询

领取优惠

免费试听

领取大纲

扫描二维码,添加
你的专属老师