Spark

Apache Spark是用于大规模数据处理的统一分析引擎
简单来说,Spark是一款分布式的计算框架,用于调度成千上百的服务器集群,计算TB、PB乃至EB级别的海量数据

Python On Spark

Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发。而Python语言,则是Sprak重点支持的方向。

PySpark

  • 作为Python库进行数据处理
  • 提交至Sprak集群进行分布式集群计算

PySpark安装

1
pip install pyspark

PySpark入门

构建PySpark执行环境入口对象
使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。PySpark的执行环境入口对象是:类SprakContext:的类对象
示例:

1
2
3
4
5
from pyspark import SprakConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
print(sc.version)
sc.stop()

PySpark的编程模型
SprakContext类对象,是PySpark编程中一切功能的入口。
PySpark的编程,主要分为以下三大步骤:

  1. 数据输入:通过SparkContext类对象成员方法完成数据的读取操作,读取后得到RDD类对象
  2. 数据处理计算:通过RDD类对象的成员方法完成各种数据计算的需求
  3. 数据输出:将处理完成后的RDD对象调用各种成员方法完成写出文件、转化为list等操作

数据输入

RDD对象
RDD对象全称为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

Python数据容器转RDD对象
PySpark支持通过SparkContext对象的parallelize成员方法,将:

  • list
  • tuple
  • set
  • dict
  • str
    转化为PySpark的RDD对象

字符串会被拆分出一个个字符,存入RDD对象
字典仅有的key会被存入RDD对象

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark import SprakConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})

# 查看RDD里面内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

sc.stop()

读取文件数据到Spark
示例:

1
2
3
4
5
6
7
8
from pyspark import SprakConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

rdd = sc.textFile("文件路径(如D:/hello.txt)")
print(rdd.collect)

sc.stop()

数据计算

map方法
将RDD的数据一条条处理(处理逻辑基于map算子中接受的处理函数),返回新的RDD对象

语法:

1
rdd.map(func)

func: f:(T) -> U
f: 表示这是一个函数(方法)
(T) -> U 表示的是方法的定义:
() 表示传入参数,(T) 表示传入1个参数,() 表示没有传入参数
T 是泛型的代称,在这里表示 任意类型
U 也是泛型代称,在这里表示 任意类型
-> U 表示返回值
(T) -> U 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入类型不限,返回一个返回值,返回值类型不限。
(A) -> A 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入类型不限,返回一个返回值,返回值和传入参数类型一致。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
from pyspark import SprakConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5])

def func(data):
return data * 10

rdd2map = rdd.map(func)
# rdd2map = rdd.map(lambda x: x*10) 可实现相同效果

print(rdd2map.collect())

flatMap方法
功能:对rdd执行map操作,然后解除嵌套操作

范例:

1
2
3
4
5
# 嵌套的list
lst = [[1,2,3], [4, 5, 6]]

# 解除嵌套的list
lst = [1, 2, 3, 4, 5, 6]

flatMap使用示例:

1
2
3
4
rdd = sc.parallelize(["a b c", "d, f, g"])

# 按空格切分数据后,解除嵌套
print(rdd.flatMap(lambda x: x.split(" ")).collect())

reduceByKey方法
功能:针对KV型 RDD,自动安装key分组,然后根据你提供的聚合逻辑,完成组内数据(value) 的聚合操作

reduceByKey中的聚合逻辑:
比如,有[1, 2, 3, 4, 5],聚合函数是lambda a, b: a + b

  1. a = 1, b = 2
  2. a = a + b, b = 3
  3. a = a + b, b = 4
  4. a = a + b, b = 5

用法:

1
rdd.reduceByKey(func)

func: (V, V) -> V
接受2个传入参数(类型一致),返回一个返回值,类型和传入要求一致

示例代码:

1
2
3
rdd = sc.parallelize(("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1))
result = rdd.reduceByKey(lambda a, b: a + b)
print(result.collect())

reduceByKey中接受的函数,只负责聚合,不理会分组
分组是自动by key来分组的

Filter
功能:过滤想要的数据
语法:

1
rdd.filter(func)

func: (T) -> bool 传入一个任意类型的参数,返回 True or False

示例代码:

1
2
3
4
5
6
7
8
from pyspark import SprakConf,SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5])

print(rdd.filter(lambda x: x % 2 ==1).collect())

distinct方法
功能:对RDD数据进行去重,返回新的rdd
语法:

1
rdd.distinct()

distinct()无需传入参数

sortBy方法
功能:对RDD数据进行排序,基于你指定的排序依据。
语法:

1
rdd.sortBy(func,ascending = False, numPartitions = 1)

func: (T) -> U: 告知rdd中的哪个数据进行排序,比如lambda x: x[1] 表示安装rdd中的第二列元素进行排序
ascending True:升序 False:降序
numPartitions:用多少分区排序

数据输出

输出为Python对象

collect方法
功能:将RDD各个分区内的数据,统一收集到drive,形成一个list对象
用法:

1
rdd.collect()

返回一个list

reduce方法
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:

1
rdd.reduce(func)

func (T, T) -> T
2 个参数传入 1 个返回值,返回值和参数类型一致

示例代码:

1
2
3
rdd = sc.parallelize(range(10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a, b: a + b))

聚合逻辑和reduceByKey相同,但是不给予key值

take方法
功能:取RDD的前N个元素,组合成list返回
用法示例:

1
sc.parallelize([3, 2, 1, 4, 5, 6]).take(5)

输出结果为 [3, 2, 1, 4, 5]

count方法
功能:计算RDD有多少条数据,返回值是一个数字
用法示例:

1
sc.parallelize([3, 2, 1, 4, 5, 6]).count()

输出为6

输出到文件中

saveAsTextFile
功能:将RDD的数据写入到文本文件中,支持本地写出,hdfs等文件系统
示例代码:

1
2
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.saveAsTextFile(../data/output/test.txt)

调用保存文件的算子,需要配置Hadoop依赖

  • 下载Hadoop 安装
  • 解压到任意位置
  • 在Python代码中使用os模块配置:os.environ['HADOOP_HOME'] = 'HADOOP解压路径'

修改rdd分区为1个
方式1,SparkConf对象设置属性全局并行度为1:

1
2
3
4
from pyspark import SprakConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

方式2,创建RDD的时候设置(parallelize方法传入numSlices参数为1):

1
2
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices = 1)
rdd = sc.parallelize([1, 2, 3, 4, 5], 1)