博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark 基本函数学习笔记一
阅读量:6479 次
发布时间:2019-06-23

本文共 9214 字,大约阅读时间需要 30 分钟。

 

Spark 基本函数学习笔记一

spark的函数主要分两类,Transformations和Actions。

Transformations为一些数据转换类函数,actions为一些行动类函数:

  • 转换:转换的返回值是一个新的RDD集合,而不是单个值。调用一个变换方法,
    不会有任何求值计算,它只获取一个RDD作为参数,然后返回一个新的RDD。
  • 行动:行动操作计算并返回一个新的值。当在一个RDD对象上调用行动函数时,
    会在这一时刻计算全部的数据处理查询并返回结果值。

这里介绍pyspark中常用函数功能以及代码示例。

官方文档链接:

文档github链接:

RDD下面的Transformations函数,这些函数适用于RDD集合操作:

  • map(func)
  • flatMap(func)
  • mapPartitions(func)
  • mapPartitionsWithIndex(func)
  • foreach(f)
  • foreachPartition(f)
  • filter(func)
  • sample()
  • union()
  • intersection()
  • distinct()
  • groupBy()
  • groupByKey()
  • reduce
  • reduceByKey()
  • aggregate
  • aggregateByKey()
  • sortBy
  • sortByKey()
  • join()
  • cogroup()
  • cartesian()
  • coalesce()
  • Pipe()
  • Repartition()
  • rePartitionAndSortWithinPartitions()
In [1]:
from pyspark.sql import SparkSessionimport numpy as npfrom pyspark import SparkContextspark = SparkSession.Builder().getOrCreate()sc = spark.sparkContext
In [2]:
rdd = sc.parallelize(range(10))rdd.collect()
Out[2]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
 

map(func)转换

map(func) 与python的map函数功能一样,都是对每个元素执行func函数的计算。

返回一个新的数据集,数据集的每个元素都是经过func函数处理的

我们这里以对每个元素乘以10计算示例

In [3]:
# 使用lambda 函数temp = rdd.map(lambda x: x*10)temp.collect()
Out[3]:
[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
In [4]:
# 使用自定义函数def multi_10(x):    return x * 10temp = rdd.map(multi_10)temp.collect()
Out[4]:
[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
 

flatMap(func)

类似于map(func), 但是不同的是map对每个元素处理完后返回与原数据集相同元素数量的数据集,而flatMap返回的元素数不一定和原数据集相同

In [5]:
rdd = sc.parallelize([[1,2],[2,3],[3,4]])d = rdd.flatMap(lambda x: x)d.collect()
Out[5]:
[1, 2, 2, 3, 3, 4]
 

mapPartitions(func)

mapPartitions是map的一个变种。

map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,

也就是把每个分区中的内容作为整体来处理的。

In [6]:
rdd = sc.parallelize([1,2,3,4,5], 3)def f(iterator):     yield sum(iterator)rdd.mapPartitions(f).collect()
Out[6]:
[1, 5, 9]
 

glom()函数就是要显示出RDD对象的分区情况

可以看到rdd分了三个区,每个区的数据为: [[1], [2, 3], [4, 5]]

所以上面的例子中mapPartitions计算sum的结果是三个,

每个分区求和结果是[1,5,9]

In [7]:
rdd.glom().collect()
Out[7]:
[[1], [2, 3], [4, 5]]
 

mapPartitionsWithIndex(func)

与mapPartition相比,mapPartitionWithIndex能够保留分区索引,函数的传入参数也是分区索引和iterator构成的键值对。

In [8]:
def f1(partitionIndex,iterator):    yield (partitionIndex,sum(iterator))    def f2(partitionIndex,iterator):    yield sum(iterator)    rf1 = rdd.mapPartitionsWithIndex(f1)rf2 = rdd.mapPartitionsWithIndex(f2)# f1 的返回值可以保留分区索引print(rf1.glom().collect())print(rf2.glom().collect())
 
[[(0, 1)], [(1, 5)], [(2, 9)]][[1], [5], [9]]
 

foreach(func)

在RDD上每个元素执行func运算,foreach与map的区别是:

map返回一个新的RDD,foreach直接在元素上应用func操作,

原RDD内容不变,无返回值

In [9]:
rdd = sc.parallelize([1,2,3,4,5])res = rdd.foreach(lambda x: x*2)print(res)  # 打印结果为Nonerdd.collect() # 输出为 [1, 2, 3, 4, 5]# 打印元素,如果使用jupyter,在启动页面可以看到打印def f(x):     print(x)sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
 
None
 

foreachPartition(f)

与foreach一样,只是操作的对象为RDD的每个数据分区

In [10]:
def f(iterator):    for x in iterator:        print(x)sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
 

filter(func)函数

返回一个新的数据集,这个数据集中的元素是通过func函数筛选后返回为true的元素

简单的说就是,对数据集中的每个元素进行筛选,如果符合条件则返回true,不符合返回false,

最后将返回为true的元素组成新的数据集返回

In [11]:
# 选择偶数d = rdd.filter(lambda x: x % 2 ==0)d.collect()
Out[11]:
[2, 4]
In [12]:
# 整除3def three(x):    return x % 3 == 0d = rdd.filter(three)d.collect()
Out[12]:
[3]
 

sample()

sample()方法返回数据集的随机样本。

  • 第一个参数指定采样是否应该替换
  • 第二个参数定义返回数据的分数
  • 第三个参数是伪随机数产生器的种子
In [13]:
rdd.sample(False, 0.4, 40).collect()
Out[13]:
[3]
 

union

union(otherDataset)是数据合并,返回一个新的数据集,由原数据集和otherDataset联合而成。

In [14]:
rdd1 = rdd.map(lambda x: x + 10)rdd.union(rdd1).collect()
Out[14]:
[1, 2, 3, 4, 5, 11, 12, 13, 14, 15]
 

intersection()

返回两个数据集的交集

In [15]:
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])rdd1.intersection(rdd2).collect()
Out[15]:
[1, 2, 3]
 

