自定义source代码无法运行

自定义source代码无法运行

package com.gaojingsi.scala

import org.apache.flink.streaming.api.functions.source.SourceFunction


class CustomIncreasementSource extends SourceFunction[Long] {
var running = false
var number = 1

override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
running = true
while (running) {
sourceContext.collect(number)
number += 1
Thread.sleep(1000)
}
}

override def cancel(): Unit = {
running = false
}
}
package com.gaojingsi.scala

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


object CustomSourceStudyScala {

def main(args: Array[String]): Unit = {
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

import org.apache.flink.api.scala._
val ds = streamExecutionEnvironment.addSource(new CustomIncreasementSource).setParallelism(1)
ds.print().setParallelism(1)

streamExecutionEnvironment.execute("CustomSourceStudyScala")
}

}

以上是我的自定义source代码,运行就报错:

http://img1.sycdn.imooc.com//climg/6098e7420924045507070331.jpg

我的依赖如下:

http://img1.sycdn.imooc.com//climg/6098e76809b6e32807200560.jpg

请问可能的原因,嘿嘿,才疏学浅。

正在回答

登陆购买课程后可参与讨论,去登陆

1回答

代码本身没有问题,我用你的代码在我本地测试了,是可以正常运行的,使用的flink1.11.1依赖版本。

可能是idea中scala的版本设置的有问题,需要使用2.12的,确认一下

如果scala版本没有问题的话你尝试使用课程中用的flink依赖版本,使用flink1.11.1版本试一下

问题已解决,确定采纳
还有疑问,暂不采纳

恭喜解决一个难题,获得1积分~

来为老师/同学的回答评分吧

0 星
请稍等 ...
意见反馈 帮助中心 APP下载
官方微信

在线咨询

领取优惠

免费试听

领取大纲

扫描二维码,添加
你的专属老师