PySpark
Notes from my graduation project (roughly 2022.5.25 - 2022.6.3). I had already taken a PySpark course in the second semester of junior year and relearned it for the graduation project, following the Heima (itheima) course at https://www.bilibili.com/video/BV1Jq4y1z7VP
Execution
Startup
# Switch to the Anaconda pyspark virtual environment (on all 3 nodes)
conda activate pyspark
# Start Hadoop HDFS, YARN and the history server (node1)
/onekey/hd1_start.sh
# Submit in Spark on YARN mode (node1)
spark-submit --master yarn /export/server/spark/examples/src/main/python/pi.py
# Stop Hadoop
/onekey/hd2_stop.sh
# Power off
shutdown -h now
ZooKeeper-based HA (Spark standalone)
# Start ZooKeeper first
/onekey/zk1_start.sh
# Start Spark
/export/server/spark-3.1.2-bin-hadoop3.2/sbin/start-history-server.sh
/export/server/spark-3.1.2-bin-hadoop3.2/sbin/start-all.sh
# Submit PySpark code
spark-submit --master spark://node1:7077 /export/server/spark/examples/src/main/python/pi.py
# Stop Spark
/export/server/spark-3.1.2-bin-hadoop3.2/sbin/stop-history-server.sh
/export/server/spark-3.1.2-bin-hadoop3.2/sbin/stop-all.sh
# Start Hadoop and activate the environment
/onekey/hd1_start.sh
conda activate pyspark
# Interactive pyspark shell on YARN
pyspark --master yarn
# Submit in local mode
spark-submit --master local[*] /export/server/spark/examples/src/main/python/pi.py
# Interactive pyspark shell against the standalone master
./pyspark --master spark://node1:7077
sc.parallelize([1,2,3,4,5]).map(lambda x:x*10).collect()
Creating the pyspark conda environment
conda create -n pyspark python=3.8
HDFS cluster: http://node1:9870/
YARN cluster: http://node1:8088/
Hadoop history server: http://node1:19888/
4040: Spark application UI (while a job is running)
8080: Spark standalone Master web UI
18080: Spark history server
Environment
A three-node Linux cluster running CentOS 7
Hadoop HDFS
Spark HA on ZooKeeper
Difference between Spark HA on ZooKeeper and Spark on YARN
If a YARN cluster already exists, reusing YARN for scheduling is more convenient.
Deploying a separate ZooKeeper-based HA standalone cluster consumes resources twice over, and the two schedulers would compete with each other.
So Spark on YARN is generally the better fit for a cluster that already runs Hadoop YARN.
Submitting to YARN
# coding:utf8
# Linux environment
import os
import py  # local dependency module (py.py below), shipped to the cluster via spark.submit.pyFiles
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark import *

if __name__ == '__main__':
    conf = SparkConf().setMaster("yarn").setAppName("test")
    # Dependency file setting. Without it the job fails, because the cluster cannot find the
    # local .py file; the .py file has to be shipped to the cluster.
    conf.set("spark.submit.pyFiles", "py.py")
    sc = SparkContext(conf=conf)
    # Read a file from HDFS
    file_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
    print(file_rdd.collect())
    # Output: ['hello spark', 'hello hadoop', 'hello flink']
    word_rdd = file_rdd.flatMap(lambda line: line.split(" "))  # split each line on spaces
    word_with_one_rdd = word_rdd.map(lambda x: (x, 1))  # turn each word into a (word, 1) tuple
    result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)  # aggregation rule
    print(result_rdd.collect())
    # Output: [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
    print(py.hello())
Contents of py.py
def hello():
    return "hello"
Submitting with spark-submit on Linux
spark-submit --master yarn --py-files ./py.py ./test.py
PySpark architecture and execution
On the driver side the code runs in the JVM; on the executor side the JVM only relays commands, and the actual work is carried out by Python interpreter processes.
When code runs on the cluster it runs distributed:
non-task (non-RDD) code is executed by the driver,
task (RDD) code is executed by the executors; there can be many executors, so task computation runs in parallel across multiple machines.
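A minimal local sketch (not from the original notes; the data is illustrative) to see the split: the plain print statements run in the driver process, while the lambda passed to map is serialized and executed by Python worker processes on the executor side.
# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("driver_vs_executor")
    sc = SparkContext(conf=conf)

    nums = [1, 2, 3, 4, 5]              # a plain Python list: lives in the driver
    print("driver-side print:", nums)   # executed by the driver

    rdd = sc.parallelize(nums, 2)
    # the lambda below is shipped to the executors and run by Python workers
    doubled = rdd.map(lambda x: x * 2).collect()  # collect() brings the results back to the driver
    print("collected on the driver:", doubled)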
Spark Core (RDD)
An RDD is a dataset used to hold objects;
it is distributed and used for distributed computation;
the data in an RDD can be stored in memory or on disk.
1. The five key properties of an RDD
1. An RDD has partitions (see the sketch after this list)
2. An RDD's methods are applied to every partition of the RDD
3. RDDs have dependencies (lineage) between one another
4. A key-value RDD can have a partitioner; the default is hash partitioning
5. Partition placement tries to stay close to the servers where the data lives (data locality)
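A minimal local sketch (not from the original notes; the data is made up) that makes properties 1, 3 and 4 visible through the RDD API:
# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("rdd_properties")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], 3)
    print(rdd.getNumPartitions())            # property 1: the RDD is split into partitions (3 here)

    reduced = rdd.reduceByKey(lambda a, b: a + b)
    print(reduced.partitioner)               # property 4: the key-value RDD has a (hash) partitioner after the shuffle
    print(reduced.toDebugString().decode())  # property 3: the lineage / dependency chain back to the parent RDD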
2. WordCount code walkthrough
# coding:utf8
# Linux environment
import os
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
from pyspark import *

if __name__ == '__main__':
    # distributed
    # conf = SparkConf().setAppName("WordCount")
    # local machine
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # Read a file from HDFS
    file_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
    print(file_rdd.collect())
    # Output: ['hello spark', 'hello hadoop', 'hello flink']
    word_rdd = file_rdd.flatMap(lambda line: line.split(" "))  # split each line on spaces
    word_with_one_rdd = word_rdd.map(lambda x: (x, 1))  # turn each word into a (word, 1) tuple
    result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)  # aggregation rule
    print(result_rdd.collect())
    # Output: [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
3. Creating an RDD
SparkContext(conf=conf) is the entry point to Spark.
Creating an RDD from a local collection
# coding:utf8
from pyspark import *

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("WordCount")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
    print("default number of partitions", rdd.getNumPartitions())
    # prints 8: this machine has an 8-core CPU, so the default is 8 partitions
    rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
    print("specified number of partitions", rdd1.getNumPartitions())
    # with the partition count specified, it prints 3
Reading from files
from pyspark import *

if __name__ == '__main__':
    # distributed
    # conf = SparkConf().setAppName("WordCount")
    # local machine
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # Read a local file
    # file_rdd = sc.textFile("data/input/words.txt", 3)  # the optional argument is the minimum number of partitions; it is only a hint, Spark makes its own decision
    # print("number of partitions", file_rdd.getNumPartitions())
    # Read a file from HDFS
    file_rdd = sc.textFile("hdfs://node1:8020/input/words.txt", 3)
    print(file_rdd.collect())
For reading small files there is wholeTextFiles, which is suited to large numbers of small files
rdd = sc.wholeTextFiles("data/input/tiny_files")  # a directory with many small files
print(rdd.collect())
print(rdd.map(lambda x: x[1]).collect())
4. RDD operators
The RDD's distributed API, i.e. its distributed methods.
Transformation operators: lazily evaluated; without an action nothing is executed (see the sketch below).
RDD -> RDD: the return value is still an RDD.
Action operators (the switch that triggers the execution plan):
the return value is not an RDD.
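A small sketch of the laziness (not from the original notes, reusing the sc from the snippets above): the map transformation only records the computation; nothing touches the data until the collect action fires.
rdd = sc.parallelize([1, 2, 3, 4, 5])
mapped = rdd.map(lambda x: x * 10)   # transformation: only builds the plan, nothing is computed yet
print(mapped)                        # prints an RDD object, not the data
print(mapped.collect())              # action: triggers the job and returns [10, 20, 30, 40, 50]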
5. Common transformation operators
map
# coding:utf8
# Linux environment
import os
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
from pyspark import *

if __name__ == '__main__':
    # distributed
    # conf = SparkConf().setAppName("WordCount")
    # local machine
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)

    # 1. define a named function
    def add(data):
        return data * 10
    print(rdd.map(add).collect())

    # 2. a lambda expression (fine when the function is a one-liner; for anything complex, write a proper function)
    print(rdd.map(lambda x: x * 10).collect())
mapValues (map over the values only)
Like map, but applied only to the values.
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
print(rdd.map(lambda x: (x[0], x[1] * 10)).collect())
print(rdd.mapValues(lambda x: x * 10).collect())
# both print [('a', 10), ('a', 20), ('b', 30), ('b', 40)]
flatMap (flatten several lists into one)
rdd = sc.parallelize(["ydd lyy lxw", "ydd ydd ljj", "lxw lyy ydd"])
print(rdd.map(lambda x: x.split(" ")).collect())
# Output: [['ydd', 'lyy', 'lxw'], ['ydd', 'ydd', 'ljj'], ['lxw', 'lyy', 'ydd']]
print(rdd.flatMap(lambda x: x.split(" ")).collect())
# Output: ['ydd', 'lyy', 'lxw', 'ydd', 'ydd', 'ljj', 'lxw', 'lyy', 'ydd']
reduceByKey (aggregation)
For key-value RDDs: automatically groups by key and aggregates the values according to the given aggregation logic.
The lambda aggregates the values two at a time, e.g. lambda a, b: a + b.
# Read a file from HDFS
file_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
print(file_rdd.collect())
# Output: ['hello spark', 'hello hadoop', 'hello flink']
word_rdd = file_rdd.flatMap(lambda line: line.split(" "))
word_with_one_rdd = word_rdd.map(lambda x: (x, 1))
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
print(result_rdd.collect())
# Output: [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
groupBy (grouping)
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
# the argument to groupBy says which element to group by (hash grouping)
result = rdd.groupBy(lambda t: t[0])
print(result.map(lambda t: (t[0], list(t[1]))).collect())
# Output: [('b', [('b', 3), ('b', 4)]), ('a', [('a', 1), ('a', 2)])]
groupByKey (groupBy that groups by key automatically)
Only for key-value RDDs.
# the argument to groupBy says which element to group by (hash grouping)
# result = rdd.groupBy(lambda t: t[0])
# print(result.map(lambda t: (t[0], list(t[1]))).collect())
# Output: [('b', [('b', 3), ('b', 4)]), ('a', [('a', 1), ('a', 2)])]
# groupByKey groups by key automatically (note the output differs from groupBy)
result = rdd.groupByKey()
print(result.map(lambda t: (t[0], list(t[1]))).collect())
# Output: [('b', [3, 4]), ('a', [1, 2])]
filter (filtering)
Keeps only the data you want.
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
# elements for which the predicate returns True are kept
print(rdd.filter(lambda x: x % 2 == 1).collect())
# Output: [1, 3, 5]
distinct (deduplication)
Removes duplicates.
rdd = sc.parallelize([1, 1, 2, 2, 3, 3])
print(rdd.distinct().collect())
# Output: [2, 1, 3]
union (concatenation)
Concatenates two RDDs.
rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize(["a", "b", "c"])
# does not deduplicate; RDDs with different element types can also be unioned
print(rdd1.union(rdd2).collect())
# Output: [1, 1, 2, 3, 'a', 'b', 'c']
join (SQL-style join)
SQL inner/outer joins; can only be used on two-element tuples (key-value RDDs).
The join is performed on the key.
rdd1 = sc.parallelize([(1, "a"), (2, "b"), (3, "c"), (4, "d")])
rdd2 = sc.parallelize([(1, "1科"), (2, "2科"), (5, "5科")])
print(rdd1.join(rdd2).collect())  # SQL inner join
# Output: [(1, ('a', '1科')), (2, ('b', '2科'))]
print(rdd1.leftOuterJoin(rdd2).collect())  # SQL left outer join
# Output: [(4, ('d', None)), (1, ('a', '1科')), (2, ('b', '2科')), (3, ('c', None))]
print(rdd1.rightOuterJoin(rdd2).collect())  # SQL right outer join
# Output: [(1, ('a', '1科')), (5, (None, '5科')), (2, ('b', '2科'))]
intersection
Takes the intersection; elements must match exactly. Unlike join, which compares keys only, intersection requires both key and value to match.
rdd1 = sc.parallelize([(1, "a"), (2, "b"), (3, "c"), (4, "d")])
rdd2 = sc.parallelize([(1, "a"), (2, "2科"), (5, "5科")])
print(rdd1.intersection(rdd2).collect())
# Output: [(1, 'a')]
glom (adds one level of nesting, per partition)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)
print(rdd.glom().collect())
# Output: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
sortBy (sorting)
# Parameter 1: which field to sort by, e.g. lambda x: x[1] sorts by the second field
# Parameter 2: True for ascending, False for descending
# Parameter 3: how many partitions to use for the sort
"""Note: for a globally ordered result, set the number of sort partitions to 1; otherwise, in distributed mode, order is only guaranteed within each partition."""
rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 4), ("d", 3), ("e", 5), ("f", 9), ("g", 2), ("f", 6)], 3)
print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect())
# [('b', 1), ('g', 2), ('a', 3), ('d', 3), ('c', 4), ('e', 5), ('f', 6), ('f', 9)]
print(rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=3).collect())
# [('g', 2), ('f', 9), ('f', 6), ('e', 5), ('d', 3), ('c', 4), ('b', 1), ('a', 3)]
sortByKey (sortBy on the key)
# Parameter 1: True for ascending, False for descending
# Parameter 2: how many partitions to use for the sort
# Parameter 3: keyfunc, a transformation applied to the key for sorting only
"""Note: for a globally ordered result, set the number of sort partitions to 1; otherwise, in distributed mode, order is only guaranteed within each partition."""
rdd = sc.parallelize([("a", 3), ("B", 1), ("c", 4)], 3)
print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda x: str(x).lower()).collect())
# [('a', 3), ('B', 1), ('c', 4)]
6. Common action operators
countByKey (count per key)
For key-value RDDs.
rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
result = rdd2.countByKey()
# result is not an RDD but a dict
print(result)
# defaultdict(<class 'int'>, {'hello': 3, 'spark': 1, 'hadoop': 1, 'flink': 1})
collect (gathers the data from all partitions of the RDD into the driver as a single list)
Almost every snippet above already uses collect.
reduce (aggregation) [not to be confused with reduceByKey]
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a * b))
# 120
fold (basically the same as reduce, but you can supply an initial value) [rarely used]
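A small sketch of fold (not from the original notes, reusing the sc from above): the initial value is applied once per partition and once more when the partition results are merged, so with 3 partitions and an initial value of 10 the result is 15 + 10 * (3 + 1) = 55.
rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
print(rdd.fold(10, lambda a, b: a + b))
# 55: each of the 3 partitions starts from 10, and the final merge adds 10 once more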
first (returns the first element of the RDD)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.first())
# 1
take (returns the first n elements of the RDD)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.take(3))
# [1, 2, 3]
top (returns the top n elements of the RDD in descending order)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.top(3))
# [5, 4, 3]
count (returns the number of elements in the RDD)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.count())
# 5
takeSample (randomly samples elements from the RDD)
# Parameter 1: True allows the same element to be drawn more than once, False does not (sampling with/without replacement)
# Parameter 2: how many elements to sample
# Parameter 3: random seed; any number works, and it can be omitted to use Spark's default
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.takeSample(False, 3))
# [4, 1, 2]
takeOrdered (sorts the RDD and takes the first n elements)
# Parameter 1: how many elements to take
# Parameter 2: a key function used only for the ordering (does not change the data itself)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.takeOrdered(3))  # ascending
# [1, 2, 3]
print(rdd.takeOrdered(3, lambda x: -x))  # descending
# [5, 4, 3]
foreach (applies a function to every element, like map, but with no return value)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x: print(x * 10))  # no return value, does not modify the RDD
# Output:
10
20
30
40
50
Why foreach exists: in some scenarios it is more efficient, since it runs concurrently on the executors and the data never has to be shipped back to the driver.
saveAsTextFile (writes the RDD out to files, local or HDFS)
Each partition writes its output file directly; the data is not gathered back to the driver.
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.saveAsTextFile("hdfs://node1:8020/output/12345")
Two output files means the RDD had two partitions.
mapPartitions (same result as map, but processes one whole partition at a time)
mapPartitions reduces I/O: there is only one transfer per partition, so it is more efficient and faster.
rdd = sc.parallelize([1, 2, 3, 4, 5])
# each call processes the data of one partition
def ptest(i):
    result = list()
    for x in i:
        result.append(x * 10)
    return result
print(rdd.mapPartitions(ptest).collect())
# [10, 20, 30, 40, 50]
foreachPartition (like foreach, but with per-partition logic)
rdd = sc.parallelize([1, 2, 3, 4, 5])
def ptest(i):
    result = list()
    for x in i:
        result.append(x * 10)
    print(result)
rdd.foreachPartition(ptest)
# [30, 40, 50]
# [10, 20]   (the order across partitions is not deterministic)
partitionBy (custom partitioning of an RDD)
The default rule is hash partitioning by key; partitionBy lets you define your own partitioning rule based on the key.
rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 4)])
# Parameter 1: how many partitions after repartitioning
# Parameter 2: the custom partitioning rule, a function
def test_fun(k):
    if 'a' == k or 'c' == k: return 0  # 'a' and 'c' go to partition 0
    else: return 1  # everything else ('b') goes to partition 1
print(rdd.partitionBy(2, test_fun).glom().collect())
# [[('a', 3), ('c', 4)], [('b', 1)]]
repartition (changes only the number of partitions of an RDD) [not recommended]
Under the hood it is simply coalesce with shuffle=True.
coalesce (repartitions an RDD)
rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 4)], 3)
print(rdd.getNumPartitions()) # 3
print(rdd.repartition(1).getNumPartitions()) # 1
print(rdd.coalesce(1).getNumPartitions()) # 1
print(rdd.coalesce(5, shuffle=True).getNumPartitions()) # 5
Difference between groupByKey and reduceByKey
reduceByKey is roughly equivalent to groupByKey plus an aggregation,
but reduceByKey is more efficient, because it carries the aggregation logic with it and pre-aggregates within each partition before the grouping/shuffle.
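A quick sketch (not from the original notes, reusing the sc from above): both approaches produce the same result, but reduceByKey shuffles far less data because it combines values inside each partition first.
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
# groupByKey + aggregation: every value crosses the shuffle, then gets summed
print(rdd.groupByKey().mapValues(lambda vals: sum(vals)).collect())
# reduceByKey: values are pre-aggregated inside each partition before the shuffle
print(rdd.reduceByKey(lambda a, b: a + b).collect())
# both print [('a', 3), ('b', 7)] (order may differ)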
7. RDD operator summary
8. RDD persistence
In-memory iteration:
old RDDs that are no longer needed are cleared from memory to make room for subsequent computation.
For that scenario the optimization is to cache rdd3 so it does not have to be recomputed.
# Note: cached data lives in memory (or on local disk); by design this is not guaranteed to be safe
from pyspark import StorageLevel  # needed for the persist() levels below
rdd3.cache()  # cache in memory
rdd3.persist(StorageLevel.MEMORY_ONLY)  # memory only
rdd3.persist(StorageLevel.MEMORY_ONLY_2)  # memory only, 2 replicas
rdd3.persist(StorageLevel.DISK_ONLY)  # disk only
rdd3.persist(StorageLevel.DISK_ONLY_2)  # disk only, 2 replicas
rdd3.persist(StorageLevel.DISK_ONLY_3)  # disk only, 3 replicas
rdd3.persist(StorageLevel.MEMORY_AND_DISK)  # memory first, spill to disk when memory is insufficient
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)  # memory first then disk, 2 replicas
rdd3.persist(StorageLevel.OFF_HEAP)  # off-heap (system) memory
# The most commonly used level is
rdd3.persist(StorageLevel.MEMORY_AND_DISK)
# Release the cache manually
rdd.unpersist()
# CheckPoint is safe by design
# CheckPoint supports disk (HDFS) storage only; every partition writes its data to HDFS
sc.setCheckpointDir("hdfs://node1:8020/output/ckp")
rdd.checkpoint()
Summary
9. Broadcast variables
Tasks in the same executor share a single copy of a broadcast variable.
broadcast = sc.broadcast(stu_info_list)  # not an RDD: a broadcast wrapper around a local (driver-side) variable
value = broadcast.value  # simply read the value back out of the broadcast variable
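A complete runnable sketch along these lines (the stu_info_list data here is made up for illustration):
# coding:utf8
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("broadcast_demo")
    sc = SparkContext(conf=conf)

    # a small driver-side lookup table
    stu_info_list = [(1, "zhangsan"), (2, "lisi"), (3, "wangwu")]
    broadcast = sc.broadcast(stu_info_list)  # sent once per executor instead of once per task

    score_rdd = sc.parallelize([(1, "语文", 99), (2, "数学", 88), (3, "英语", 77)])

    def map_name(t):
        id_to_name = dict(broadcast.value)  # read the broadcast value inside the task
        return (id_to_name[t[0]], t[1], t[2])

    print(score_rdd.map(map_name).collect())
    # [('zhangsan', '语文', 99), ('lisi', '数学', 88), ('wangwu', '英语', 77)]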
10. Accumulators
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
# acmlt = 0  # with a plain variable instead of an accumulator, the final print would show 0
count = sc.accumulator(0)  # with an accumulator, the result becomes 10
def test_fun(data):
    global count
    count += 1
    print(count)
rdd.map(test_fun).collect()
print(count)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
count = sc.accumulator(0)
def test_fun(data):
    global count
    count += 1
rdd2 = rdd.map(test_fun)
# rdd2.cache()  # with this cache enabled, the final print would stay at 10
rdd2.collect()
print(count)  # 10
rdd3 = rdd2.map(lambda x: x)
rdd3.collect()
print(count)  # 20
# It prints 20 because rdd2 was not cached: its data is gone after the first collect, so when rdd3 needs rdd2, the rdd2 lineage is recomputed and the accumulator is incremented a second time.
Summary
11. Spark DAG
The execution flow graph: a directed acyclic graph.
Wide and narrow dependencies in Spark
Narrow dependency: a partition of the parent RDD sends all of its data to a single partition of the child RDD.
Wide dependency (shuffle): a partition of the parent RDD sends data to multiple partitions of the child RDD.
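One way to inspect this from code (a sketch, not from the original notes, reusing the sc from above): toDebugString prints the lineage, where map stays inside the same stage (narrow dependency) while reduceByKey introduces a shuffle and therefore a stage boundary (wide dependency).
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)], 2)
narrow = rdd.map(lambda t: (t[0], t[1] * 10))   # narrow dependency: stays in the same stage
wide = narrow.reduceByKey(lambda a, b: a + b)   # wide dependency: introduces a shuffle / new stage
print(wide.toDebugString().decode())            # the lineage; the shuffle shows up as a stage boundary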
SparkSQL (DataFrame)
1. SparkSQL overview
SparkSQL vs. Hive
SparkSQL is memory-based, Hive is disk-based;
SparkSQL has no metadata management of its own, Hive has the metastore;
SparkSQL can mix SQL with code, Hive supports SQL only.
SparkSession
The entry point for RDDs is SparkContext;
the entry point for SparkSQL is SparkSession (from which the SparkContext can also be obtained).
# coding:utf8
# Linux environment
import os
# os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
# os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
# os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import *

if __name__ == '__main__':
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    df = spark.read.csv("data/input/stu_score.txt", sep=',', header=False)
    df2 = df.toDF("id", "name", "score")
    df2.printSchema()
    df2.show()
    df2.createTempView("score")
    # SQL style
    spark.sql("""
        SELECT * from score WHERE name = '语文' LIMIT 5
    """).show()
    # DSL style
    df2.where("name='语文'").limit(5).show()
2. DataFrame
(1) Building a DataFrame from an RDD
Method 1: from an RDD, passing the column names as a list
# coding:utf8
# Linux environment
import os
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import *

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile("file:///lxw/pyspark/data/sql/people.txt") \
        .map(lambda x: x.split(",")) \
        .map(lambda x: (x[0], int(x[1])))
    df = spark.createDataFrame(rdd, schema=['name', 'age'])
    df.printSchema()
    # show's first parameter: maximum number of rows to display; second parameter False: do not truncate, show full column contents
    df.show(20, False)
    df.createTempView("people")
    spark.sql("SELECT * FROM people WHERE age<30").show()
Method 2: from an RDD, via StructType
# coding:utf8
# Linux environment
import os
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile("file:///lxw/pyspark/data/sql/people.txt") \
        .map(lambda x: x.split(",")) \
        .map(lambda x: (x[0], int(x[1])))
    # Parameter 1: column name  Parameter 2: type  Parameter 3: nullable
    schema = StructType() \
        .add("name", StringType(), nullable=True) \
        .add("age", IntegerType(), nullable=False)
    df = spark.createDataFrame(rdd, schema=schema)
    df.printSchema()
    df.show()
Method 3: from an RDD, using the RDD's toDF method
# coding:utf8
# Linux environment
import os
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile("file:///lxw/pyspark/data/sql/people.txt") \
        .map(lambda x: x.split(",")) \
        .map(lambda x: (x[0], int(x[1])))
    # 1. toDF directly (column types cannot be specified; they can only be inferred)
    df = rdd.toDF(["name", "age"])
    df.printSchema()
    df.show()
    # 2. StructType first, then toDF (column types can be specified)
    schema = StructType() \
        .add("name", StringType(), nullable=True) \
        .add("age", IntegerType(), nullable=False)
    df2 = rdd.toDF(schema=schema)
    df2.printSchema()
    df2.show()
(2) Building a DataFrame from pandas
Method 4: from a pandas DataFrame
# coding:utf8
# Linux environment
import os
import pandas as pd
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile("file:///lxw/pyspark/data/sql/people.txt") \
        .map(lambda x: x.split(",")) \
        .map(lambda x: (x[0], int(x[1])))  # (leftover from the previous examples, not used below)
    # Build a DataFrame from pandas
    pd_df = pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["岩弟弟", "烽烟", "岩龙铠甲"],
        "age": [10, 12, 45]
    })
    df = spark.createDataFrame(pd_df)
    df.printSchema()
    df.show()
(3) Building a DataFrame with SparkSQL's unified read API
# coding:utf8
# Linux environment
import os
import pandas as pd
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = spark.sparkContext
    # Build a StructType; a text file is read as a single column, named value by default, of string type
    schema = StructType() \
        .add("data", StringType(), nullable=True)
    df = spark.read.format("text") \
        .schema(schema=schema) \
        .load("file:///lxw/pyspark/data/sql/people.txt")
    df.printSchema()
    df.show()
Reading JSON data
# JSON data
df = spark.read.json("file:///lxw/pyspark/data/sql/people.json")
# df = spark.read.format("json").load("file:///lxw/pyspark/data/sql/people.json")
df.printSchema()
df.show()
Reading CSV data
# CSV data
df = spark.read.format("csv") \
.option("sep", ";") \
.option("header", True) \
.option("encoding", "utf-8") \
.schema("name STRING, age INT, job STRING") \
.load("file:///lxw/pyspark/data/sql/people.csv")
df.printSchema()
df.show()
Reading Parquet data (Parquet is a columnar storage file format commonly used with Spark)
# Parquet file
df = spark.read.parquet("file:///lxw/pyspark/data/sql/users.parquet")
# df = spark.read.format("parquet").load("file:///lxw/pyspark/data/sql/users.parquet")
df.printSchema()
df.show()
(4) DSL and SQL styles on a DataFrame
DSL style
df = spark.read.format("csv") \
.schema("id INT, subject STRING, score INT") \
.load("file:///lxw/pyspark/data/sql/stu_score.txt")
df.select(["id", "subject"]).show()
df.select("id", "subject").show()
id_column = df['id']
subject_column = df['subject']
df.select(id_column, subject_column).show()
df.filter("score < 99").show()
df.filter(df['score'] < 99).show()
# df.where()  # same as filter
df.groupBy("subject").count().show()
df.groupBy(df['subject']).count().show()
SQL style
df = spark.read.format("csv") \
.schema("id INT, subject STRING, score INT") \
.load("file:///lxw/pyspark/data/sql/stu_score.txt")
df.createTempView("score")  # register a temporary view
df.createOrReplaceTempView("score2")  # register or replace a temporary view
df.createGlobalTempView("score3")  # register a global temporary view; to use it, prefix it with global_temp
spark.sql("select subject,count(*) as cnt from score group by subject").show()
spark.sql("select subject,count(*) as cnt from score2 group by subject").show()
spark.sql("select subject,count(*) as cnt from global_temp.score3 group by subject").show()
3. WordCount with SparkSQL
# coding:utf8
# Linux environment
import os
import pandas as pd
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = spark.sparkContext
    # 1. SQL style
    # read via an RDD
    rdd = sc.textFile("file:///lxw/pyspark/data/sql/words.txt") \
        .flatMap(lambda x: x.split(" ")) \
        .map(lambda x: [x])
    df = rdd.toDF(["word"])
    df.createTempView("wordsView")
    spark.sql("""
        select word, count(*) as cnt from wordsView group by word order by cnt desc
    """).show()
    # 2. DSL style
    # read via the Spark read API
    df = spark.read.format("text") \
        .load("file:///lxw/pyspark/data/sql/words.txt")
    df2 = df.withColumn("value", F.explode(F.split(df["value"], " ")))
    df2.groupBy("value").count() \
        .withColumnRenamed("value", "单词") \
        .withColumnRenamed("count", "个数") \
        .orderBy("个数", ascending=False) \
        .show()
4. Data cleaning (removing bad data)
# Deduplicate
df.dropDuplicates().show()
df.dropDuplicates(['age', 'job']).show()
# Drop nulls
df.dropna().show()
df.dropna(thresh=3).show()  # keep a row only if it has at least 3 non-null columns
df.dropna(thresh=2, subset=['name', 'age']).show()  # consider only name and age, and require at least 2 of them to be non-null
# Fill nulls
df.fillna("loss")  # fill nulls with "loss"
df.fillna("loss", subset=['name', 'age']).show()  # fill only the specified columns
df.fillna({'name': 'nameNull', 'age': '0', 'job': 'jobNull'}).show()  # fill each column with its own value
5. Exporting DataFrame data
(1) Exporting to files
# coding:utf8
# Linux environment
import os
import pandas as pd
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()
    sc = spark.sparkContext
    # 1. SQL style
    # read via an RDD
    rdd = sc.textFile("file:///lxw/pyspark/data/sql/words.txt") \
        .flatMap(lambda x: x.split(" ")) \
        .map(lambda x: [x])
    df_rdd = rdd.toDF(["word"])
    df_rdd.createTempView("wordsView")
    df = spark.sql("""
        select word, count(*) as cnt from wordsView group by word order by cnt desc
    """)
    df.show()
    # Write out as text
    df.select(F.concat_ws("---", "word", "cnt")) \
        .write \
        .mode("overwrite") \
        .format("text") \
        .save("file:///lxw/pyspark/data/sql/text")
    # Write out as CSV
    df.write \
        .mode("overwrite") \
        .format("csv") \
        .option("sep", ",") \
        .option("header", True) \
        .save("file:///lxw/pyspark/data/sql/csv")
    # Write out as JSON
    df.write \
        .mode("overwrite") \
        .format("json") \
        .save("file:///lxw/pyspark/data/sql/json")
    # Write out as Parquet
    df.write \
        .mode("overwrite") \
        .format("parquet") \
        .save("file:///lxw/pyspark/data/sql/parquet")
(2) Exporting to MySQL
First put the MySQL JDBC connector jar into pyspark's jars directory.
The path on my machine after copying: D:\python\pyspark\venv\Lib\site-packages\pyspark\jars\mysql-connector-java-5.1.41-bin.jar
# Write to MySQL (the table is created automatically on write)
df.write.mode("overwrite") \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/pysparkdata?useSSL=false&useUnicode=true") \
.option("dbtable", "testdata") \
.option("user", "root") \
.option("password", "123456") \
.save()
# Read MySQL data into a DataFrame
df = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/yiliao?useSSL=false&useUnicode=true") \
.option("dbtable", "question_history") \
.option("user", "root") \
.option("password", "123456") \
.load()
df.show()