distinct()

返回数据集中不同值的列表,即去除数据集中重复的元素

In [16]:
rdd = sc.parallelize([1,2,3,4,1,2,3])rdd.distinct().collect()
Out[16]:
[4, 1, 2, 3]
 

groupBy

给定一个分组条件,返回分组后的key value数据集

In [17]:
rdd = sc.parallelize(range(10))# 按照模2结果来分组res = rdd.groupBy(lambda x: x % 2).collect()[(k, sorted(v)) for k,v in res]
Out[17]:
[(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]
In [18]:
# 按照模3结果来分组res = rdd.groupBy(lambda x: x % 3).collect()[(k, sorted(v)) for k,v in res]
Out[18]:
[(0, [0, 3, 6, 9]), (1, [1, 4, 7]), (2, [2, 5, 8])]
 

groupByKey

数据按照key来分区,数据要求为key value格式。

返回的为key value格式,value为分组后的数据。

In [19]:
rdd = sc.parallelize([('a', 1), ('a',1), ('b',1), ('b',1), ('b',1), ('c',1),('c',1),('c',1)])rdd.collect()
Out[19]:
[('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1), ('c', 1), ('c', 1), ('c', 1)]
In [20]:
# 分组后求值的和rdd.groupByKey().mapValues(sum).collect()
Out[20]:
[('b', 3), ('c', 3), ('a', 2)]
 

reduce

与python中的reduce功能一样,对iter对象的元素前两个元素进行函数操作,得到的结果与第三个元素再进行函数操作,

以此类推,直到最后一个元素。

In [21]:
from operator import add# 累计求和sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
Out[21]:
15
 

reduceByKey

功能与reduce函数一样。不过输入的数据为key value格式,按照key分组进行函数操作。

在WordCount例子中,使用reduceByKey来统计单词的个数。 链接:

In [22]:
rdd = sc.parallelize([('a', 1), ('a',1), ('b',1), ('b',1), ('b',1), ('c',1),('c',1),('c',1)])rdd.reduceByKey(add).collect()
Out[22]:
[('b', 3), ('c', 3), ('a', 2)]
 

aggregate()

函数原型:aggregate(zeroValue, seqOp, combOp)

aggregate函数操作比较复杂,有两个函数。seqOp函数是对每个分区的元素与zoroValue进行计算,

然后由combOp函数对所有分区的结果进行计算。

将初始值和第一个分区中的第一个元素传递给seq函数进行计算,然后将计算结果和第二个元素传递给seq函数,直到计算到最后一个值。

第二个分区中也是同理操作。

最后将初始值、所有分区的结果经过combine函数进行计算(先将前两个结果进行计算,将返回结果和下一个结果传给combine函数,以此类推),并返回最终结果。

In [23]:
data = sc.parallelize((1,2,3,4,5,6),2)# 如果使用jupyter,打印结果在jupyter页面可以看到def seq(a,b):    print('seqOp:'+str(a)+"\t"+str(b))    return min(a,b)def combine(a,b):    print('comOp:'+str(a)+"\t"+str(b))    return a+b# 例子解析:# 先在每个分区中元素中两两操作,找最小的元素。# 计算完成后,由combine函数计算两两分区结果的和# 计算步骤1:初始值为3,与分区[1,2,3]元素一一进行seq最小值运算,得到结果为 1# 计算步骤2:初始值为3,与分区[4,5,6]元素一一进行seq最小值运算,得到结果为 3# 计算步骤3:初始值为3,与分区结果1,分区结果3进行combine相加运算,得到结果为 3 + 1 + 3, 结果为7 print(data.glom().collect())data.aggregate(3,seq, combine)
 
[[1, 2, 3], [4, 5, 6]]comOp:3	1comOp:4	3
Out[23]:
7
 

aggregateByKey()

函数原型 aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash>)

在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,

返回的结果作为一个新的kv对,然后再将结果按照key进行合并,

最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),

将key与计算结果作为一个新的kv对输出

In [24]:
data = sc.parallelize([(1,3),(1,2),(1,4),(2,3)])def seqFunc(a, b):    print("seqFunc:%s,%s" %(a,b))    return max(a, b) #取最大值def combFunc(a, b):    print("combFunc:%s,%s" %(a ,b))    return a + b #累加起来# aggregateByKey这个算子内部有分组# 这里numPartitions值为4,将数据分为四个区,seqFunc计算结果为 (1,3),(1,3),(1,4)和(2,3)# 然后按照key分组进行comFunc计算,得到结果[(1, 10), (2,3)]data.aggregateByKey(3, seqFunc, combFunc, 4).collect()
Out[24]:
[(1, 10), (2, 3)]
 

sortBy

sortBy(keyfunc, ascending=True, numPartitions=None)

对集合元素排序,根据给定的函数进行排序操作

In [25]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]x0 = sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()x1 = sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()print(x0)print(x1)
 
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)][('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
 

sortByKey

函数原型 sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.>)

根据key进行排序,输入的数据必须为key value格式

In [26]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]sc.parallelize(tmp).sortByKey().collect()
Out[26]:
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
 

