Java代码实现时的问题
在Scala中
.withTimestampAssigner(new SerializableTimestampAssigner
换在java中实现,调用该方法总是出问题,于是我换作了
1 | DataStream<Tuple2<String, Long>> watermarkStream = inputMap.assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {<br> Long currentMaxTimestamp = 0L;<br> SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );<br> @Nullable <br> @Override <br> public Watermark getCurrentWatermark() {<br> return new Watermark(currentMaxTimestamp - 10000L);<br> }<br><br> @Override <br> public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {<br> long timeStamp = stringLongTuple2.f1;<br> currentMaxTimestamp = Math.max(currentMaxTimestamp, timeStamp);<br> System.out.println( "currentMaxTimeStamp: " +sdf.format(currentMaxTimestamp));<br> return timeStamp;<br> }<br>});<br> |
请问老师如果在java中用SerializableTimestampAssigner的话应该如何操作?
# 具体错误:
好像是泛型引起的
25
收起
正在回答
1回答
我在java中测试了一下,也是同样的问题,代码本身看起来是没有问题的,目前只能定义为flink提供的这个新的api对java代码支持的有问题,估计源码的bug
new AssignerWithPeriodicWatermarks这种方式是之前的老API,这种方式是没问题的。
相似问题
登录后可查看更多问答,登录/注册
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