大数据开发中Spark常见RDD是怎样的

大数据开发中Spark常见RDD是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。A list of partitionsA function for computing each splitA list of dependencies on other RDDsOptionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)这是RDD的源码中注释中写到的,下面介绍这五种特征属性一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决 定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值一个对分区数据进行计算的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现 compute 函数以 达到该目的。compute函数会对迭代器进行组合,不需要保存每次计算的结果RDD之间的存在依赖关系。RDD的每次转换都会生成一个新的RDD,RDD之间形成类似于流水线一样的前后依 赖关系(lineage)。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是 对RDD的所有分区进行重新计算对于 key-value 的RDD而言,可能存在分区器(Partitioner)。Spark 实现了两种类型的分片函数,一个是基于 哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有 key-value 的RDD,才可能有 Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了RDD本身的分片数量,也 决定了parent RDD Shuffle输出时的分片数量一个列表,存储存储每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保 存的就是每个Partition所在的块的位置。按照“移动数据不移动计算”的理念,Spark在任务调度的时候,会尽可 能地将计算任务分配到其所要处理数据块的存储位置从前面的RDD的基本特征入手,在工作中常编写的程序是,创建RDDRDD的转换,RDD的算子的执行,创建对应着外部系统的数据流入Spark集群的必选步骤,至于之间从集合创建的数据,一般在测试时候使用,所以不细述,RDD的转换对应一个专门的算子叫Transformation其是惰性加载使用的, 而行动对应着触发Transformation执行的操作,一般是输出到集合,或者打印出来,或者返回一个值,另外就是从集群输出到别的系统,这有一个专业词叫Action.转换算子,即从一个RDD到另外一个RDD的转换操作,对应一些内置的Compute函数,但是这些函数被有没有shuffle来分为宽依赖算子和窄依赖算子一般网上文章有两种,一种是搬运定义的,即是否一个父RDD分区会被多个子分区依赖,另外一种是看有没有Shuffle,有Shuffle就是宽依赖,没有则是窄依赖,第一种还靠谱点,第二种就是拿本身来说本身,所以没有参考价值,2.1.3 如何区别宽依赖和窄依赖,可以之间看这个map(func):对数据集中的每个元素都使用func,然后返回一个新的RDD filter(func):对数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD flatMap(func):与 map 类似,每个输入元素被映射为0或多个输出元素 mapPartitions(func):和map很像,但是map是将func作用在每个元素上,而mapPartitions是func作用在整个分 区上。假设一个RDD有N个元素,M个分区(N >> M),那么map的函数将被调用N次,而mapPartitions中的函数 仅被调用M次,一次处理一个分区中的所有元素 mapPartitionsWithIndex(func):与 mapPartitions 类似,多了分区的索引值的信息glom():将每一个分区形成一个数组,形成新的RDD类型 RDD[Array[T]] sample(withReplacement, fraction, seed):采样算子。以指定的随机种子(seed)随机抽样出数量为fraction的数 据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样coalesce(numPartitions,false):无shuffle,一般用来减少分区union(otherRDD) : 求两个RDD的并集cartesian(otherRDD):笛卡尔积zip(otherRDD):将两个RDD组合成 key-value 形式的RDD,默认两个RDD的partition数量以及元素数量都相同,否 则会抛出异常。map 与 mapPartitions 的区别 map:每次处理一条数据 mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易导致 OOM 最佳实践:当内存资源充足时,建议使用mapPartitions,以提高处理效率groupBy(func):按照传入函数的返回值进行分组。将key相同的值放入一个迭代器distinct([numTasks])):对RDD元素去重后,返回一个新的RDD。可传入numTasks参数改变RDD分区数coalesce(numPartitions, true):有shuffle,无论增加分区还是减少分区,一般用repartition来代替repartition(numPartitions):增加或减少分区数,有shufflesortBy(func, [ascending], [numTasks]):使用 func 对数据进行处理,对处理后的结果进行排序intersection(otherRDD) : 求两个RDD的交集subtract (otherRDD) : 求两个RDD的差集这里我建议理解不了的算子,直接从Sparkhistory的依赖图来看,有没有划分Stage,如果划分了就是宽依赖,没有划分就是窄依赖,当然这是实战派的做法,可以在同事或者同学说明问题的时候,show your code 给他,然后把依赖图拿给他 ,当然作为理论加实践的并行者,我这里再拿一种来判别,是从理解定义开始的,定义说是父RDD分区有没有被多个子分区依赖,那可以从这个角度想一下,父分区单个分区数据,有没有可能流向不同的子RDD的分区,比如想一想distinct算子,或者sortBy算子,全局去重和全局排序,假设刚开始1,2,3在一个分区,经过map(x => (x, null)).reduceByKey((x, y) => x).map(_._1) 去重后,虽然分区数量没有变,但是每个分区数据必然要看别的分区的数据,才能知道最后自己要不要保留,从输入分区,到输出分区,必然经过汇合重组,所以必然有shuffle的。sortBy同理。Action触发Job。一个Spark程序(Driver程序)包含了多少 Action 算子,那么就有多少Job; 典型的Action算子: collect / count collect() => sc.runJob() => … => dagScheduler.runJob() => 触发了Jobcollect() / collectAsMap() stats / count / mean / stdev / max / min reduce(func) / fold(func) / aggregate(func)first():Return the first element in this RDD take(n):Take the first num elements of the RDD top(n):按照默认(降序)或者指定的排序规则,返回前num个元素。 takeSample(withReplacement, num, [seed]):返回采样的数据 foreach(func) / foreachPartition(func):与map、mapPartitions类似,区别是 foreach 是 Action saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)RDD整体上分为 Value 类型和 Key-Value 类型。 前面介绍的是 Value 类型的RDD的操作,实际使用更多的是 key-value 类型的RDD,也称为 PairRDD。 Value 类型RDD的操作基本集中在 RDD.scala 中; key-value 类型的RDD操作集中在 PairRDDFunctions.scala 中;前面介绍的大多数算子对 Pair RDD 都是有效的,RDD的值为key-value的时候即可隐式转换为PairRDD, Pair RDD还有属于自己的 Transformation、Action 算子;mapValues / flatMapValues / keys / values,这些操作都可以使用 map 操作实现,是简化操作。PariRDD(k, v)使用范围广,聚合 groupByKey / reduceByKey / foldByKey / aggregateByKey combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底层实现 subtractByKey:类似于subtract,删掉 RDD 中键与 other RDD 中的键相同的元素结论:效率相等用最熟悉的方法;groupByKey在一般情况下效率低,尽量少用sortByKey:sortByKey函数作用于PairRDD,对Key进行排序cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoincollectAsMap / countByKey / lookup(key)lookup(key):高效的查找方法,只查找对应分区的数据关于大数据开发中Spark常见RDD是怎样的问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注 香港云主机开发云行业资讯频道了解更多相关知识。

相关推荐: Storm为什么比Hadoop快

本篇内容主要讲解“Storm为什么比Hadoop快”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Storm为什么比Hadoop快”吧!“快”这个词是不明确的,专业属于点有两个层面:时延 , 指数据从产生到运算产生结…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 09/22 19:49
下一篇 09/22 21:08

相关推荐

发表评论

您的电子邮箱地址不会被公开。