join()

两个数据集按照key内连接,返回数据集中有相同key的元素组成的新的数据集,

数据集A中的元素与数据集B中相同key元素一一组合,生成新的数据集

格式为(key, (value1, value2))

In [27]:
x = sc.parallelize([("a", 1), ("b", 4)])y = sc.parallelize([("a", 2), ("a", 3)])x.join(y).collect()
Out[27]:
[('a', (1, 2)), ('a', (1, 3))]
 

cogroup()

将多个RDD中同一个Key对应的Value组合到一起。如果RDD中没有对应的key,则会生成一个空值

In [28]:
x = sc.parallelize([("a", 1), ("b", 4)])y = sc.parallelize([("a", 2)])
In [29]:
for k, v in x.cogroup(y).collect():    print(k, tuple(map(list, v)))
 
a ([1], [2])b ([4], [])
 

cartesian()

笛卡尔集,是通过两个数据集计算而成的

数据集a的每个元素与数据集b的每个元素组合,形成新的数据对数据集

In [30]:
rdd = sc.parallelize([1, 2])rdd2 = sc.parallelize([3, 4, 5])rdd.cartesian(rdd2).collect()
Out[30]:
[(1, 3), (1, 4), (1, 5), (2, 3), (2, 4), (2, 5)]
 

coalesce()

按照给定的数量重新分区数据集

In [31]:
old = sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()print(old)sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
 
[[1], [2, 3], [4, 5]]
Out[31]:
[[1, 2, 3, 4, 5]]
In [32]:
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(2).glom().collect()
Out[32]:
[[1], [2, 3, 4, 5]]
 

pipe

对rdd数据集的每个元素,调用外部程序

In [33]:
# cat 文件内容# 先在目录下创建一个for_test.txt文件,然后来读取文件内容sc.parallelize(['1', '2', '', '3']).pipe('cat for_test.txt').collect()
Out[33]:
['for pyspark function test', 'for pyspark function test', 'for pyspark function test', 'for pyspark function test']
 

repartition

数据集重新分区

In [34]:
# 创建默认为四个分区的数据集rdd = sc.parallelize([1,2,3,4,5,6,7], 4)print(rdd.glom().collect())# 重新分为两个分区rdd.repartition(2).glom().collect()
 
[[1], [2, 3], [4, 5], [6, 7]]
Out[34]:
[[1, 2, 3, 4, 5, 6, 7], []]
 

rePartitionAndSortWithinPartitions()

根据给定的分区程序对RDD进行重新分区,并在每个生成的分区内按键对记录进行排序。

这比调用重新分区,然后在每个分区内进行排序更有效率,因为它可以将排序压入洗牌机器。

应用场景:

  • 如果需要重分区,并且想要对分区中的数据进行升序排序。
  • 提高性能,替换repartition和sortBy
In [35]:
rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True)rdd2.glom().collect()
Out[35]:
[[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]

转载于:https://www.cnblogs.com/StitchSun/p/10656246.html

你可能感兴趣的文章
分享一个shell脚本:通过Jumper机器来创建Jumper和target机器账号
查看>>
在QT添加QWebView是出现的问题
查看>>
C++之ListNode结构
查看>>
UITableViewCell分割线不是左对齐的问题
查看>>
CentOS7 编译安装PHP7
查看>>
汇编语言简介(二)
查看>>
MySQL常见错误代码及代码说明
查看>>
我的友情链接
查看>>
Python定时返回网址状态码&&网页内容是否为json数据
查看>>
Cglib动态代理基础使用
查看>>
在windows powershell中利用脚本自动安装和配置DHCP服务
查看>>
logback使用
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
【翻译】Chrome DevTools: JavaScript CPU Profiling in Chrome 58
查看>>
REX(Register EXtension) 前缀
查看>>
Spring注解@Async和@Transactional失效问题究竟是什么原因,强势解释一波
查看>>
zabbix安装详解(2)
查看>>
产生n对括号的解法
查看>>
Windows Server 2012 集群打开集群管理器查看角色失败
查看>>