Java代码实现时的问题
在Scala中
.withTimestampAssigner(new SerializableTimestampAssigner
换在java中实现,调用该方法总是出问题,于是我换作了
DataStream<Tuple2<String, Long>> watermarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - 10000L);
}
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
long timeStamp = stringLongTuple2.f1;
currentMaxTimestamp = Math.max(currentMaxTimestamp, timeStamp);
System.out.println("currentMaxTimeStamp: "+sdf.format(currentMaxTimestamp));
return timeStamp;
}
});
请问老师如果在java中用SerializableTimestampAssigner的话应该如何操作?
# 具体错误:
好像是泛型引起的
25
收起
正在回答
1回答
我在java中测试了一下,也是同样的问题,代码本身看起来是没有问题的,目前只能定义为flink提供的这个新的api对java代码支持的有问题,估计源码的bug
new AssignerWithPeriodicWatermarks这种方式是之前的老API,这种方式是没问题的。
相似问题
登录后可查看更多问答,登录/注册
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星