sparkStreaming+Redis问题

sparkStreaming+Redis问题

sparkStreaming+kafka+Redis,运行一段时间后【5-10分钟】,读取不到kafka数据了,经测试发现是redis问题,使用了jedis连接池,并是在foreachPartition中创建与关闭的。请问如何在SparkStreaming中使用redis连接池。简易代码如下:

kafkaDStream.foreachRDD { rdd =>
  (!rdd.isEmpty()) {
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    (range <- offsetRanges) {
      ({range.topic}{range.partition}{range.fromOffset}{range.untilOffset})
    }

    rdd.map(row => {
      (row.key()row.value())
    }).foreachPartition(partitionOfRecords => {
      {
        //xxxx      
      } {
        ex: => ex.printStackTrace()
      } {
          }
    })
kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}


正在回答 回答被采纳积分+1

登陆购买课程后可参与讨论,去登陆

1回答
徐老师 2022-04-15 23:06:44

如果是在foreachpartition中使用连接,常见的是使用单连接,每个分区的数据获取一次连接,用完后关闭,这种比较稳定。

当然连接池也可以使用,只不过在foreachpartition中使用意义不大,在课程最后一周(第23周数据中台)里面的第7-9小节里面有讲到sparkstreaming+kafka+redis的使用,也用到了redis的静态连接池,可以参考一下。

  • 提问者 china7610 #1

    多谢,多谢!
    如果在foreachpartition 同时使用ES也是这个道理吧,也不需要创建ES连接池对吗?

    2022-04-15 23:35:11
  • 是的,使用连接池如果控制不好很容易出问题,针对这种一次操作一个分区的操作,每次获取连接也是没问题的。
    2022-04-15 23:37:24
  • 提问者 china7610 回复 徐老师 #3

    多谢,多谢!

    2022-04-16 08:12:44
问题已解决,确定采纳
还有疑问,暂不采纳

恭喜解决一个难题,获得1积分~

来为老师/同学的回答评分吧

0 星
请稍等 ...
意见反馈 帮助中心 APP下载
官方微信

在线咨询

领取优惠

免费试听

领取大纲

扫描二维码,添加
你的专属老师