首页 > 编程知识 正文

flink和hadoop关系,past软件中文教程

时间:2023-05-03 10:55:48 阅读:41070 作者:3448

从这个名称pyspark可以看出,我们将python和spark结合使用。

我相信你在这个时候电脑上已经配备了hadoop、spark、python3。

那么,现在开始稍微调查一下pyspark吧。 当然,如果不想知道直接找pyspark的使用方法,

1 .背景:

他出生于加州大学伯克利分校的AMP实验室,2013年6月成为名为Apache的孵化项目,并以Scala语言实现。 Scala是在JAVA之上建立的。

你为什么要设计这样的东西?

为了改善Hadoop的MAP REDUCE的弱点:

1 .交互和迭代

2 .在群集多点内存中执行的分布式计算

3 .容错数据集

为什么要用SPARK?

1 .先进的大数据分布式编程和计算框架

2 .使用视图而不是Hadoop (spark可以独立于Hadoop使用,但不能替代Hadoop )。 因为Hadoop至今仍然很重要) ) ) )。

3 .内存分布式计算:执行次数快

4 .可以用不同的语言编程(java、scala、r、python )。

5 .可以从不同的数据源检索数据

从HDFS、Cassandea、HBase等

可以同时支持多种文件格式:文本seq Avro parquet

6 .实现不同的大数据功能:Spark Core、Sparc SQL等

2 .主要零部件

1.spark core :包含spark的主要基本功能,所有与Rdd相关的API都来自spark core

2.spark sql :spark用于结构语言处理的软件包。 用户可以在soark环境中使用sql语言处理数据

等等(其他不介绍) )。

介绍spark core

spark生态圈的核心:

负责数据的读取

完成分布式计算

2 .包含两个重要部件

到没有环形图(DAG )的分布式并行计算框架

容错分布式数据rdd (resilientdistributeddataset ) )。

3 .总体上是在spark功能日程管理中心,定义和管理RDD。 RDD表示一系列的数据集合分散在矩阵的存储器中,spark core的任务是分散计算这些数据

4.RDD (重点) :

分布式数据集分布在不同群集节点的内存中,并被理解为大数组。 数组中的每个元素都是RDD的分区,其中一个RDD是分布式的,并计算在多态计算机节点的内存和硬盘中。

RDD数据块可以位于磁盘上,也可以位于内存中(取决于设置)。 如果缓冲区无效或丢失,RDD分区可以重新计算并刷新。 不能更改RDD,但可以用API转换生成新的RDD。

对两种RDD的操作(也称为运算符) :

1 .转换(懒惰的执行) :包括mapflatmapgroupbykeyreducebykey等

他们在需要操作时实际计算结果,而不是立即运行一些指令集

2 .操作(立即执行) :包括计数标记选择等

他们返回结果或输出RDD数据

这些操作实现了MapReduce的基本函数map、reduce以及计算模型,还提供了过滤器、join、groupBYKey等,另外spark sql操作了具有数据结构的rdd——spark data frame

工作原理与mapreduce相同,但他们的工作方法不同。 mr的运算是存储器磁盘的读写,不能在存储器内共享数据,但RDD可以共享并持久化。 大数据运算总是交互式的、反复的,所以数据的复用性很重要,但mr的磁盘读写引起的I/O开销会导致数度的降低

废话这么多了开始表演了!

首先需要启动hadoop和spark

然后在命令行中输入:

jupyter-notebook-- IP 192.168.50.129-- IP后跟你此时的IP。 这样,我们就能得到网站:

然后,复制并在浏览器中打开它,进入jupyter页面。 单击new,python3创建文件

必须首先部署py4j:

其实就是在python3里面导入了spark和sc模块


要注意了:下图红框里面的要对应你spark/python/lib里面的文件

import osimport sysspark_name = os.environ.get('SPARK_HOME',None)if not spark_name: raise ValueErrorError('spark环境没有配置好')sys.path.insert(0,os.path.join(spark_name,'python'))sys.path.insert(0,os.path.join(spark_name,'python/lib/py4j-0.10.4-src.zip'))exec(open(os.path.join(spark_name,'python/pyspark/shell.py')).read())

现在我们就可以使用pyspark了:



    请注意,spark在交互式shell下运行时候,这里的sc即SparkContext 的一个实例已经自动生成了,这是因为pyspark shell本身就是spark应用的driver程序,而driver程序包含应用的main函数定义RDD并在计算机集群上进行各种操作,所以一旦获得SparkContext object 即sc ,driver就可以访问spark了,因此sc可以看成是driver对计算机集群的连接.

    spark里面的core里面的RDD有俩个组织,一个为driver另一个为worker,有点像hadoop里面的namenode和datanode,所以driver只能有一个而worker可以为多个.driver负责获取数据,管理worker,所以worker就负责工作.

 

有俩种类型的RDD:

       1. 并行集合:来自与分布式化的数据对象,比如我们上面的代码,python里面的list对象,再比如用户自己键入的数据

        并行化RDD就是通过调用sc的parallelize方法,在一个已经存在的数据集合上创建的(一个Seq对象),集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集,比如上面的代码,演示了如何python中的list创建一个并行集合,并进行分行

       2. 文件系统数据集读取数据

        spark可以将任何hadoop所支持的存储资源转换称RDD,如本地文件(语言网络文件系统),索引的节点都必须能访问到,HDFS,mongodb,HBase,等

下面我们开始使用文件系统集读取数据,首先在hello里面上传一个文件,比如:


开始写代码:

lines = sc.textFile("hdfs://python2:9000/hello/data.csv")

既然我们已经获取了数据那就开始操作:

1.map()

    他的参数是一个函数(支持lambda函数),函数应用于RDD的每一个元素,函数的参数只能有一个,返回值是一个新的RDD

2.flatMap()

    参数是一个函数,函数应用为RDD的每一个元素,参数只有1个,将数据进行拆分,变成迭代器,返回值是一个新的RDD  


如上图,可以进一步的将数据拆分出来,也可以进行添加一些别的操作,比如:


3.filter()

    参数是一个函数,与python里面的filter一样,函数会过滤掉不符号和条件的元素,返回值是一个新的RDD

4.reduce()

    并行汇总所有RDD元素,参数是一个函数,函数的参数有2个




5.countByValue()

    各RDD元素在RDD中出现的次数


也可以像reduceByKey()变换每一组内汇总


6.reduceByKey()

    在每一键组内汇总变换,查看每一个数据在文件里面出现的次数


7.sortByKey()

    未完待续...

附链接:

    spark RDD的使用



版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。