关于 RDD.cache() 持久化是否需要接受返回值的疑问?

关于 RDD.cache() 持久化是否需要接受返回值的疑问?

问题描述:

视频中老师有说到 .cache() 必须要有一个引用接收,如果没有定义 cache() 方法返回的值的话,cache 是无效的,但是我在实际代码测试中发现,好像不需要接收返回值,是可以 rdd.cache() 直接调用的

https://img1.sycdn.imooc.com//climg/6223068a098f4d7f26221424.jpg


老师视频中的意思是必须:val rdd = sc.parallelize(1 to 10).map(...).cache()

我的意思是可以:val rdd = sc.parallelize(1 to 10).map(...)

                           rdd.cache()


实测代码如下

val sparkConf = new SparkConf()
  .setMaster("local")
  .setAppName(this.getClass.getName)
val sc = new SparkContext(sparkConf)

val rdd = sc.parallelize(1 to 5).map(i => {
  println("正在 map i " + i)
  i * 2
})

rdd.cache()

rdd.foreach(i => print(i))
println("cached 上面是第一次打印(map 函数会执行,会打印 正在 map i)==============")
rdd.foreach(i => print(i))
println("cached 上面是第二次打印,rdd 已经被缓存了,map 函数不会被执行==============")

rdd.unpersist()

rdd.foreach(i => print(i))
println("unpersist 上面是第三次,清空了缓存,所以 map 函数又会执行,会打印 正在 map i)==============")

sc.stop()


日志输出

正在 map i 1
正在 map i 2
正在 map i 3
正在 map i 4
正在 map i 5
246810
cached 上面是第一次打印(map 函数会执行,会打印 正在 map i)==============
246810
cached 上面是第二次打印,rdd 已经被缓存了,map 函数不会被执行==============
正在 map i 1
正在 map i 2
正在 map i 3
正在 map i 4
正在 map i 5
246810
unpersist 上面是第三次,清空了缓存,所以 map 函数又会执行,会打印 正在 map i)==============


正在回答 回答被采纳积分+1

登陆购买课程后可参与讨论,去登陆

1回答
徐老师 2022-03-05 17:07:13

确认了一下现在不是必须要接受返回值。

这个问题是在工作中最开始用spark的时候我记得当时在线上遇到过这种问题,比较早了,当时应该是15年还是16年。

现在的cache方法返回的是调用它的那个rdd自身。

cache底层会调用persist,persist最终的源码是这样的,最终返回的是this,可以不用接收返回值也是可以的,当然了,接受返回值也不影响。

private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}


  • 提问者 ischand #1

    是的,我也看到这部分代码了,所以提出了这个疑问,懒,不想接收返回值

    2022-03-05 17:16:10
问题已解决,确定采纳
还有疑问,暂不采纳

恭喜解决一个难题,获得1积分~

来为老师/同学的回答评分吧

0 星
请稍等 ...
意见反馈 帮助中心 APP下载
官方微信

在线咨询

领取优惠

免费试听

领取大纲

扫描二维码,添加
你的专属老师