sparkStreaming+Redis问题
sparkStreaming+kafka+Redis,运行一段时间后【5-10分钟】,读取不到kafka数据了,经测试发现是redis问题,使用了jedis连接池,并是在foreachPartition中创建与关闭的。请问如何在SparkStreaming中使用redis连接池。简易代码如下:
kafkaDStream.foreachRDD { rdd =>
(!rdd.isEmpty()) {
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
(range <- offsetRanges) {
({range.topic}{range.partition}{range.fromOffset}{range.untilOffset})
}
rdd.map(row => {
(row.key()row.value())
}).foreachPartition(partitionOfRecords => {
{
//xxxx
} {
ex: => ex.printStackTrace()
} {
}
})
kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
}8
收起
正在回答 回答被采纳积分+1
1回答
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星