5ep9lzv posted on 2024-10-10 14:34:42

Beginner's Guide: Getting Started with Apache Spark


    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">[Editor's note] Spark has become one of the hottest open-source projects in the big-data field, prized for its high performance and ease of use. As a young open-source project, however, it still poses no small number of challenges in practice. Here we share a Spark primer by SciSpike software architect Ashwini Kuntamukkala, published on DZone as "Apache Spark: An Engine for Large-Scale Data Processing" and translated by OneAPM engineers (parts of it are based on Spark 1.0, but it remains well worth reading).</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">This article is an introduction to Apache Spark: it looks at Spark's place in the big-data field, covers installing Spark and building applications with it, and explains some common actions and operations.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">1. Why Use Apache Spark</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">We live in the era of "big data": data of every kind is produced at every moment, and the rate of growth keeps accelerating. Broadly speaking, this data includes transactional data, social-media content (such as text, images, and video), and sensor data. Why invest so much effort in all of this? Because the insights extracted from massive data can usefully guide both daily life and production practice.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">A few years ago, only a handful of companies had the technical strength and the money to store and mine massive amounts of data for insight. Apache Hadoop, open-sourced by Yahoo in 2009, changed that radically: clusters of commodity servers dramatically lowered the barrier to processing data at scale. As a result, many industries (such as health care, infrastructure, finance, insurance, telematics, consumer, retail, marketing, e-commerce, media, manufacturing, and entertainment) began their Hadoop journey toward extracting value from massive data. Hadoop provides two main capabilities: by scaling out across commodity hosts, HDFS offers a cheap, fault-tolerant way to store massive data; and the MapReduce computing paradigm offers a simple programming model for mining that data and extracting insight.</p><p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The figure below shows the MapReduce data-processing flow, where the output of one Map-Reduce step becomes the input of the next step in a typical Hadoop job.</p><img src="https://p3-sign.toutiaoimg.com/6209/1500182428~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=japwkYgthNuReentzA2J619v5dc%3D" style="width: 50%; margin-bottom: 20px;">
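    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The map, shuffle, and reduce phases just described can be sketched in a few lines of plain Python (illustrative only; real MapReduce distributes these phases across a cluster and spills intermediate results to disk in between):</p>

```python
# A minimal single-machine sketch of the MapReduce flow: a map phase emits
# (key, value) pairs, a shuffle groups them by key, and a reduce phase
# aggregates each group.

from collections import defaultdict

def map_phase(lines):
    for line in lines:
        for word in line.split():
            yield (word, 1)                 # emit (word, 1) per occurrence

def shuffle(pairs):
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)           # group values by key
    return groups

def reduce_phase(groups):
    return {key: sum(values) for key, values in groups.items()}

lines = ["hello world", "hello hadoop"]
print(reduce_phase(shuffle(map_phase(lines))))  # → {'hello': 2, 'world': 1, 'hadoop': 1}
```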
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Throughout this flow, intermediate results are passed via disk, so most Map-Reduce jobs are bound by IO rather than by computation. For use cases such as ETL, data integration, and cleansing, this IO constraint matters little, because those scenarios rarely demand fast turnaround. In the real world, however, there are plenty of latency-sensitive use cases, for example: processing streaming data for near-real-time analysis, such as analyzing clickstream data to recommend videos and raise user engagement, where developers must trade off accuracy against latency; and interactive analysis on large datasets, where a data scientist runs ad-hoc queries against the data.</p><img src="https://p3-sign.toutiaoimg.com/6207/2188934884~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=LBHaO7RPDsHW8gnD2oqU%2FlDg%2BZ8%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">After years of development, the rich tools of the Hadoop ecosystem are undoubtedly well loved by users, yet a number of problems still make the ecosystem challenging to use:</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">1. Each use case requires a different technology stack to support it, and across different scenarios the many point solutions often fall short.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">2. In production, organizations often need expertise in several different technologies.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">3. Many of these technologies suffer from version-compatibility problems.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">4. Data cannot be shared quickly between parallel jobs.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Apache Spark solves each of the problems above. Spark is a lightweight in-memory cluster-computing platform that supports batch, streaming, and interactive use cases through different components, as shown in the figure below.</p><img src="https://p3-sign.toutiaoimg.com/6205/8225807729~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=PTexmpMRVuSs4TOZvO7TkikLi50%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">2. About Apache Spark</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Apache Spark is an open-source, Hadoop-compatible cluster-computing platform. Developed by AMPLab at UC Berkeley as part of the Berkeley Data Analytics Stack (BDAS), it is now backed by the big-data company Databricks and is a top-level Apache project. The figure below shows the components of the Apache Spark stack.</p><img src="https://p3-sign.toutiaoimg.com/6209/1500442322~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=TKowtAVNW8I%2BG%2Fbh4HdQ05I28JY%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Five advantages of Apache Spark:</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">1. Higher performance, because data is loaded into distributed memory across the cluster's hosts. Data can be transformed iteratively and cached for subsequent frequent access. You may have heard the claim that Spark can be up to 100x faster than Hadoop when all data fits in memory, and up to 10x faster when it does not.</p><img src="https://p3-sign.toutiaoimg.com/6207/2189117384~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=PIRpwmOWSJtV0Skz7mi9Jlaujy0%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">2. Standard APIs in Java, Scala, Python, and SQL (for interactive queries) make Spark accessible across industries, and it ships with a rich set of out-of-the-box machine-learning libraries.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">3. It is compatible with the existing Hadoop v1 (SIMR) and 2.x (YARN) ecosystems, so organizations can migrate seamlessly.</p><img src="https://p3-sign.toutiaoimg.com/6210/1468360755~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=uzTP4wZ8CLLVW1txrIm5v61s3Cg%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">4. It is easy to download and install. A convenient shell (REPL: Read-Eval-Print Loop) lets you learn the APIs interactively.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">5. Its high-level architecture improves productivity, letting you focus on the computation itself.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Moreover, Apache Spark is implemented in Scala, and its codebase is very concise.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">3. Installing Apache Spark</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The following table lists some important links and prerequisites:</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">As shown in Figure 6, Apache Spark can be deployed standalone, on Hadoop V1 via SIMR, or on Hadoop 2 via YARN/Mesos. Spark requires some knowledge of Java, Scala, or Python. Here we focus on installing and running it in the standalone configuration.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">1. Install JDK 1.6+, Scala 2.10+, Python, and sbt</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">2. Download the Apache Spark 1.0.1 release</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">3. Untar and unzip spark-1.0.1.tgz in the chosen directory</p>akuntamukkala@localhost~/Downloads$ pwd
    /Users/akuntamukkala/Downloads
    akuntamukkala@localhost~/Downloads$ tar -zxvf spark-1.0.1.tgz -C /Users/akuntamukkala/spark<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">4. Run sbt to build Apache Spark</p>akuntamukkala@localhost~/spark/spark-1.0.1$ pwd
    /Users/akuntamukkala/spark/spark-1.0.1
    akuntamukkala@localhost~/spark/spark-1.0.1$ sbt/sbt assembly<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">5. Launch the Apache Spark standalone REPL for Scala</p>/Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">For Python:</p>/Users/akuntamukkala/spark/spark-1.0.1/bin/pyspark<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">4. How Apache Spark Works</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The Spark engine provides the ability to process data in distributed memory across all the hosts of a cluster. The figure below shows the processing flow of a typical Spark job.</p><img src="https://p26-sign.toutiaoimg.com/6205/8226280776~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=e6ygYWu9oeWGg77mRxsJ2qbFpP8%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The next figure shows how Apache Spark executes a job on a cluster.</p><img src="https://p3-sign.toutiaoimg.com/6204/8330476471~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=Tszdoybhx49gdxE8eLAIpGoiRXw%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The Master controls how data is partitioned, exploits data locality, and tracks all distributed computation on the Slaves. If a Slave becomes unavailable, the data it stored is reassigned to other available Slaves. Although at present (version 1.0.1) the Master is a single point of failure, this will be fixed in a later release.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">5. Resilient Distributed Datasets (RDD)</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The Resilient Distributed Dataset (RDD, superseded by DataFrame from Spark 1.3 onward) is the core concept of Apache Spark. It is an immutable, distributed collection of data on which two kinds of operations are performed: transformations and actions. A transformation, such as filter, map, or union on an RDD, produces another RDD; an action, such as count, first, take(n), or collect, triggers a computation and returns a value to the Master or to a stable storage system. Transformations are generally lazy: they do not execute until an action runs. The Spark Master/Driver remembers the transformations applied to an RDD, so if part of an RDD is lost (that is, a slave goes down) it can be reconstructed quickly and easily on a surviving host in the cluster. This is the "resilience" of an RDD.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The figure below illustrates how lazy transformations work:</p><img src="https://p3-sign.toutiaoimg.com/6206/6639556052~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=2isfhf7%2F8WKfzD%2FRtweGvYAqP78%3D" style="width: 50%; margin-bottom: 20px;">
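    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The lazy-transformation behaviour described above can be sketched in plain Python. The tiny LazyDataset class below is hypothetical and for illustration only; real RDDs add partitioning, lineage tracking, and fault tolerance:</p>

```python
# Transformations only record what to do; the action replays the recorded
# operations ("lineage") and materializes a result.

class LazyDataset:
    def __init__(self, data):
        self._data = data          # source elements
        self._ops = []             # recorded transformations

    # --- transformations: just record the operation, nothing runs yet ---
    def map(self, f):
        self._ops.append(("map", f))
        return self

    def filter(self, p):
        self._ops.append(("filter", p))
        return self

    # --- action: replay the recorded operations and materialize a result ---
    def collect(self):
        out = list(self._data)
        for kind, f in self._ops:
            if kind == "map":
                out = [f(x) for x in out]
            else:  # "filter"
                out = [x for x in out if f(x)]
        return out

ds = LazyDataset([1, 2, 3, 4, 5]).map(lambda x: x * 2).filter(lambda x: x > 4)
# No work has happened yet; only the action below executes the pipeline.
print(ds.collect())  # → [6, 8, 10]
```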
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">We can understand this concept through the following example: find the 5 most frequently used words in a text. The figure below shows one possible solution.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">In the command below, we read the text and create an RDD of strings, where each entry represents one line of the text.</p>scala&gt; val hamlet = sc.textFile("/Users/akuntamukkala/temp/gutenburg.txt")
    hamlet: org.apache.spark.rdd.RDD = MappedRDD at textFile at &lt;console&gt;:12scala&gt; val topWordCount = hamlet.flatMap(str=&gt;str.split(" ")).filter(!_.isEmpty).map(word=&gt;(word,1)).reduceByKey(_+_).map{case (word, count) =&gt; (count, word)}.sortByKey(false)
    topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD at sortByKey at &lt;console&gt;:14<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">1. Notice from the commands above how simple this is: transformations and actions are chained through a concise Scala API.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">2. Some words may be separated by more than one space, which produces empty-string words, so we filter them out with filter(!_.isEmpty).</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">3. Each word is mapped to a key-value pair: map(word=&gt;(word,1)).</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">4. To total all the counts we invoke a reduce step, reduceByKey(_+_). Here _+_ is a convenient shorthand for a function that adds the two values supplied for each key.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">5. We now have the words and their counts, and the next step is to order them by count. In Apache Spark you can only sort by key, not by value, so we use map{case (word, count) =&gt; (count, word)} to flip each (word, count) pair into (count, word).</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">6. Since we want the 5 most common words, sortByKey(false) performs a descending sort on the counts.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The command above includes a take(5) (an action operation, which triggers computation) and prints the 5 most frequently used words in the /Users/akuntamukkala/temp/gutenburg.txt text. The same can be done in the Python shell.</p>
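    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">To see what each step of the Scala pipeline computes, the same logic can be mirrored in plain Python (no Spark needed; the sample lines are illustrative):</p>

```python
# Plain-Python equivalent of the top-N word-count pipeline above.

from collections import Counter

def top_words(lines, n=5):
    # flatMap(str => str.split(" ")) + filter(!_.isEmpty)
    words = [w for line in lines for w in line.split(" ") if w]
    # map(word => (word, 1)) + reduceByKey(_ + _)
    counts = Counter(words)
    # swap to (count, word), sort descending, then take(n)
    return sorted(((c, w) for w, c in counts.items()), reverse=True)[:n]

lines = ["to be or not to be", "that is the question", "to be is to do"]
for count, word in top_words(lines):
    print(count, word)
```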

    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">RDD lineage can be tracked with toDebugString (an operation worth remembering).</p>scala&gt; topWordCount.take(5).foreach(x=&gt;println(x))
    (1044,the)
    (730,and)
    (679,of)
    (648,to)
    (511,I)<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Commonly used transformations:</p>
    <div style="color: black; text-align: left; margin-bottom: 10px;">
      <div style="color: black; text-align: left; margin-bottom: 10px;">
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">filter(func) <strong style="color: blue;">Purpose:</strong> new RDD by selecting those data elements on which func returns true. <strong style="color: blue;">Example:</strong> scala&gt; val rdd = sc.parallelize(List("ABC","BCD","DEF")) scala&gt; val filtered = rdd.filter(_.contains("C")) scala&gt; filtered.collect <strong style="color: blue;">Result:</strong> Array = Array(ABC, BCD)</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">map(func) <strong style="color: blue;">Purpose:</strong> return a new RDD by applying func to each data element. <strong style="color: blue;">Example:</strong> scala&gt; val rdd=sc.parallelize(List(1,2,3,4,5)) scala&gt; val times2 = rdd.map(_*2) scala&gt; times2.collect <strong style="color: blue;">Result:</strong> Array = Array(2, 4, 6, 8, 10)</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">flatMap(func) <strong style="color: blue;">Purpose:</strong> similar to map, but func returns a Seq instead of a single value; for example, mapping a sentence into a Seq of words. <strong style="color: blue;">Example:</strong> scala&gt; val rdd=sc.parallelize(List("Spark is awesome","It is fun")) scala&gt; val fm=rdd.flatMap(str=&gt;str.split(" ")) scala&gt; fm.collect <strong style="color: blue;">Result:</strong> Array = Array(Spark, is, awesome, It, is, fun)</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">reduceByKey(func,[numTasks]) <strong style="color: blue;">Purpose:</strong> aggregate the values of a key using a function; "numTasks" is an optional parameter specifying the number of reduce tasks. <strong style="color: blue;">Example:</strong> scala&gt; val word1=fm.map(word=&gt;(word,1)) scala&gt; val wrdCnt=word1.reduceByKey(_+_) scala&gt; wrdCnt.collect <strong style="color: blue;">Result:</strong> Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1))</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">groupByKey() <strong style="color: blue;">Purpose:</strong> convert (K,V) to (K,Iterable&lt;V&gt;). <strong style="color: blue;">Example:</strong> scala&gt; val cntWrd = wrdCnt.map{case (word, count) =&gt; (count, word)} scala&gt; cntWrd.groupByKey.collect <strong style="color: blue;">Result:</strong> Array[(Int, Iterable)] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is)))</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">distinct() <strong style="color: blue;">Purpose:</strong> eliminate duplicates from the RDD. <strong style="color: blue;">Example:</strong> scala&gt; fm.distinct.collect <strong style="color: blue;">Result:</strong> Array = Array(is, It, awesome, Spark, fun)</p>
      </div>
    </div>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Commonly used set operations:</p>
    <div style="color: black; text-align: left; margin-bottom: 10px;">
      <div style="color: black; text-align: left; margin-bottom: 10px;">
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">union</p><strong style="color: blue;">Purpose:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">new RDD containing all elements from the source RDD and the argument.</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; val rdd1=sc.parallelize(List('A','B'))</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; val rdd2=sc.parallelize(List('B','C'))</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; rdd1.union(rdd2).collect</p><strong style="color: blue;">Result:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Array = Array(A, B, B, C)</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">intersection</p><strong style="color: blue;">Purpose:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">new RDD containing only the elements common to the source RDD and the argument.</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; rdd1.intersection(rdd2).collect</p><strong style="color: blue;">Result:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Array = Array(B)</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">cartesian</p><strong style="color: blue;">Purpose:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">new RDD with the cross product of all elements from the source RDD and the argument.</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; rdd1.cartesian(rdd2).collect</p><strong style="color: blue;">Result:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Array[(Char, Char)] = Array((A,B), (A,C), (B,B), (B,C))</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">subtract</p><strong style="color: blue;">Purpose:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">new RDD created by removing from the source RDD the elements it has in common with the argument.</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; rdd1.subtract(rdd2).collect</p><strong style="color: blue;">Result:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Array = Array(A)</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">join(RDD,[numTasks])</p><strong style="color: blue;">Purpose:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">when invoked on (K,V) and (K,W), this operation creates a new RDD of (K, (V,W)).</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot")))</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; val personSE = sc.parallelize(Seq(("Andy", "Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista")))</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; personFruit.join(personSE).collect</p><strong style="color: blue;">Result:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">cogroup(RDD,[numTasks])</p><strong style="color: blue;">Purpose:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">when invoked on (K,V) and (K,W), converts them to (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)).</p>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">scala&gt; personFruit.cogroup(personSE).collect</p><strong style="color: blue;">Result:</strong>
            <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Array[(String, (Iterable, Iterable))] = Array((Andy,(ArrayBuffer(Apple, Apricot),ArrayBuffer(Google))), (Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))), (Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing, AltaVista))))</p>
      </div>
    </div>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">For more information on transformations, see</p>http://spark.apache.org/docs/latest/programming-guide.html#transformations
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Commonly used actions:</p>
    <div style="color: black; text-align: left; margin-bottom: 10px;">
      <div style="color: black; text-align: left; margin-bottom: 10px;">
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">count <strong style="color: blue;">Purpose:</strong> get the number of data elements in the RDD. <strong style="color: blue;">Example:</strong> scala&gt; val rdd = sc.parallelize(List('A','B','c')) scala&gt; rdd.count <strong style="color: blue;">Result:</strong> Long = 3</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">collect <strong style="color: blue;">Purpose:</strong> get all the data elements in the RDD as an array. <strong style="color: blue;">Example:</strong> scala&gt; val rdd = sc.parallelize(List('A','B','c')) scala&gt; rdd.collect <strong style="color: blue;">Result:</strong> Array = Array(A, B, c)</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">reduce(func) <strong style="color: blue;">Purpose:</strong> aggregate the data elements of the RDD using a function that takes two arguments and returns one. <strong style="color: blue;">Example:</strong> scala&gt; val rdd = sc.parallelize(List(1,2,3,4)) scala&gt; rdd.reduce(_+_) <strong style="color: blue;">Result:</strong> Int = 10</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">take(n) <strong style="color: blue;">Purpose:</strong> fetch the first n data elements of the RDD, computed by the driver program. <strong style="color: blue;">Example:</strong> scala&gt; val rdd = sc.parallelize(List(1,2,3,4)) scala&gt; rdd.take(2) <strong style="color: blue;">Result:</strong> Array = Array(1, 2)</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">foreach(func) <strong style="color: blue;">Purpose:</strong> execute a function for each data element of the RDD; usually used to update an accumulator (discussed later) or to interact with external systems. <strong style="color: blue;">Example:</strong> scala&gt; val rdd = sc.parallelize(List(1,2,3,4)) scala&gt; rdd.foreach(x=&gt;println("%s*10=%s".format(x,x*10))) <strong style="color: blue;">Result:</strong> 1*10=10 4*10=40 3*10=30 2*10=20</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">first <strong style="color: blue;">Purpose:</strong> retrieve the first data element of the RDD; similar to take(1). <strong style="color: blue;">Example:</strong> scala&gt; val rdd = sc.parallelize(List(1,2,3,4)) scala&gt; rdd.first <strong style="color: blue;">Result:</strong> Int = 1</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">saveAsTextFile(path) <strong style="color: blue;">Purpose:</strong> write the content of the RDD to a text file, or a set of text files, on the local file system or HDFS. <strong style="color: blue;">Example:</strong> scala&gt; val hamlet = sc.textFile("/users/akuntamukkala/temp/gutenburg.txt") scala&gt; hamlet.filter(_.contains("Shakespeare")).saveAsTextFile("/users/akuntamukkala/temp/filtered") <strong style="color: blue;">Result:</strong> akuntamukkala@localhost~/temp/filtered$ ls _SUCCESS part-00000 part-00001</p>
      </div>
    </div>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">For more actions, see</p>http://spark.apache.org/docs/latest/programming-guide.html#actions
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">6. RDD Persistence</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">One of Apache Spark's key capabilities is persisting/caching RDDs in cluster memory, which markedly improves interactive speed. The table below shows the storage options available in Spark.</p>
    <div style="color: black; text-align: left; margin-bottom: 10px;">
      <div style="color: black; text-align: left; margin-bottom: 10px;">
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">MEMORY_ONLY (default level): stores the RDD in available cluster memory as deserialized Java objects. Some partitions may not be cached if there is not enough cluster memory; those partitions are recalculated on the fly as needed.</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">MEMORY_AND_DISK: stores the RDD as deserialized Java objects. If the RDD does not fit in cluster memory, the remaining partitions are stored on disk and read as needed.</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">MEMORY_ONLY_SER: stores the RDD as serialized Java objects (one byte array per partition). This is more CPU-intensive but saves memory, as it is more space-efficient. Some partitions may not be cached; those are recalculated on the fly as needed.</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">MEMORY_AND_DISK_SER: same as above, except that disk is used when memory is not sufficient.</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">DISK_ONLY: stores the RDD only on disk.</p>
        <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: same as the corresponding levels above, but each partition is replicated on 2 slave nodes.</p>
      </div>
    </div>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The storage levels above are set via the persist operation on an RDD; cache is a convenient shorthand for persist with the default MEMORY_ONLY level. More information on persistence levels is available at</p>http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Spark uses a Least Recently Used (LRU) algorithm to evict old, infrequently used RDDs from the cache and free up memory. An unpersist operation is also provided to remove a cached/persisted RDD explicitly.</p>
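    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The LRU eviction policy itself can be sketched in a few lines of plain Python (a toy illustration only; Spark's real block manager works quite differently):</p>

```python
# A toy LRU cache: every access moves a key to the "most recently used" end,
# and inserting beyond capacity evicts the least recently used key.

from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self._store = OrderedDict()

    def get(self, key):
        if key not in self._store:
            return None
        self._store.move_to_end(key)         # mark as most recently used
        return self._store[key]

    def put(self, key, value):
        if key in self._store:
            self._store.move_to_end(key)
        self._store[key] = value
        if len(self._store) > self.capacity:
            self._store.popitem(last=False)  # evict least recently used

cache = LRUCache(2)
cache.put("rdd1", "partition data 1")
cache.put("rdd2", "partition data 2")
cache.get("rdd1")                       # rdd1 is now most recently used
cache.put("rdd3", "partition data 3")   # capacity exceeded: evicts rdd2
print(cache.get("rdd2"))  # → None
print(cache.get("rdd1"))  # → partition data 1
```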
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">7. Shared Variables</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Accumulators. Spark offers a very convenient way to sidestep mutable counters and counter-synchronization issues: accumulators. An accumulator is initialized with a default value in a Spark context. The accumulator is available on the Slave nodes, but the Slaves cannot read it; they can only apply atomic updates, which are forwarded to the Master. The Master is the only node that can read and compute the aggregate of all the updates. For example:</p>akuntamukkala@localhost~/temp$ cat output.log
    error
    warning
    info
    trace
    error
    info
    info
    scala&gt; val nErrors=sc.accumulator(0)
    scala&gt; val logs = sc.textFile("/Users/akuntamukkala/temp/output.log")
    scala&gt; logs.filter(_.contains("error")).foreach(x=&gt;nErrors+=1)
    scala&gt; nErrors.value
    Result: Int = 2<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Broadcast variables. In production it is very common to join datasets by a specified key across RDDs. This can mean shipping a large dataset to the slave nodes that host the data to be joined, which creates a serious performance bottleneck, since network I/O is roughly 100x slower than memory access. To address this, Spark provides broadcast variables which, as the name suggests, are broadcast to the slave nodes, so that RDD operations on those nodes can access the broadcast value quickly. For example, suppose we want to compute the shipping cost of every entry in a file, given a lookup table with the cost of each shipping type; that lookup table can be a broadcast variable.</p>akuntamukkala@localhost~/temp$ cat packagesToShip.txt
    ground
    express
    media
    priority
    priority
    ground
    express
    media
    scala&gt; val map = sc.parallelize(Seq(("ground",1),("media",2),("priority",5),("express",10))).collect.toMap
    map: scala.collection.immutable.Map[String,Int] = Map(ground -&gt; 1, media -&gt; 2, priority -&gt; 5, express -&gt; 10)
    scala&gt; val bcMailRates = sc.broadcast(map)<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The command above creates a broadcast variable from the map of cost per service class.</p>scala&gt; val pts = sc.textFile("/Users/akuntamukkala/temp/packagesToShip.txt")<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The command above reads the shipment file into an RDD.</p>scala&gt; pts.map(shipType=&gt;(shipType,1)).reduceByKey(_+_).map{case (shipType,nPackages)=&gt;(shipType,nPackages*bcMailRates.value(shipType))}.collect<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The command above counts the packages of each shipping type and multiplies each count by the broadcast mailing rate, yielding the total cost per shipping type. More details are available in the following PDF:</p>http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">8. Spark SQL</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Spark SQL provides a convenient way to run interactive analytics over the Spark engine, using a type of RDD called SchemaRDD. A SchemaRDD can be created from an existing RDD, from external data formats such as Parquet files or JSON data, or by running HQL against Hive. A SchemaRDD is very similar to a table in an RDBMS. Once data has been loaded into a SchemaRDD, the Spark engine can run batch or streaming jobs against it. Spark SQL provides two kinds of contexts that extend SparkContext's functionality: SQLContext and HiveContext.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">SQLContext provides access to a simple SQL parser, while HiveContext provides access to the HiveQL parser. HiveContext lets enterprises leverage their existing Hive infrastructure.</p>
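    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">For comparison, a HiveContext is created from the same SparkContext. The sketch below assumes a working Hive installation; the table name and query are purely illustrative:</p>scala&gt; val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    scala&gt; hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    scala&gt; hiveContext.hql("SELECT key, value FROM src").collect.foreach(println)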
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Let's look at a simple SQLContext example.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The user data in the text below is delimited by "|".</p>John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063
    Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093
    Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108
    John Ledger|28|M|203 Galaxy Way,Paris, TX,75461
    Joe Graham|40|M|5023 Silicon Rd,London,TX,76854<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Define a Scala case class to represent each row:</p>case class Customer(name:String,age:Int,gender:String,address:String)<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The code snippet below shows how to create a SQLContext from a SparkContext, read the input file, turn each line into a Customer record, and run a simple SQL query for all male customers under 30.</p>val sparkConf = new SparkConf().setAppName("Customers")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val r = sc.textFile("/Users/akuntamukkala/temp/customers.txt")
    val records = r.map(_.split('|'))
    val c = records.map(r=&gt;Customer(r(0),r(1).trim.toInt,r(2),r(3)))
    c.registerAsTable("customers")
    sqlContext.sql("select * from customers where gender='M' and age &lt; 30").collect.foreach(println)
    Result: [John Ledger,28,M,203 Galaxy Way,Paris,TX,75461]<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">More examples of SQL and HiveQL are available at the links below:</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">https://spark.apache.org/docs/latest/sql-programming-guide.html and https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html.</p>
    <img src="https://p3-sign.toutiaoimg.com/6206/6639598009~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=ox3YXb%2F1dQp0I8zIgsK0qwvjYbs%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">9. Spark Streaming</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Spark Streaming provides a scalable, fault-tolerant, and efficient way to process streaming data, while reusing Spark's simple programming model. It does so by turning streaming data into micro-batches, which lets the Spark batch programming model be applied to streaming use cases. This unified programming model makes it easy to combine batch processing with interactive streaming analytics. The figure below shows the different data sources from which Spark Streaming can read and analyze data.</p><img src="https://p3-sign.toutiaoimg.com/6206/6639669490~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=L0hkarcBXMEse2wAoOOKsu5wQRY%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The core abstraction in Spark Streaming is the Discretized Stream (DStream). A DStream is a sequence of RDDs, each containing the data that arrived during a fixed (configurable) time interval. Figure 12 illustrates how Spark Streaming turns the incoming data into a series of RDDs, which form a DStream; here each RDD contains two seconds (the configured interval length) of data. In Spark Streaming the batch interval can be set as low as 0.5 seconds, so the processing latency can be brought below one second.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Spark Streaming also provides window operators, which help compute efficiently over a rolling window of RDDs. DStream also exposes an API whose operators (transformations and output operators) let users work with the underlying RDDs directly. Let's look at a simple example included in the Spark Streaming download: finding trending hashtags in the Twitter stream. See the code below.</p>spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The code above creates the Spark Streaming context. Spark Streaming will create an RDD in the DStream containing the tweets received every 2 seconds.</p>val hashTags = stream.flatMap(status =&gt; status.getText.split(" ").filter(_.startsWith("#")))<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The code snippet above splits each tweet into words and keeps only the words starting with "#".</p>val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) =&gt; (count, topic)}.transform(_.sortByKey(false))<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The code above shows how to aggregate the total number of times each hashtag appeared over the last 60 seconds, sorted by count.</p>topCounts60.foreachRDD(rdd =&gt; {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count))
      topList.foreach{case (count, tag) =&gt; println("%s (%s tweets)".format(tag, count))}
    })<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The code above finds the top 10 trending hashtags and prints them.</p>ssc.start<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">The code above tells the Spark Streaming context to start retrieving tweets. Let's now focus on a few common operations, assuming we are reading streaming text from a socket.</p>val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)<img src="https://p3-sign.toutiaoimg.com/6208/1493869038~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=jUGjgzvQRcM9bClUhxtYSehJnYs%3D" style="width: 50%; margin-bottom: 20px;"><img src="https://p3-sign.toutiaoimg.com/6203/8552517487~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=UMnMVL7gGIOLTVJBibNTSOFmNtY%3D" style="width: 50%; margin-bottom: 20px;"><img src="https://p3-sign.toutiaoimg.com/6203/8552790159~noop.image?_iz=58558&amp;from=article.pc_detail&amp;lk3s=953192f4&amp;x-expires=1728803992&amp;x-signature=eBHGnbBwGQLeWTlPoN4nZ0lstTM%3D" style="width: 50%; margin-bottom: 20px;">
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">More operators are described at http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations.</p>
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">Spark Streaming also has many powerful output operators, such as the foreachRDD mentioned above; see http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations for more.</p>
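    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">For instance, instead of printing, each window's result could be written out with an output operator such as saveAsTextFiles. A sketch, reusing the topCounts60 DStream from the Twitter example (the path prefix is hypothetical):</p>scala&gt; topCounts60.saveAsTextFiles("/Users/akuntamukkala/temp/popularTags")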
    <p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;">10. Additional Learning Resources</p>



