概述
pyspark下的子模块主要有:
pyspark.sql
:关于SQL和DataFrames的模块pyspark.streaming
:流式计算模块pyspark.ml
:基于DataFrame的机器学习模块
pyspark的初始代码一般为:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("Demo") \
.getOrCreate()
sc = spark.sparkContext # 开启一个spark上下文会话
SparkContext是spark功能的主要入口,在代码中常表示为sc
。sc
对象的常用方法:
parallelize
:以一个本地Python集合生成一个RDD对象,推荐使用range
RDD
pyspark中的基础数据结构为弹性分布式数据集(Resilient Distributed Dataset),RDD具有如下下特性:
- In-memory Computation:在内存中进行计算
- Lazy Evaluation:使用DAG保存操作,只在必要时才会做计算
- Immutability:RDD是Read-Only的
- Cacheable or Persistence:可存放在内存或硬盘中
- Partitioned:数据分布式存储在各节点中
- Fault Tolerance:分布式产生了容错性
- Coarse-grained Operations:RDD的操作是粗粒度(一批一批)的,并不是元素级的操作
使用序列数据构造一个rdd并查看前5个元素:
rdd = sc.parallelize(range(10))
rdd.take(5)
[0, 1, 2, 3, 4]
应用Python中的同名高阶函数:
rdd.map(lambda x: x*x).take(5)
[0, 1, 4, 9, 16]
rdd.filter(lambda x: x > 5).take(5)
[6, 7, 8, 9]
rdd.reduce(lambda x, y: x+y)
45
使用RDD的collect()
方法可以将RDD转成Python数据类型:
rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
RDD常用方法 | |||||
---|---|---|---|---|---|
collect | count | distinct | filter | first | flatMap |
groupBy | join | map | max | mean | min |
randomSplit | reduce | sample | sortBy | stats | stdev |
sum | take | variance |
DataFrame
pyspark另一种常用数据结构是DataFrame,而DF又可分为Row和Column。
读入文件生成DF的示例代码为:
df = spark.read.csv('/home/hujinzhi/PySpark/dataset/train.csv', header=None, inferSchema=True)
只选择指定列,并显示前3行:
cols = ['Time', 'RoomArea', 'RoomDir', 'Bedroom',
'Livingroom', 'Rental'] # 只选取部分列做演示
df = df.select(cols)
df.show(3)
+----+-----------+-------+-------+----------+-----------+ \
|Time| RoomArea|RoomDir|Bedroom|Livingroom| Rental| \
+----+-----------+-------+-------+----------+-----------+ \
| 2|0.020854022| WS| 3| 2|3.904923599| \
| 3|0.010923535| ES| 2| 1|2.546689304| \
| 3|0.010923535| ES| 2| 1|2.546689304| \
+----+-----------+-------+-------+----------+-----------+ \
显示DF的列信息:
df.printSchema()
root
|-- Time: integer (nullable = true)
|-- RoomArea: double (nullable = true)
|-- RoomDir: string (nullable = true)
|-- Bedroom: integer (nullable = true)
|-- Livingroom: integer (nullable = true)
|-- Rental: double (nullable = true)
对DF中的条目进行计数:
df.count() # 对行计数
196539
输出描述性统计信息:
df.select(['Time','RoomArea','Bedroom','Rental']).summary().show() # 描述统计信息
+-------+------------------+--------------------+------------------+-----------------+
|summary| Time| RoomArea| Bedroom| Rental|
+-------+------------------+--------------------+------------------+-----------------+
| count| 196539| 196539| 196539| 196539|
| mean|2.1152290385114405|0.013138849743341008| 2.236634968123375|7.949313378405461|
| stddev|0.7869801628627767|0.008103513291823544|0.8969612494208798|6.310608757211932|
| min| 1| 0.0| 0| 0.0|
| 25%| 1| 0.009268454| 2| 4.923599321|
| 50%| 2| 0.012909633| 2| 6.62139219|
| 75%| 3| 0.01489573| 3| 8.998302207|
| max| 3| 1.0| 11| 100.0|
+-------+------------------+--------------------+------------------+-----------------+
条件筛选:
df.where((df.RoomArea > 0.3) &
(df.Time == 3)).select('Time', 'RoomArea', 'RoomDir', 'Rental').show(3) # 条件筛选
+----+-----------+-------+-----------+
|Time| RoomArea|RoomDir| Rental|
+----+-----------+-------+-----------+
| 3|0.330354187| W|5.602716469|
| 3|0.490897054| S|8.896434635|
| 3|0.490897054| S|8.896434635|
+----+-----------+-------+-----------+
DF常用方法 | |||||
---|---|---|---|---|---|
collect | columns | count | describe | distinct | drop |
dropDuplicates | dropna | fillna | foreach | groupBy | head |
join | orderBy | printSchema | randomSplit | select | show |
sort | take | where | withColumn | withColumnRenamed |
SQL
pyspark同样还支持执行SQL语句去访问SQL数据结构。为了模拟从SQL表中读取数据,将一个DF转成一个临时表来做演示:
df.registerTempTable('TMP')
使用spark.sql()
方法来执行SQL语句,注意返回的是DF数据结构。
spark.sql('SELECT * FROM TMP LIMIT 5').show()
spark.sql('SELECT MIN(Rental) FROM TMP').show() # 查找Rental的最小值
# 查找最小房屋面积对应的样本
spark.sql('SELECT * FROM TMP WHERE RoomArea IN (SELECT MIN(RoomArea) FROM TMP)').show()
值得特别一提的是,在zeppelin环境中,当直接使用sql解释器提取数据时,zeppelin会默认提供一个可视化组件。
%sql
SELECT Bedroom,count(1) FROM TMP GROUP BY Bedroom
在可视化组件的settings
中设置好keys
与values
,部分输出如下所示: