副标题[/!--empirenews.page--]
很多用Spark Streaming 的朋友应该使用过broadcast,大多数情况下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮大家分析一下,有以下几个原因:
- 广播变量大多数情况下是不会变更的,使用单例模式可以减少spark streaming每次job生成执行,重复生成广播变量带来的开销。
- 单例模式也要做同步。这个对于很多新手来说可以不用考虑同步问题,原因很简单因为新手不会调整spark 程序task的调度模式,而默认采用FIFO的调度模式,基本不会产生并发问题。1).假如你配置了Fair调度模式,同时修改了Spark Streaming运行的并行执行的job数,默认为1,那么就要加上同步代码了。2).还有一个原因,在多输出流的情况下共享broadcast,同时配置了Fair调度模式,也会产生并发问题。
- 注意。有些时候比如广播配置文件,规则等需要变更broadcast,在使用fair的时候可以在foreachrdd里面使用局部变量作为广播,避免相互干扰。
先看例子,后面逐步揭晓内部机制。
1.例子
下面是一个双重检查式的broadcast变量的声明方式。
- object WordBlacklist {
-
- @volatile private var instance: Broadcast[Seq[String]] = null
-
- def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
- if (instance == null) {
- synchronized {
- if (instance == null) {
- val wordBlacklist = Seq("a", "b", "c")
- instance = sc.broadcast(wordBlacklist)
- }
- }
- }
- instance
- }
- }
广播变量的使用方法如下:
- val lines = ssc.socketTextStream(ip, port)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
- wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
- // Get or register the blacklist Broadcast
- val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
- // Get or register the droppedWordsCounter Accumulator
- val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
- // Use blacklist to drop words and use droppedWordsCounter to count them
- val counts = rdd.filter { case (word, count) =>
- if (blacklist.value.contains(word)) {
- droppedWordsCounter.add(count)
- false
- } else {
- true
- }
- }.collect().mkString("[", ", ", "]")
- val output = s"Counts at time $time $counts"
- println(output)
- println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
- println(s"Appending to ${outputFile.getAbsolutePath}")
- Files.append(output + "n", outputFile, Charset.defaultCharset())
- }
2.概念补充

(编辑:晋中站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|