Python.d14
Spark
Apache Spark是用于大规模数据处理的统一分析引擎。
简单来说,Spark是一款分布式的计算框架,用于调度成千上百的服务器集群,计算TB、PB乃至EB级别的海量数据
Python On Spark
Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发。而Python语言,则是Sprak重点支持的方向。
PySpark
- 作为Python库进行数据处理
- 提交至Sprak集群进行分布式集群计算
PySpark安装
1 | pip install pyspark |
PySpark入门
构建PySpark执行环境入口对象
使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。PySpark的执行环境入口对象是:类SprakContext:的类对象
示例:
1 | from pyspark import SprakConf,SparkContext |
PySpark的编程模型
SprakContext类对象,是PySpark编程中一切功能的入口。
PySpark的编程,主要分为以下三大步骤:
- 数据输入:通过SparkContext类对象成员方法完成数据的读取操作,读取后得到RDD类对象
- 数据处理计算:通过RDD类对象的成员方法完成各种数据计算的需求
- 数据输出:将处理完成后的RDD对象调用各种成员方法完成写出文件、转化为list等操作
数据输入
RDD对象
RDD对象全称为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD对象作为载体,即:
- 数据存储在RDD内
- 各类数据的计算方法,也都是RDD成员方法
- RDD的数据计算方法,返回值依旧是RDD对象
Python数据容器转RDD对象
PySpark支持通过SparkContext对象的parallelize成员方法,将:
- list
- tuple
- set
- dict
- str
转化为PySpark的RDD对象
字符串会被拆分出一个个字符,存入RDD对象
字典仅有的key会被存入RDD对象
示例:
1 | from pyspark import SprakConf,SparkContext |
读取文件数据到Spark
示例:
1 | from pyspark import SprakConf,SparkContext |
数据计算
map方法
将RDD的数据一条条处理(处理逻辑基于map算子中接受的处理函数),返回新的RDD对象
语法:
1 | rdd.map(func) |
func: f:(T) -> U
f: 表示这是一个函数(方法)
(T) -> U 表示的是方法的定义:
() 表示传入参数,(T) 表示传入1个参数,() 表示没有传入参数
T 是泛型的代称,在这里表示 任意类型
U 也是泛型代称,在这里表示 任意类型
-> U 表示返回值
(T) -> U 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入类型不限,返回一个返回值,返回值类型不限。
(A) -> A 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入类型不限,返回一个返回值,返回值和传入参数类型一致。
示例:
1 | from pyspark import SprakConf,SparkContext |
flatMap方法
功能:对rdd执行map操作,然后解除嵌套
操作
范例:
1 | # 嵌套的list |
flatMap使用示例:
1 | rdd = sc.parallelize(["a b c", "d, f, g"]) |
reduceByKey方法
功能:针对KV型 RDD,自动安装key分组,然后根据你提供的聚合逻辑,完成组内数据(value) 的聚合操作
reduceByKey中的聚合逻辑:
比如,有[1, 2, 3, 4, 5],聚合函数是lambda a, b: a + b
- a = 1, b = 2
- a = a + b, b = 3
- a = a + b, b = 4
- a = a + b, b = 5
用法:
1 | rdd.reduceByKey(func) |
func: (V, V) -> V
接受2个传入参数(类型一致),返回一个返回值,类型和传入要求一致
示例代码:
1 | rdd = sc.parallelize(("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1)) |
reduceByKey中接受的函数,只负责聚合,不理会分组
分组是自动by key
来分组的
Filter
功能:过滤想要的数据
语法:
1 | rdd.filter(func) |
func: (T) -> bool 传入一个任意类型的参数,返回 True or False
示例代码:
1 | from pyspark import SprakConf,SparkContext |
distinct方法
功能:对RDD数据进行去重,返回新的rdd
语法:
1 | rdd.distinct() |
distinct()无需传入参数
sortBy方法
功能:对RDD数据进行排序,基于你指定的排序依据。
语法:
1 | rdd.sortBy(func,ascending = False, numPartitions = 1) |
func: (T) -> U: 告知rdd中的哪个数据进行排序,比如lambda x: x[1]
表示安装rdd中的第二列元素进行排序
ascending True:升序 False:降序
numPartitions:用多少分区排序
数据输出
输出为Python对象
collect方法
功能:将RDD各个分区内的数据,统一收集到drive,形成一个list对象
用法:
1 | rdd.collect() |
返回一个list
reduce方法
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:
1 | rdd.reduce(func) |
func (T, T) -> T
2 个参数传入 1 个返回值,返回值和参数类型一致
示例代码:
1 | rdd = sc.parallelize(range(10)) |
聚合逻辑和reduceByKey相同,但是不给予key值
take方法
功能:取RDD的前N个元素,组合成list返回
用法示例:
1 | sc.parallelize([3, 2, 1, 4, 5, 6]).take(5) |
输出结果为 [3, 2, 1, 4, 5]
count方法
功能:计算RDD有多少条数据,返回值是一个数字
用法示例:
1 | sc.parallelize([3, 2, 1, 4, 5, 6]).count() |
输出为6
输出到文件中
saveAsTextFile
功能:将RDD的数据写入到文本文件中,支持本地写出,hdfs等文件系统
示例代码:
1 | rdd = sc.parallelize([1, 2, 3, 4, 5]) |
调用保存文件的算子,需要配置Hadoop依赖
- 下载Hadoop 安装
- 解压到任意位置
- 在Python代码中使用
os
模块配置:os.environ['HADOOP_HOME'] = 'HADOOP解压路径'
修改rdd分区为1个
方式1,SparkConf对象设置属性全局并行度为1:
1 | from pyspark import SprakConf,SparkContext |
方式2,创建RDD的时候设置(parallelize方法传入numSlices参数为1):
1 | rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices = 1) |