自定义source代码无法运行
1 | package com.gaojingsi.scala<br><br>import org.apache.flink.streaming.api.functions.source.SourceFunction<br><br><br>class CustomIncreasementSource extends SourceFunction[Long] {<br> var running = false<br> var number = 1<br><br> override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {<br> running = true<br> while (running) {<br> sourceContext.collect(number)<br> number += 1<br> Thread.sleep(1000)<br> }<br> }<br><br> override def cancel(): Unit = {<br> running = false<br> }<br>}<br> |
1 | package com.gaojingsi.scala<br><br>import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment<br><br><br>object CustomSourceStudyScala {<br><br> def main(args: Array[String]): Unit = {<br> val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment<br><br> import org.apache.flink.api.scala._<br> val ds = streamExecutionEnvironment.addSource(new CustomIncreasementSource).setParallelism(1)<br> ds.print().setParallelism(1)<br><br> streamExecutionEnvironment.execute("CustomSourceStudyScala")<br> }<br><br>}<br> |
以上是我的自定义source代码,运行就报错:
我的依赖如下:
请问可能的原因,嘿嘿,才疏学浅。
10
收起
正在回答
1回答
代码本身没有问题,我用你的代码在我本地测试了,是可以正常运行的,使用的flink1.11.1依赖版本。
可能是idea中scala的版本设置的有问题,需要使用2.12的,确认一下
如果scala版本没有问题的话你尝试使用课程中用的flink依赖版本,使用flink1.11.1版本试一下
相似问题
登录后可查看更多问答,登录/注册
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