Flink批处理无法产生结果
问题描述:
程序运行后,一直处于运行中,无法得出结果。
尝试过的解决方式:
之前一直是akka超时错误,增加配置后不出现akka超时错误,但是无法出结果。
增加程序运行配置:-Xms1024m -Xmx1024m 还是无法出结果
相关截图:

相关代码:
package com.bigdata.fink.batch.transformation
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ListBuffer
/**
* mapPartition算子使用:一次处理一个分区
*/
object BatchMapPartitionScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
//设置超时时间;akka是flink实现client,jobManager,TaskManager之间的网络通信.
//单位 毫秒
env.getJavaEnv.getConfiguration.setInteger("akka.ask.timeout",3000000)
//单位:毫秒
env.getJavaEnv.getConfiguration.setInteger("web.timeout",3000000)
// conf.setInteger("web.timeout",300000)
import org.apache.flink.api.scala._
val text = env.fromCollection(Array("hello you","hello me","hello the"))
text.map(s=>s)
.setParallelism(2)
.mapPartition(it=>{
var list = ListBuffer[String]()
//可以在此处创建数据库连接,建议把这块代码放到try-catch
//注意:此时是每个分区获取一个数据库连接,不需要每处理一条数据就获取一次连接,性能较高
it.foreach(line=>{
val words = line.split(" ")
for(word <- words){
list.append(word)
}
})
list
//关闭数据库连接
}).print()
//注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。
// env.execute("BatchMapPartitionScala")
}
}9
收起
正在回答 回答被采纳积分+1
1回答
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星