关于 RDD.cache() 持久化是否需要接受返回值的疑问?
问题描述:
视频中老师有说到 .cache() 必须要有一个引用接收,如果没有定义 cache() 方法返回的值的话,cache 是无效的,但是我在实际代码测试中发现,好像不需要接收返回值,是可以 rdd.cache() 直接调用的

老师视频中的意思是必须:val rdd = sc.parallelize(1 to 10).map(...).cache()
我的意思是可以:val rdd = sc.parallelize(1 to 10).map(...)
rdd.cache()
实测代码如下
val sparkConf = new SparkConf()
.setMaster("local")
.setAppName(this.getClass.getName)
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(1 to 5).map(i => {
println("正在 map i " + i)
i * 2
})
rdd.cache()
rdd.foreach(i => print(i))
println("cached 上面是第一次打印(map 函数会执行,会打印 正在 map i)==============")
rdd.foreach(i => print(i))
println("cached 上面是第二次打印,rdd 已经被缓存了,map 函数不会被执行==============")
rdd.unpersist()
rdd.foreach(i => print(i))
println("unpersist 上面是第三次,清空了缓存,所以 map 函数又会执行,会打印 正在 map i)==============")
sc.stop()日志输出
正在 map i 1 正在 map i 2 正在 map i 3 正在 map i 4 正在 map i 5 246810 cached 上面是第一次打印(map 函数会执行,会打印 正在 map i)============== 246810 cached 上面是第二次打印,rdd 已经被缓存了,map 函数不会被执行============== 正在 map i 1 正在 map i 2 正在 map i 3 正在 map i 4 正在 map i 5 246810 unpersist 上面是第三次,清空了缓存,所以 map 函数又会执行,会打印 正在 map i)==============
9
收起
正在回答 回答被采纳积分+1
1回答
徐老师
2022-03-05 17:07:13
确认了一下现在不是必须要接受返回值。
这个问题是在工作中最开始用spark的时候我记得当时在线上遇到过这种问题,比较早了,当时应该是15年还是16年。
现在的cache方法返回的是调用它的那个rdd自身。
cache底层会调用persist,persist最终的源码是这样的,最终返回的是this,可以不用接收返回值也是可以的,当然了,接受返回值也不影响。
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星