sparkStreaming+Redis问题
sparkStreaming+kafka+Redis,运行一段时间后【5-10分钟】,读取不到kafka数据了,经测试发现是redis问题,使用了jedis连接池,并是在foreachPartition中创建与关闭的。请问如何在SparkStreaming中使用redis连接池。简易代码如下:
kafkaDStream.foreachRDD { rdd => (!rdd.isEmpty()) { offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges (range <- offsetRanges) { ({range.topic}{range.partition}{range.fromOffset}{range.untilOffset}) } rdd.map(row => { (row.key()row.value()) }).foreachPartition(partitionOfRecords => { { //xxxx } { ex: => ex.printStackTrace() } { } }) kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } }
8
收起
正在回答 回答被采纳积分+1
1回答
大数据工程师 2024 版
- 参与学习 1151 人
- 提交作业 5960 份
- 解答问题 1144 个
不用Java初级内容充数!不用与大数据岗位无关内容占课时!我们做的就是“精华版”大数据课程
了解课程
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星