自定义source代码无法运行
package com.gaojingsi.scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
class CustomIncreasementSource extends SourceFunction[Long] {
var running = false
var number = 1
override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
running = true
while (running) {
sourceContext.collect(number)
number += 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
running = false
}
}
package com.gaojingsi.scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object CustomSourceStudyScala {
def main(args: Array[String]): Unit = {
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val ds = streamExecutionEnvironment.addSource(new CustomIncreasementSource).setParallelism(1)
ds.print().setParallelism(1)
streamExecutionEnvironment.execute("CustomSourceStudyScala")
}
}
以上是我的自定义source代码,运行就报错:
我的依赖如下:
请问可能的原因,嘿嘿,才疏学浅。
10
收起
正在回答
1回答
代码本身没有问题,我用你的代码在我本地测试了,是可以正常运行的,使用的flink1.11.1依赖版本。
可能是idea中scala的版本设置的有问题,需要使用2.12的,确认一下
如果scala版本没有问题的话你尝试使用课程中用的flink依赖版本,使用flink1.11.1版本试一下
相似问题
登录后可查看更多问答,登录/注册
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星