首页 > 编程知识 正文

手把手教简谱凡老师教全课程,手把手教吉他的图片

时间:2023-05-03 19:02:21 阅读:41069 作者:2450

PySpark数据科学入门

PySpark是一种优秀的语言,可以进行探索性的数据分析、机器学习流水线的构建,以及为数据平台大规模创建ETL。 熟悉Python和Pandas等库时,PySpark是创建可扩展分析和流水线的好语言。 本文的目的是演示如何启动和运行PySpark以执行常见任务。

使用数据积木作为Spark环境,使用Kaggle的NHL数据集作为分析的数据源。 本文介绍如何将数据写入Spark数据帧、创建这些帧的变换和聚合、可视化结果以及执行线性回归。 此外,还演示了如何使用Pandas UDF以可扩展的方式混合使用常规Python代码和PySpark。 为简便起见,集中精力进行批处理,避免流数据管线中出现的复杂问题。

这篇文章的完整笔记在github上。 (https://github.com/BG Weber/startupdatascience/blob/master/EDA/py spark _ NHL.ipynb )

环境

启动和运行Spark有多种选项。

自我管理:可以使用裸机或虚拟机自行设置群集。 Apache Ambari是此选项的有用项目,但不建议快速启动和运行。 云提供程序:大多数云提供程序都提供Spark群集。 AWS有EMR,GCP有DataProc,可以比自助托管更快地进入交互式环境。 供应商解决方案Databricks和Cloudera等公司提供了便于Spark启动和运行的Spark解决方案。 使用的解决方案取决于安全性、成本和现有基础架构。 如果您尝试使用学习所需的环境运行,建议使用data积木通信版。

在数据积木社区版中创建PySpark群集

使用此环境,可以轻松启动和运行Spark群集和笔记本电脑环境。 在本教程中,您使用Spark 2.4运行时和Python 3创建了群集。 至少需要spark 2.3版才能运行本文中的代码。 Pandas UDF功能需要spark 2.3版。

Spark 数据帧

PySpark使用的重要数据类型是Spark数据帧。 可以将此对象视为一个分布在群集中的表,该表类似于r和Pandas数据帧。 如果希望使用PySpark进行分布式计算,则必须对Spark数据帧而不是其他python数据类型执行操作。

如果使用Spark,还可以使用通过在Spark数据帧中调用toPandas ()返回Pandas对象的Pandas数据帧。 但是,通常不要使用此函数,除非要处理小数据帧,因为它会将整个对象引入单个节点的内存中。

Pandas和Spark数据帧之间的主要区别之一是紧急和延迟运行。 PySpark会延迟操作,直到管线实际需要结果。 例如,可以指定从S3加载数据集并将多个转换应用于数据帧的操作,但不会立即应用。 相反,如果记录转换映射并在实际需要数据时将结果写回S3,则转换将作为单个管道操作应用。 该方法用于防止将完整的数据帧引入内存,从而在整个计算机集群中实现更高效的处理。 使用Pandas数据帧时,所有内容都将被拉入内存中,并立即应用每个Pandas操作。

通常,如果可能的话,最好避免在Spark上紧急操作。 这是因为有效分配的管道数量受到限制。

阅读数据

使用Spark的第一步是将数据集加载到数据帧中。 将数据加载到数据帧后,可以应用变换、执行分析和建模、创建可视化以及保留结果。 在Python中,可以使用Pandas直接从本地文件系统加载文件。

在PySpark中,加载CSV文件更加复杂。 在分布式环境中,由于没有本地存储,因此必须使用分布式文件系统(如HDFS、数据积木文件存储(DBFS )或S3 )指定文件的路径。

通常,使用PySpark时,使用S3的数据。 许多数据库提供了卸载S3功能,还可以使用AWS控制台将文件从本地计算机移动到S3。 本文使用data积木文件系统(DBFS ),该文件系统以/FileStore的形式提供路径。 第一步是上传要处理的CSV文件。

将文件上传到data积木文件存储

下一步是将CSV文件导入到Spark数据帧中,如下所示: 该代码段指定CSV文件的路径,并将许多参数传递给read函数以处理该文件。 最后一步显示加载的数据帧的子集,就像Pandas中的df.head ()一样。

使用Spark时,建议使用parquet格式,因为它是一种文件格式,包含有关列数据类型的元数据,提供文件压缩并与Spark配合使用。 AVRO是另一种适合Spark的格式。 以下代码段说明如何从以前的代码段中检索数据帧,将其存储为DBFS上的parquet文件,以及如何从保存的parque中检索数据帧

t文件中重新加载数据帧。

 

此步骤的结果是相同的,但执行流程明显不同。当将CSV文件读入数据帧时,Spark以急切模式执行操作,这意味着在下一步开始执行之前将所有数据加载到内存中,已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击而在读取parquet格式的文件时使用惰性方法。通常,您希望在使用Spark时避免急切操作,如果我需要处理大型CSV文件,将首先把数据集转换为parquet格式,然后再执行其余的管道。

通常,您需要处理大量文件,例如位于DBFS中某个路径或目录的数百个parquet文件。使用Spark,您可以在路径中包含通配符来处理文件集合。例如,您可以从S3加载一批parquet文件,如下所示:

 

如果您每天都有一个单独的parquet文件,或者如果您的管道中有一个先前的步骤会输出数百个parquet文件,则此方法很有用。

如果要从数据库(例如Redshift)读取数据,最好先将数据卸载到S3,然后再使用Spark进行处理。在Redshift中,卸载命令可用于将数据导出到S3进行处理:

 

还有用于数据库的库,例如spark-redshift,使这个过程更容易执行。

写数据

与使用Spark读取数据类似,不建议在使用PySpark时将数据写入本地存储。相反,您应该使用分布式文件系统,如S3或HDFS。如果您要使用Spark处理结果,则parquet是用于保存数据框架的良好格式。下面的代码段显示了如何将数据帧保存为DBFS和S3作为parquet。

 

以parquet格式保存数据帧时,通常将其划分为多个文件,如下图所示。

将数据帧保存到DBFS时生成的parquet文件

如果您需要CSV文件中的结果,则需要稍微不同的输出步骤。这种方法的主要区别之一是所有数据在输出到CSV之前将被拉到单个节点。当您需要保存小型数据帧并在Spark之外的系统中处理它时,建议使用此方法。下面的代码段显示了如何将数据帧保存为DBFS和S3上的单个CSV文件。

 

Spark脚本的另一个常见输出是NoSQL数据库,如Cassandra、DynamoDB或Couchbase。这超出了本文的范围,但我过去看过的一种方法是将数据帧写入S3,已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击然后启动一个加载过程,告诉NoSQL系统从S3上的指定路径加载数据。

我也省略了对流式输出源的写入,如Kafka或Kinesis。这些系统在使用Spark流时更有用。

转换数据

可以在Spark数据帧上执行许多不同类型的操作,就像可以在Pandas数据帧上应用的各种操作一样。在Spark数据帧上执行操作的方法之一是通过Spark SQL,它可以像查询表一样查询数据帧。下面的代码段显示了如何在数据集中找到最高得分的玩家。

 

结果是玩家ID、游戏次数和这些游戏中的总进球数列表。如果我们想要显示播放器的名称,那么我们需要加载一个额外的文件,使其可作为临时视图,然后使用Spark SQL加入它。

数据集中得分最高的玩家

在上面的代码片段中,我使用display命令输出数据集的样本,但也可以将结果分配给另一个数据帧,这可以在管道的后续步骤中使用。下面的代码显示了如何执行这些步骤,其中第一个查询结果被分配给新的数据帧,然后将其分配给临时视图并与一组播放器名称连接。

 

这个过程的结果如下所示,根据Kaggle数据集确定yldsh Ovechkin是NHL中的得分最高的球员。

使用Spark SQL连接数据帧的进程的输出

对于常见任务,有Spark数据帧操作,例如添加新列、删除列、执行连接以及计算聚合和分析统计信息,但是在开始使用时,使用Spark 已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击SQL执行这些操作可能更容易。此外,如果您已经使用PandaSQL或framequery等库来使用SQL操作Pandas数据帧,那么将代码从Python移植到PySpark会更容易。

与Spark数据帧上的大多数操作一样,Spark SQL操作以延迟执行模式执行,这意味着在需要结果之前不会评估SQL步骤。 Spark SQL提供了一种挖掘PySpark的好方法,而无需首先学习数据帧的新库。

如果您正在使用Databricks,您还可以直接在笔记本中创建可视化,而无需显式使用可视化库。例如,我们可以使用下面的Spark SQL代码绘制每个游戏的平均目标数。

 

Databricks笔记本中显示的初始输出是结果表,但我们可以使用绘图功能将输出转换为不同的可视化,例如下面显示的条形图。这种方法不支持数据科学家可能需要的每个可视化,但它确实使得在Spark中执行探索性数据分析变得更加容易。如果需要,我们可以使用toPandas()函数在驱动程序节点上创建Pandas数据帧,这意味着任何Python绘图库都可用于可视化结果。但是,这种方法应仅用于小型数据帧,因为所有数据都被急切地提取到驱动程序节点上的内存中。

2月和3月的平均每场比赛进球数

对于至少进5球的球员,我也考察了每次射门的平均进球数。

 

此转换的结果如下图所示。大多数至少有5个进球的球员在4%到12%的时间内完成投篮。

Kaggle数据集中玩家的每次射门目标

MLlib

用于数据科学家的Python的常见用例之一是构建预测模型。虽然scikit-learn在使用pandas时非常棒,但它不能扩展到分布式环境中的大型数据集(尽管有很多方法可以与Spark并行化)。在使用PySpark和海量数据集构建预测模型时,MLlib是首选库,因为它本身可以在Spark数据帧上运行。并非所有scikit-learn中的算法都可以在MLlib中使用,但是有很多选项可以涵盖许多用例。

为了在MLib中使用一种监督算法,您需要使用特征向量和标签作为标量来设置数据帧。准备好后,您可以使用fit函数来训练模型。下面的代码段显示了如何使用VectorAssembler将数据框中的多个列组合成单个要素向量。我们使用结果数据帧调用fit函数,然后生成模型的摘要统计信息。

 

该模型根据射击次数、游戏时间和其他因素预测玩家将获得多少目标。然而,该模型的性能较差,导致均方根误差(RMSE)为0.375,R平方值为0.125。具有最大值的系数是镜头列,但是这并没有提供足够的信号以使模型准确。

使用PySpark构建ML管道时需要考虑许多其他步骤,包括训练和测试数据集,超参数调整和模型存储。上面的代码段只是开始使用MLlib的起点。

Pandas UDF

我最近使用的Spark中的一个功能是Pandas用户定义函数(UDF),它使您能够在Spark环境中使用Pandas数据帧执行分布式计算。这些UDF的一般工作方式是首先使用groupby语句对Spark数据帧进行分区,并将每个分区发送到工作节点并转换为传递给UDF的Pandas数据帧。然后,UDF返回转换后的Pandas数据帧,该数据帧与所有其他分区组合,然后转换回Spark数据帧。最终结果非常有用,您可以使用需要Pandas的Python库,现在也可以扩展到海量数据集,只要您有一种分区数据帧的好方法。 Pandas UDF是在Spark 2.3中引入的,我将讨论在Spark Summit 2019期间我们如何在Zynga使用此功能。

曲线拟合是我作为数据科学家执行的常见任务。下面的代码片段显示了如何执行曲线拟合来描述玩家在游戏过程中记录的击球次数和击球次数之间的关系。该片段显示了我们如何通过在过滤到单个播放器的数据集上调用toPandas()来为单个玩家执行此任务。该步骤的输出是两个参数(线性回归系数),他们试图描述这些变量之间的关系。

 

如果我们想为每个玩家计算这条曲线并拥有一个海量数据集,那么由于内存不足异常,toPandas()调用将失败。我们可以通过调用player_id上的groupby(),然后应用下面显示的Pandas UDF,将此操作扩展到整个数据集。该函数将Pandas数据框作为输入,描述单个玩家的游戏统计数据,并返回包含player_id和拟合系数的摘要数据帧。然后将每个摘要Pandas数据帧组合成一个Spark数据帧,该数据帧显示在代码片段的末尾。使用Pandas UDF的另一个设置是为结果数据帧定义模式,其中模式描述了从应用步骤生成的Spark数据帧的格式。

 

此过程的输出如下所示。我们现在有一个数据帧,总结了每个玩家的曲线拟合,并且可以在海量数据集上运行此操作。在处理大量数据集时,选择或生成分区密钥,以在数据分区的数量和大小之间实现良好的权衡。

来自Pandas UDF的输出,显示每位玩家的曲线拟合

最佳实践

我已经介绍了使用PySpark的一些常见任务,但也希望提供一些建议,使其更容易从Python到PySpark。以下是我根据在这些环境之间移植一些项目的经验收集的一些最佳实践:

避免使用库,使用数据帧:使用诸如字典之类的Python数据类型意味着代码可能无法在分布式模式下执行。与其使用键来索引字典中的值,不如考虑将另一列添加到可用作过滤器的数据帧中。谨慎使用toPandas:调用toPandas()将导致所有数据被加载到驱动程序节点上的内存中,并阻止在分布式模式下执行操作。当数据已经聚合并且您想要使用熟悉的Python绘图工具时,可以使用此函数,但它不应该用于大型数据帧。避免for循环:如果可能,最好使用groupby-apply模式重写for循环逻辑以支持并行化代码执行。我注意到,专注于在Python中使用这种模式也导致清理代码更容易转换为PySpark。尝试最小化急切操作:为了使您的管道尽可能具有可扩展性,最好避免将整个数据帧拉入内存的急切操作。我注意到用CSV读取是一个急切的操作,我的工作是将数据帧保存为parquet,然后从parquet重新加载以构建更具可扩展性的管道。使用framequery / pandasql可以更轻松地进行移植:如果您正在使用其他人的Python代码,那么解读一些Pandas操作正在实现的内容可能会很棘手。如果您计划将代码从Python移植到PySpark,那么使用Pandas的SQL库可以使这种转换更容易。

我发现在PySpark中编写代码的时间也因Python编码技巧而得到改善。

结论

PySpark是数据科学家学习的理想语言,因为它支持可扩展的分析和ML管道。如果您已经熟悉Python和Pandas,那么您的大部分知识都可以应用于Spark。我已经展示了如何使用PySpark执行一些常见的操作来引导学习过程。我还展示了一些最近使用Pandas UDF的Spark功能,它使Python代码能够以分布式模式执行。有很好的环境可以让你轻松启动和运行Spark集群,现在正是学习PySpark的好时机!

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。