PySpark

These notes come from my graduation project (roughly 2022-05-25 to 2022-06-03). I had already taken a PySpark course in the second semester of my junior year and went through it again for the project. The course I followed is the 黑马 (Heima) one: https://www.bilibili.com/video/BV1Jq4y1z7VP

Execution

Startup

# Switch to the Anaconda pyspark virtual environment (on all 3 nodes)
conda activate pyspark
# Start Hadoop HDFS, YARN and the history server (on node1)
/onekey/hd1_start.sh
# Submit in spark-on-yarn mode (on node1)
spark-submit --master yarn /export/server/spark/examples/src/main/python/pi.py


# Stop Hadoop
/onekey/hd2_stop.sh
# Power off
shutdown -h now

HA based on ZooKeeper

# ZooKeeper must be started first
/onekey/zk1_start.sh
# Start Spark (standalone) and its history server
/export/server/spark-3.1.2-bin-hadoop3.2/sbin/start-history-server.sh
/export/server/spark-3.1.2-bin-hadoop3.2/sbin/start-all.sh
# Submit PySpark code to the standalone master
spark-submit --master spark://node1:7077 /export/server/spark/examples/src/main/python/pi.py


# Stop Spark
/export/server/spark-3.1.2-bin-hadoop3.2/sbin/stop-history-server.sh
/export/server/spark-3.1.2-bin-hadoop3.2/sbin/stop-all.sh
# Start Hadoop and activate the pyspark environment again
/onekey/hd1_start.sh
conda activate pyspark

# Interactive shell on YARN
pyspark --master yarn
# Submit locally, using all CPU cores
spark-submit --master local[*] /export/server/spark/examples/src/main/python/pi.py
# Interactive shell against the standalone master
./pyspark --master  spark://node1:7077
# A quick test inside the shell
sc.parallelize([1,2,3,4,5]).map(lambda x:x*10).collect()
Create the pyspark environment
conda create -n pyspark python=3.8

HDFS cluster web UI: http://node1:9870/

YARN cluster web UI: http://node1:8088/

Hadoop history server: http://node1:19888/

4040: Spark application web UI (one per running application)

8080: Spark standalone Master web UI (default port)

18080: Spark history server

Environment

A three-node Linux cluster running CentOS 7

QQ截图20220521180055

Hadoop HDFS

QQ截图20220521180144

Spark HA on ZooKeeper

QQ截图20220521180203

The difference between Spark on ZooKeeper (standalone HA) and Spark on YARN

Since we already have YARN, reusing the YARN scheduler is more convenient.

Deploying a separate ZooKeeper-based HA cluster consumes extra resources, and the two schedulers would compete with each other.

So Spark on YARN is generally the better fit for a cluster that already runs Hadoop YARN.

Submitting to YARN

# coding:utf8
# Linux environment
import os
import py

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark import *

if __name__ == '__main__':
    conf = SparkConf().setMaster("yarn").setAppName("test")
    # Declare the dependency files. Without this the job fails, because the cluster
    # cannot find the local .py file; it has to be shipped to the cluster.
    conf.set("spark.submit.pyFiles", "py.py")
    sc = SparkContext(conf=conf)


    # Read a file from HDFS
    file_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
    print(file_rdd.collect())
    # Output: ['hello spark', 'hello hadoop', 'hello flink']
    word_rdd = file_rdd.flatMap(lambda line: line.split(" "))  # split on spaces

    word_with_one_rdd = word_rdd.map(lambda x: (x, 1))  # turn every word into a (word, 1) tuple
    result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)  # aggregation rule
    print(result_rdd.collect())
    # Output: [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
    print(py.hello())



Contents of py.py

def hello():
    return "hello"

Submitting with spark-submit on Linux

spark-submit --master yarn --py-files ./py.py ./test.py

PySpark architecture and execution

The driver side runs on the JVM; on the executor side the JVM only forwards instructions, and the actual work is done by the underlying Python interpreter.

QQ截图20220522145351

Code submitted to the cluster runs distributed:

non-task work (everything that is not RDD computation) is executed by the driver;

task work is executed by the executors, and since there can be many executors, the computation runs in parallel across multiple machines.
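A small illustrative sketch of that split (assuming the same local[*] setup and sc as in the examples below): plain Python code runs in the driver, while functions passed to RDD operators are serialized and executed by Python worker processes on the executors.

nums = list(range(10))                       # plain Python: runs in the driver process
rdd = sc.parallelize(nums, 2)
result = rdd.map(lambda x: x * x).collect()  # the lambda is shipped to the executors and run by their Python workers
print(result)                                # the collected list lives in the driver again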

Spark Core (RDD)

An RDD is a dataset that holds objects.

An RDD is distributed and is meant for distributed computation.

The data in an RDD can live in memory or on disk.

1. The five key properties of an RDD (a short sketch follows this list)

1) An RDD has partitions.

2) An RDD operator is applied to every partition of the RDD.

image-20220522152533601

3) RDDs have dependencies on one another.

image-20220522152828710

4) A key-value RDD can have a partitioner; the default is hash partitioning.

image-20220522153319053

5) Partition placement tries to stay close to the servers where the data lives.

image-20220522153522889
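A quick sketch (assuming the same sc as in the code below) that touches the first four properties; the exact printed partitioner object will vary.

rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], 2)
print(rdd.getNumPartitions())                  # 1) the RDD is split into partitions
rdd2 = rdd.mapValues(lambda v: v * 10)         # 2) the operator runs on every partition; 3) rdd2 now depends on rdd
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)    # 4) after the shuffle this key-value RDD has a (hash) partitioner
print(rdd3.partitioner)
print(rdd3.glom().collect())                   # the data grouped by partition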

2. WordCount code walkthrough

# coding:utf8
# Linux environment
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
from pyspark import *

if __name__ == '__main__':
    # Cluster mode
    # conf = SparkConf().setAppName("WordCount")
    # Local mode
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)

    # Read a file from HDFS
    file_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
    print(file_rdd.collect())
    # Output: ['hello spark', 'hello hadoop', 'hello flink']
    word_rdd = file_rdd.flatMap(lambda line: line.split(" "))  # split on spaces

    word_with_one_rdd = word_rdd.map(lambda x: (x, 1))  # turn every word into a (word, 1) tuple
    result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)  # aggregation rule
    print(result_rdd.collect())
    # Output: [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]

image-20220522160452557

3. Creating RDDs

SparkContext(conf=conf) is the entry point of Spark.

Creating an RDD by hand

# coding:utf8
from pyspark import *

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("WordCount")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
    print("default number of partitions", rdd.getNumPartitions())
    # Prints 8 here: this machine has an 8-core CPU, so the default is 8 partitions
    rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
    print("specified number of partitions", rdd1.getNumPartitions())
    # With the partition count given explicitly, this prints 3

Reading files

from pyspark import *

if __name__ == '__main__':
    # Cluster mode
    # conf = SparkConf().setAppName("WordCount")
    # Local mode
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)

    # Read a local file
    # file_rdd = sc.textFile("data/input/words.txt", 3)  # the optional argument is a minimum partition count hint; Spark makes the final call
    # print("number of partitions", file_rdd.getNumPartitions())
    # Read a file from HDFS
    file_rdd = sc.textFile("hdfs://node1:8020/input/words.txt", 3)
    print(file_rdd.collect())

For reading lots of small files, wholeTextFiles is the better choice:

rdd = sc.wholeTextFiles("data/input/tiny_files")  # a directory of many small files
print(rdd.collect())
print(rdd.map(lambda x: x[1]).collect())

4. RDD operators

Operators are the distributed API of an RDD, i.e. its distributed methods.

Transformation operators are lazy: without an action, nothing is executed.

They go from RDD to RDD, so the return value is still an RDD.

Action operators are the switch that triggers the execution plan.

Their return value is not an RDD. (See the short sketch of the difference below.)
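A minimal sketch of that difference, assuming the same sc as in the examples above: the transformation only records the plan, and nothing runs until an action is called.

rdd = sc.parallelize([1, 2, 3])
mapped = rdd.map(lambda x: x * 10)  # transformation: returns a new RDD, no computation happens yet
print(mapped.collect())             # action: triggers the job and returns a local list
# [10, 20, 30]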

5. Common transformation operators

map

# coding:utf8
# Linux environment
import os
os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
from pyspark import *

if __name__ == '__main__':
    # Cluster mode
    # conf = SparkConf().setAppName("WordCount")
    # Local mode
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)

    # 1. Define a named function
    def add(data):
        return data * 10

    print(rdd.map(add).collect())

    # 2. Use a lambda as an anonymous function (fine for one-liners; write a proper function for anything more complex)
    print(rdd.map(lambda x: x*10).collect())

mapValues (a map that only touches the value)

Like map, but only processes the value of each pair.

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
print(rdd.map(lambda x: (x[0], x[1] * 10)).collect())
print(rdd.mapValues(lambda x: x * 10).collect())
# Both print [('a', 10), ('a', 20), ('b', 30), ('b', 40)]

flatMap (flatten nested results into a single list)

rdd = sc.parallelize(["ydd lyy lxw", "ydd ydd ljj", "lxw lyy ydd"])

print(rdd.map(lambda x: x.split(" ")).collect())
# Output: [['ydd', 'lyy', 'lxw'], ['ydd', 'ydd', 'ljj'], ['lxw', 'lyy', 'ydd']]
print(rdd.flatMap(lambda x: x.split(" ")).collect())
# Output: ['ydd', 'lyy', 'lxw', 'ydd', 'ydd', 'ljj', 'lxw', 'lyy', 'ydd']

reduceByKey (aggregate by key)

For key-value RDDs: groups by key automatically and aggregates the values according to the given logic.

The lambda aggregates two values at a time: lambda a, b: a + b

image-20220522173411271

# Read a file from HDFS
file_rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
print(file_rdd.collect())
# Output: ['hello spark', 'hello hadoop', 'hello flink']
word_rdd = file_rdd.flatMap(lambda line: line.split(" "))

word_with_one_rdd = word_rdd.map(lambda x: (x, 1))
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
print(result_rdd.collect())
# Output: [('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]

groupBy (grouping)

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])

# The argument to groupBy says which element to group by (hash grouping)
result = rdd.groupBy(lambda t: t[0])
print(result.map(lambda t: (t[0], list(t[1]))).collect())
# Output: [('b', [('b', 3), ('b', 4)]), ('a', [('a', 1), ('a', 2)])]

groupByKey (groupBy that groups by the key automatically)

Only for key-value RDDs.

# The argument to groupBy says which element to group by (hash grouping)
# result = rdd.groupBy(lambda t: t[0])
# print(result.map(lambda t: (t[0], list(t[1]))).collect())
# Output: [('b', [('b', 3), ('b', 4)]), ('a', [('a', 1), ('a', 2)])]
# groupByKey groups by key automatically (note that the output differs from groupBy)
result = rdd.groupByKey()
print(result.map(lambda t: (t[0], list(t[1]))).collect())
# Output: [('b', [3, 4]), ('a', [1, 2])]

filter (filtering)

Keeps only the data you want.

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
# Elements for which the function returns True are kept
print(rdd.filter(lambda x: x % 2 == 1).collect())
# Output: [1, 3, 5]

distinct (deduplication)

Removes duplicates.

rdd = sc.parallelize([1, 1, 2, 2, 3, 3])
print(rdd.distinct().collect())
# Output: [2, 1, 3]

union (concatenation)

Merges two RDDs.

rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize(["a", "b", "c"])
# Does not deduplicate; RDDs with different element types can be unioned
print(rdd1.union(rdd2).collect())
# Output: [1, 1, 2, 3, 'a', 'b', 'c']

join (SQL-style join)

SQL inner and outer joins; can only be used on two-element tuples, i.e. key-value RDDs.

The join is performed on the key.

rdd1 = sc.parallelize([(1, "a"), (2, "b"), (3, "c"), (4, "d")])
rdd2 = sc.parallelize([(1, "1科"), (2, "2科"), (5, "5科")])
print(rdd1.join(rdd2).collect())  # SQL inner join
# Output: [(1, ('a', '1科')), (2, ('b', '2科'))]
print(rdd1.leftOuterJoin(rdd2).collect())  # SQL left join
# Output: [(4, ('d', None)), (1, ('a', '1科')), (2, ('b', '2科')), (3, ('c', None))]
print(rdd1.rightOuterJoin(rdd2).collect())  # SQL right join
# Output: [(1, ('a', '1科')), (5, (None, '5科')), (2, ('b', '2科'))]

intersection (set intersection)

Takes the intersection; elements must match exactly. Unlike join, which compares only the key, intersection requires both key and value to be equal.

rdd1 = sc.parallelize([(1, "a"), (2, "b"), (3, "c"), (4, "d")])
rdd2 = sc.parallelize([(1, "a"), (2, "2科"), (5, "5科")])
print(rdd1.intersection(rdd2).collect())
# Output: [(1, 'a')]

glom (nest the data by partition)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)
print(rdd.glom().collect())
# Output: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

sortBy (sorting)

# First argument: which field to sort by, e.g. lambda x: x[1] sorts by the second field
# Second argument: True for ascending, False for descending
# Third argument: how many partitions to use for the sort
"""Note: for a globally ordered result set the number of sort partitions to 1; otherwise a distributed sort only guarantees order within each partition"""
rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 4), ("d", 3), ("e", 5), ("f", 9), ("g", 2), ("f", 6)], 3)
print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect())
# [('b', 1), ('g', 2), ('a', 3), ('d', 3), ('c', 4), ('e', 5), ('f', 6), ('f', 9)]
print(rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=3).collect())
# [('g', 2), ('f', 9), ('f', 6), ('e', 5), ('d', 3), ('c', 4), ('b', 1), ('a', 3)]

sortByKey (sortBy applied to the key)

# First argument: True for ascending, False for descending
# Second argument: how many partitions to use for the sort
# Third argument: a function applied to the key before comparison
"""Note: for a globally ordered result set the number of sort partitions to 1; otherwise a distributed sort only guarantees order within each partition"""
rdd = sc.parallelize([("a", 3), ("B", 1), ("c", 4)], 3)
print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda x: str(x).lower()).collect())
# [('a', 3), ('B', 1), ('c', 4)]

6. Common action operators

countByKey (count occurrences of each key)

For key-value RDDs.

rdd = sc.textFile("hdfs://node1:8020/input/words.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
result = rdd2.countByKey()
# result is not an RDD but a dict
print(result)
# defaultdict(<class 'int'>, {'hello': 3, 'spark': 1, 'hadoop': 1, 'flink': 1})

collect (gather the data from every partition of the RDD back into the driver as a single list object)

Almost all of the code above uses collect.
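Since collect pulls every partition back into the driver's memory, only call it on results that are small enough to fit there; a short sketch (assuming the same sc) with take as the lighter alternative:

rdd = sc.parallelize(range(10), 3)
print(rdd.collect())  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] as one list in the driver
print(rdd.take(3))    # [0, 1, 2] - fetches only what is needed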

reduce (aggregation) [not to be confused with reduceByKey]

rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a * b))
# 120

image-20220522201127569

fold (basically the same as reduce, but with an initial value) [rarely used]

image-20220522201716203
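The text above only describes fold, so here is a small sketch (assuming the same sc): the initial value is applied once inside every partition and once more when the partition results are merged.

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
print(rdd.fold(10, lambda a, b: a + b))
# 85 = 45 (the data) + 10 for each of the 3 partitions + 10 for the final merge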

first (take the first element of the RDD)

rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.first())
# 1

take (take the first n elements of the RDD)

rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.take(3))
# [1, 2, 3]

top (take the top n elements of the RDD in descending order)

rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.top(3))
# [5, 4, 3]

count (return how many records the RDD contains)

rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.count())
# 5

takeSample (randomly sample data from the RDD)

# First argument: True allows the same element to be drawn more than once (sampling with replacement), False does not
# Second argument: how many elements to sample
# Third argument: random seed; any number works, and it can be omitted to let Spark choose
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.takeSample(False, 3))
# [4, 1, 2]

takeOrdered (sort the RDD and take the first n)

# First argument: how many elements to take
# Second argument: a function applied to the data for sorting only (the source data is not changed)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.takeOrdered(3))  # ascending
# [1, 2, 3]
print(rdd.takeOrdered(3, lambda x: -x))  # descending
# [5, 4, 3]

foreach (apply a function to every element, like map, but without a return value)

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x: print(x*10))  # no return value, and the RDD itself is not changed
# Output
10
20
30
40
50

Why foreach exists: in some scenarios it runs more efficiently, because the work happens in parallel on the executors and no data has to be sent back from the executors to the driver.

image-20220522204029172

saveAsTextFile (write the RDD out to text files)

Each partition is written out directly by its executor; nothing is gathered back to the driver.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.saveAsTextFile("hdfs://node1:8020/output/12345")

Two output files means two partitions.

image-20220522205721644

mapPartitions (like map, but processes a whole partition at a time)

mapPartitions reduces IO: there is only one exchange per partition, so it is faster.

image-20220522210252989

rdd = sc.parallelize([1, 2, 3, 4, 5])

# Each call receives the data of one whole partition
def ptest(i):
    result = list()
    for x in i:
        result.append(x*10)
    return result

print(rdd.mapPartitions(ptest).collect())
# [10, 20, 30, 40, 50]

foreachPartition (like foreach, but processes a whole partition at a time)

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2 partitions, matching the output below
def ptest(i):
    result = list()
    for x in i:
        result.append(x*10)
    print(result)


rdd.foreachPartition(ptest)
# [30, 40, 50]
# [10, 20]

partitionBy (apply a custom partitioning to the RDD)

The default rule is hash partitioning on the key; partitionBy lets you define your own key-based rule.

rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 4)])
# Argument 1: how many partitions after repartitioning
# Argument 2: the custom partitioning function
def test_fun(k):
    if 'a' == k or 'c' == k: return 0  # put a and c in partition 0
    else: return 1  # everything else (here b) goes to partition 1

print(rdd.partitionBy(2, test_fun).glom().collect())
# [[('a', 3), ('c', 4)], [('b', 1)]]

repartition (change only the number of partitions of the RDD) [not recommended]

Under the hood it is simply coalesce with shuffle=True.

coalesce (repartition the RDD)

rdd = sc.parallelize([("a", 3), ("b", 1), ("c", 4)], 3)
print(rdd.getNumPartitions())  # 3
print(rdd.repartition(1).getNumPartitions())  # 1
print(rdd.coalesce(1).getNumPartitions())  # 1
print(rdd.coalesce(5, shuffle=True).getNumPartitions())  # 5

The difference between groupByKey and reduceByKey

reduceByKey is roughly equivalent to groupByKey followed by an aggregation.

But reduceByKey is more efficient: it carries the aggregation logic with it, so it can pre-aggregate inside each partition before the shuffle, as in the comparison below.
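A small comparison sketch (assuming the same sc): both produce the same result, but reduceByKey combines values inside each partition before the shuffle.

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
# groupByKey first ships every (key, value) pair, then we aggregate
print(rdd.groupByKey().mapValues(lambda vs: sum(vs)).collect())
# reduceByKey pre-aggregates within each partition (map-side combine), so less data is shuffled
print(rdd.reduceByKey(lambda a, b: a + b).collect())
# both print [('b', 7), ('a', 3)] (order may vary)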

7. Summary of RDD operators

image-20220529153419999

image-20220529153451497

8. RDD persistence

In-memory iteration

Old RDDs that are no longer needed are cleared from memory to free space for later computation.

image-20220529160058406

For the scenario above we can optimize by caching rdd3.

# Note: cached data lives in memory, which is by design not safe; memory gives no durability guarantee
from pyspark import StorageLevel

rdd3.cache()  # cache in memory only
rdd3.persist(StorageLevel.MEMORY_ONLY)  # memory only
rdd3.persist(StorageLevel.MEMORY_ONLY_2)  # memory only, 2 replicas
rdd3.persist(StorageLevel.DISK_ONLY)  # disk only
rdd3.persist(StorageLevel.DISK_ONLY_2)  # disk only, 2 replicas
rdd3.persist(StorageLevel.DISK_ONLY_3)  # disk only, 3 replicas
rdd3.persist(StorageLevel.MEMORY_AND_DISK)  # memory first, spill to disk when it does not fit
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)  # memory first, spill to disk, 2 replicas
rdd3.persist(StorageLevel.OFF_HEAP)  # off-heap (system) memory

# The one normally used is
rdd3.persist(StorageLevel.MEMORY_AND_DISK)

# Release the cache manually
rdd.unpersist()

# CheckPoint is designed to be safe: it only supports disk storage, and every partition writes its data to HDFS
sc.setCheckpointDir("hdfs://node1:8020/output/ckp")
rdd.checkpoint()

image-20220529161408919

Summary

image-20220529161746147

9. Broadcast variables

Within one Executor, all tasks share a single copy of the broadcast variable.

broadcast = sc.broadcast(stu_info_list)  # not an RDD: a broadcast variable wrapping a local object
value = broadcast.value  # just read the value back out of the broadcast variable
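A fuller, self-contained sketch of the idea (assuming the same sc; the data and names are made up for illustration): the student list is broadcast once per Executor instead of being shipped with every task.

stu_info_list = [(1, "zhangsan"), (2, "lisi")]
broadcast = sc.broadcast(stu_info_list)          # the driver registers the local list as a broadcast variable
score_rdd = sc.parallelize([(1, 90), (2, 80), (1, 70)])

def add_name(t):
    # runs on the executors: read the shared copy via broadcast.value
    for sid, name in broadcast.value:
        if sid == t[0]:
            return (name, t[1])

print(score_rdd.map(add_name).collect())
# [('zhangsan', 90), ('lisi', 80), ('zhangsan', 70)]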

10. Accumulators

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
# acmlt = 0  # with a plain local variable instead of an accumulator, the final print would be 0
count = sc.accumulator(0)  # with an accumulator, the result becomes 10

def test_fun(data):
    global count
    count += 1
    print(count)

rdd.map(test_fun).collect()
print(count)

image-20220529164550959

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
count = sc.accumulator(0)

def test_fun(data):
    global count
    count += 1

rdd2 = rdd.map(test_fun)
# rdd2.cache()  # with this cache enabled, the final print would be 10
rdd2.collect()
print(count)  # 10

rdd3 = rdd2.map(lambda x: x)
rdd3.collect()
print(count)  # 20
# It prints 20 because rdd2 was not cached: after being used it is gone, so when rdd3 needs rdd2 again
# the whole rdd2 lineage is recomputed, and the accumulator is incremented a second time

Summary

image-20220529170302556

11. Spark DAG

The execution flow graph: a directed acyclic graph.

Wide and narrow dependencies in Spark

Narrow dependency: a partition of the parent RDD sends all of its data to a single partition of the child RDD.

Wide dependency: a partition of the parent RDD sends data to multiple partitions of the child RDD (a shuffle).
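A small sketch (assuming the same sc): map keeps a narrow dependency, reduceByKey introduces a wide dependency, and the lineage shows the resulting shuffle (stage boundary).

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)], 2)
mapped = rdd.map(lambda t: (t[0], t[1] * 10))       # narrow: each parent partition feeds exactly one child partition
reduced = mapped.reduceByKey(lambda a, b: a + b)    # wide: parent partitions feed multiple child partitions (shuffle)
print(reduced.toDebugString().decode())             # the printed lineage contains a ShuffledRDD, i.e. a new stage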

SparkSQL (DataFrame)

1. SparkSQL overview

How SparkSQL differs from Hive

SparkSQL computes in memory, Hive computes on disk.

SparkSQL has no metadata management of its own, Hive has the metastore.

SparkSQL can mix SQL with code, Hive can only run SQL.

image-20220530161450428

SparkSession

The entry point for RDDs is SparkContext.

The entry point for SparkSQL is SparkSession (from which the SparkContext can also be obtained).

image-20220530162042774

# coding:utf8
# linux环境
import os

# os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
# os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
# os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import *

if __name__ == '__main__':
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

    df = spark.read.csv("data/input/stu_score.txt", sep=',', header=False)
    df2 = df.toDF("id", "name", "score")
    df2.printSchema()
    df2.show()

    df2.createTempView("score")

    # SQL style
    spark.sql("""
        SELECT * from score WHERE name = '语文' LIMIT 5
    """).show()

    # DSL style
    df2.where("name='语文'").limit(5).show()

2. DataFrame

image-20220530181552015

(1) Building a DataFrame from an RDD

Way 1: build from an RDD by passing a list of column names

# coding:utf8
# linux环境
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import *

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()

    sc = spark.sparkContext

    rdd = sc.textFile("file:///lxw/pyspark/data/sql/people.txt") \
        .map(lambda x: x.split(",")) \
        .map(lambda x: (x[0], int(x[1])))

    df = spark.createDataFrame(rdd, schema=['name', 'age'])
    df.printSchema()
    # show's first argument is the maximum number of rows to display; the second argument False means do not truncate the output
    df.show(20, False)

    df.createTempView("people")
    spark.sql("SELECT * FROM people WHERE age<30").show()

Way 2: build from an RDD with a StructType

# coding:utf8
# linux环境
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()

    sc = spark.sparkContext

    rdd = sc.textFile("file:///lxw/pyspark/data/sql/people.txt") \
        .map(lambda x: x.split(",")) \
        .map(lambda x: (x[0], int(x[1])))

    # Argument 1: column name; argument 2: type; argument 3: whether nulls are allowed
    schema = StructType()\
        .add("name", StringType(), nullable=True)\
        .add("age", IntegerType(), nullable=False)
    df = spark.createDataFrame(rdd, schema=schema)
    df.printSchema()
    df.show()

Way 3: build from an RDD with the RDD's toDF method

# coding:utf8
# linux环境
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()

    sc = spark.sparkContext

    rdd = sc.textFile("file:///lxw/pyspark/data/sql/people.txt") \
        .map(lambda x: x.split(",")) \
        .map(lambda x: (x[0], int(x[1])))

    # 1. Directly via toDF (column types cannot be specified; they can only be inferred)
    df = rdd.toDF(["name", "age"])
    df.printSchema()
    df.show()

    # 2. Via a StructType and then toDF (column types can be specified)
    schema = StructType() \
        .add("name", StringType(), nullable=True) \
        .add("age", IntegerType(), nullable=False)
    df2 = rdd.toDF(schema=schema)
    df2.printSchema()
    df2.show()

(2) Building a DataFrame from pandas

Way 4: build from a pandas DataFrame

# coding:utf8
# linux环境
import os
import pandas as pd

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()

    sc = spark.sparkContext

    rdd = sc.textFile("file:///lxw/pyspark/data/sql/people.txt") \
        .map(lambda x: x.split(",")) \
        .map(lambda x: (x[0], int(x[1])))

    # Build the Spark DataFrame from a pandas DataFrame
    pd_df = pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["岩弟弟", "烽烟", "岩龙铠甲"],
        "age": [10, 12, 45]
    })

    df = spark.createDataFrame(pd_df)

    df.printSchema()
    df.show()

(3) Building a DataFrame with the unified SparkSQL read API

# coding:utf8
# linux环境
import os
import pandas as pd

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()

    sc = spark.sparkContext

    # Build the StructType. A text source is read as a single column, named value by default, with string type
    schema = StructType() \
        .add("data", StringType(), nullable=True) 
    df = spark.read.format("text")\
        .schema(schema=schema)\
        .load("file:///lxw/pyspark/data/sql/people.txt")
    df.printSchema()
    df.show()

Reading JSON data

# JSON data
df = spark.read.json("file:///lxw/pyspark/data/sql/people.json")
# df = spark.read.format("json").load("file:///lxw/pyspark/data/sql/people.json")
df.printSchema()
df.show()

Reading CSV data

# CSV data
df = spark.read.format("csv") \
	.option("sep", ";") \
	.option("header", True) \
	.option("encoding", "utf-8") \
	.schema("name STRING, age INT, job STRING") \
	.load("file:///lxw/pyspark/data/sql/people.csv")
df.printSchema()
df.show()

Reading Parquet data (Parquet is a columnar storage format commonly used with Spark)

# Parquet file
df = spark.read.parquet("file:///lxw/pyspark/data/sql/users.parquet")
# df = spark.read.format("parquet").load("file:///lxw/pyspark/data/sql/users.parquet")
df.printSchema()
df.show()

(4) The DSL and SQL styles of the DataFrame API

DSL style

df = spark.read.format("csv") \
        .schema("id INT, subject STRING, score INT") \
        .load("file:///lxw/pyspark/data/sql/stu_score.txt")

df.select(["id", "subject"]).show()
df.select("id", "subject").show()
id_column = df['id']
subject_column = df['subject']
df.select(id_column, subject_column).show()

df.filter("score < 99").show()
df.filter(df['score'] < 99).show()
# df.where()  # same as filter

df.groupBy("subject").count().show()
df.groupBy(df['subject']).count().show()

SQL style

df = spark.read.format("csv") \
        .schema("id INT, subject STRING, score INT") \
        .load("file:///lxw/pyspark/data/sql/stu_score.txt")

df.createTempView("score")  # register a temporary view
df.createOrReplaceTempView("score2")  # register the view, replacing it if it already exists
df.createGlobalTempView("score3")  # register a global temporary view; queries must prefix it with global_temp

spark.sql("select subject,count(*) as cnt from score group by subject").show()
spark.sql("select subject,count(*) as cnt from score2 group by subject").show()
spark.sql("select subject,count(*) as cnt from global_temp.score3 group by subject").show()

3. WordCount, SparkSQL version

# coding:utf8
# linux环境
import os
import pandas as pd

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()

    sc = spark.sparkContext

    # 1. SQL style
    # read via the RDD API
    rdd = sc.textFile("file:///lxw/pyspark/data/sql/words.txt") \
        .flatMap(lambda x: x.split(" ")) \
        .map(lambda x: [x])

    df = rdd.toDF(["word"])
    df.createTempView("wordsView")
    spark.sql("""
        select word,count(*) as cnt from wordsView group by word order by cnt desc
    """).show()

    # 2. DSL style
    # read via the Spark DataFrame reader
    df = spark.read.format("text") \
        .load("file:///lxw/pyspark/data/sql/words.txt")

    df2 = df.withColumn("value", F.explode(F.split(df["value"], " ")))
    df2.groupBy("value").count() \
        .withColumnRenamed("value", "单词") \
        .withColumnRenamed("count", "个数") \
        .orderBy("个数", ascending=False) \
        .show()

4. Data cleaning (removing abnormal data)

# Deduplicate
df.dropDuplicates().show()
df.dropDuplicates(['age','job']).show()
# Drop rows with null values
df.dropna().show()
df.dropna(thresh=3).show()  # keep a row only if at least 3 columns are valid (non-null)
df.dropna(thresh=2,subset=['name','age']).show()  # only check name and age, and require 2 valid values
# Fill null values
df.fillna("loss")  # fill nulls with the string loss
df.fillna("loss",subset=['name','age']).show()  # fill only the specified columns
df.fillna({'name':'nameNull','age':'0','job':'jobNull'}).show()  # fill each column with its own value

5. Exporting DataFrame data

(1) Export to files

# coding:utf8
# linux环境
import os
import pandas as pd

os.environ['JAVA_HOME'] = '/export/server/jdk'  # change this to your own JDK install path
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark/bin/python3.8'
os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("test") \
        .master("local[*]") \
        .getOrCreate()

    sc = spark.sparkContext

    # 1. SQL style
    # read via the RDD API
    rdd = sc.textFile("file:///lxw/pyspark/data/sql/words.txt") \
        .flatMap(lambda x: x.split(" ")) \
        .map(lambda x: [x])

    df_rdd = rdd.toDF(["word"])
    df_rdd.createTempView("wordsView")
    df = spark.sql("""
        select word,count(*) as cnt from wordsView group by word order by cnt desc
    """)
    df.show()

    # Write out as text
    df.select(F.concat_ws("---", "word", "cnt")) \
        .write \
        .mode("overwrite") \
        .format("text") \
        .save("file:///lxw/pyspark/data/sql/text")

    # Write out as CSV
    df.write \
        .mode("overwrite") \
        .format("csv") \
        .option("sep", ",") \
        .option("header", True) \
        .save("file:///lxw/pyspark/data/sql/csv")

    # Write out as JSON
    df.write \
        .mode("overwrite") \
        .format("json") \
        .save("file:///lxw/pyspark/data/sql/json")

    # Write out as Parquet
    df.write \
        .mode("overwrite") \
        .format("parquet") \
        .save("file:///lxw/pyspark/data/sql/parquet")

(2) Export to MySQL

First put the MySQL connector jar into the jars directory of your pyspark installation.

The path after I copied it in: D:\python\pyspark\venv\Lib\site-packages\pyspark\jars\mysql-connector-java-5.1.41-bin.jar

image-20220603191251095

# Write out to MySQL (the table is created automatically on write)
df.write.mode("overwrite") \
	.format("jdbc") \
	.option("url", "jdbc:mysql://localhost:3306/pysparkdata?useSSL=false&useUnicode=true") \
	.option("dbtable", "testdata") \
	.option("user", "root") \
	.option("password", "123456") \
	.save()
# Read MySQL data into a DataFrame
df = spark.read.format("jdbc") \
	.option("url", "jdbc:mysql://localhost:3306/yiliao?useSSL=false&useUnicode=true") \
	.option("dbtable", "question_history") \
	.option("user", "root") \
	.option("password", "123456") \
	.load()
df.show()