分布式数据处理和 PySpark
在阅读这一章节之前,建议读者首先学习 SQL 查询语句,这样会更容易理解文中的示例程序。
PySpark 是 Apache Spark 的 Python API。Apache Spark 是一个强大的分布式数据处理框架,专用于大规模数据处理和数据分析。通过 PySpark,Python 开发者,即便不去深入了解分布式计算的复杂性,也可以方便地进行大规模数据分析和数据挖掘工作。
PySpark 主要包含以下这些组件:
- Spark SQL:用于执行 SQL 查询以及读取数据的库,支持多种数据格式和存储系统。
- DataFrame API:提供了一个分布式数据集合,使得数据处理和分析更加直观和高效。
- MLlib:用于进行机器学习的库。
- GraphX:用于图形处理的库(在 PySpark 中通过第三方库如 GraphFrames 访问)。
- Spark Streaming:用于实时数据流处理的库。
其中数据库查询是最为常用的功能,所以我们这一章主要介绍数据库查询。
配置 PySpark 的运行环境
安装 PySpark
要使用 PySpark,首先需要安装 Spark。可以通过 pip 安装 PySpark:
pip install pyspark
PySpark 与数据库连接通常是通过 JDBC(Java Database Connectivity)完成的,JDBC 是一种用于执行 SQL 操作的 Java API,可以让你从 Spark 程序中访问几乎所有的关系型数据库。Spark 本身是使用 Java 语言编写的,PySpark 只 是为 Spark 包装了一层 Python 接口函数。所以在使用 PySpark 的时候,还是会调用到底层的 Java 库。所以,除了安装 PySpark 本身之外,还需要有目标数据库的 JDBC 驱动程序。例如,如果想连接到 MySQL 数据库,那么需要下载 MySQL 的 JDBC 驱动并将其放在类路径中。
SparkSession 创建和关闭
SparkSession 是 Spark 2.0 以后引入的概念,它是程序执行的入口,用于配置 Spark 的各种设置(如 master URL)和初始化 Spark 应用。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DatabaseTableExample") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
拿到 spark
这个对象之后,我们就可以通过它来调用 Spark 的各种功能了。具体有哪些功能后文会详细介绍。
在程序的最后,不要忘记关闭 SparkSession。
spark.stop()
在 AWS 中使用 PySpark
AWS 在很多服务中都已经集成了 PySpark,可以直接使用,非常方便。最为常用的两个服务一个是 AWS Glue;另一个是 AWS Athena。Glue 功能比较繁杂,可完成各类数据集成服务;Athena 则专注于数据查询和处理。这两个服务都提供了类似 Jupyter Notbook 的编程环境,并且它们 内置了对 SparkSession 的管理,用户不在需要自己配置。在它们提供的编程环境中, spark
已经是一个直接可用的全局对象,直接调用即可。下文中的示例程序都是在 AWS Athena Notebook 中调试的。
连接和打开数据库
在 AWS 的服务中,如果已经创建好了“数据库”(Database)和“表格”(Table),那么就可以直接使用了。比如我们已经有了一个名为“my_db”的数据库,和一张名为“my_table”的表格,那么运行下面的代码就可以打印出表格中的内容了:
spark.sql("use my_db") # 打开名为 my_db 的数据库
spark.sql( # 调用 SQL 语句,查询 my_table 表格中的内容
"""
SELECT *
FROM my_table
"""
).show() # 调用 show 方法打印表格中的数据
关于 sql 方法的是有,下文还会详细阐述。show 方法用于显示查询得到的数据,默认情况下,它显示数据的前 20 行。如果需要多显示一些行,只要把需要行数传入即可比如 show(30)
。
在 AWS 服务中,数据库中的数据都是以文件形式,保存在 S3 服务中的,所以很多时候,我们不需要配置数据库和表格,直接从文件读取数据即可。比如,某一表格,以 qarquet 的格式(AWS 上最常用的一种数据库存储格式)保存在路径为“s3://my_bucket/my_db/my_table/”的 S3 文件夹中,那么就可以使用下面的语句,读取数据:
df = spark.read.parquet("s3://my_bucket/my_db/my_table/")
如果不是在 AWS 服务上,而是自己的电脑中运行 PySpark,链接数据库要稍微复杂一下。可以通过 DataFrameReader
类的 jdbc
方法连接数据库,我们需要指定 JDBC 连接的 URL,用户名和密码,以及要访问的表名:
# 数据库连接参数
jdbc_url = "jdbc:mysql://my_database_host:port/database_name"
connection_properties = {
"user": "my_database_username",
"password": "my_database_password",
"driver": "com.mysql.jdbc.Driver"
}
# 读取数据库表
df = spark.read.jdbc(url=jdbc_url, table="my_table_name", properties=connection_properties)
PySpark 程序的两种编写方法
我们可以采用两种风格迥然不同的方式来编写 PySpark 代码,一种是直接使用 SQL 语句,另一种方式是直接调用 PySpark 提供的 API。我们一个最简的程序为例,来看看这两种写法有什么不同。假设我们已经有了两张表格,一张是 student_info 表,里面记录了学生的学号、姓名、家庭地址等信息;另一张表是 quize_score,里面记录了学生的学号,以及某次考试成绩。合并两张表的数据就可以得到一个新的表,包括学生姓名和成绩。
通过 Spark SQL 语句编写的程序如下:
spark.sql("""
SELECT student_info_view.name, quize_score.score
FROM student_info
JOIN quize_score
ON student_info.student_id = quize_score.student_id
""").show()
使用 PySpark API 实现同样功能的代码如下:
student_info.join(
quize_score, student_info.student_id == quize_score.student_id
).select(
"student_info.name", "quize_score.score"
).show()
直接调用 SQL 语句最大的好处是,可以直接把 SQL 语句拿到不同的环境中去执行。比如说在使用 AWS Athena 服务的时候,同样的 SQL 即可以在 Athena Notebook 程序中使用,也可以直接在 Athena Query 的交互环境中运行,可移植性非常好。不过 Spark 在执行程序的时候,是要先把 SQL 语句翻译成自身的 API 调用在执行的。用户无法控制这一翻译的过程,也许 Spark 的翻译对于某些问题并不是最优化的方式。如果直接使用 Spark API 编写程序,可以更加灵活,可以根据需求设计出最优化的程序。