当前所在位置: 首页 > 教程大全 > 正文

apache hive 2.1 安装教程 hive3.1.2安装教程

日期:2023-11-07    作者:康康 浏览量:0    【 字体:

各位老铁们好,相信很多人对apache都不是特别的了解,因此呢,今天就来为大家分享下关于apache以及hive3.1.2安装教程的问题知识,还望可以帮助大家,解决大家的一些困惑,下面一起来看看吧!

本文目录:

  1. 新睿云服务器上Hive是什么?
  2. ods使用教程
  3. 大数据与Hadoop之间是什么关系?
  4. 最近在学习pyspark,有入门指南吗?

新睿云服务器上Hive是什么?

Hive是一个数据仓库基础工具在Hadoop中用来处理结构化数据。它架构在Hadoop之上,总归为大数据,并使得查询和分析方便。最初,Hive是由Facebook开发,后来由Apache软件基金会开发,并作为进一步将它作为名义下ApacheHive为一个开源项目。它用在好多不同的公司。例如,亚马逊使用它在AmazonElasticMapReduce。

ods使用教程

ods的使用教程如下

1、点击工具,在下拉菜单中点击虚拟摄像头。

2、打开自动启动的按钮,再点击应用。

3、选择要添加虚拟摄像头的场景和来源。

4、上方显示内容即是虚拟摄像头显示的内容。

5、在会议软件中选择该摄像头即可展示内容。

6、在实时的语音通话中,也可以找到该虚拟摄像头,点击使用即可。

大数据与Hadoop之间是什么关系?

大数据,官方定义是指那些数据量特别大、数据类别特别复杂的数据集,这种数据集无法用传统的数据库进行存储,管理和处理。大数据的主要特点为数据量大(Volume),数据类别复杂(Variety),数据处理速度快(Velocity)和数据真实性高(Veracity),合起来被称为4V。

大数据中的数据量非常巨大,达到了PB级别。而且这庞大的数据之中,不仅仅包括结构化数据(如数字、符号等数据),还包括非结构化数据(如文本、图像、声音、视频等数据)。这使得大数据的存储,管理和处理很难利用传统的关系型数据库去完成。在大数据之中,有价值的信息往往深藏其中。这就需要对大数据的处理速度要非常快,才能短时间之内就能从大量的复杂数据之中获取到有价值的信息。在大数据的大量复杂的数据之中,通常不仅仅包含真实的数据,一些虚假的数据也混杂其中。这就需要在大数据的处理中将虚假的数据剔除,利用真实的数据来分析得出真实的结果。

大数据分析(BigDataAnalysis)

大数据,表面上看就是大量复杂的数据,这些数据本身的价值并不高,但是对这些大量复杂的数据进行分析处理后,却能从中提炼出很有价值的信息。对大数据的分析,主要分为五个方面:可视化分析(AnalyticVisualization)、数据挖掘算法(DateMiningAlgorithms)、预测性分析能力(PredictiveAnalyticCapabilities)、语义引擎(SemanticEngines)和数据质量管理(DataQualityManagement)。

可视化分析是普通消费者常常可以见到的一种大数据分析结果的表现形式,比如说百度制作的“百度地图春节人口迁徙大数据”就是典型的案例之一。可视化分析将大量复杂的数据自动转化成直观形象的图表,使其能够更加容易的被普通消费者所接受和理解。

数据挖掘算法是大数据分析的理论核心,其本质是一组根据算法事先定义好的数学公式,将收集到的数据作为参数变量带入其中,从而能够从大量复杂的数据中提取到有价值的信息。著名的“啤酒和尿布”的故事就是数据挖掘算法的经典案例。沃尔玛通过对啤酒和尿布购买数据的分析,挖掘出以前未知的两者间的联系,并利用这种联系,提升了商品的销量。亚马逊的推荐引擎和谷歌的广告系统都大量使用了数据挖掘算法。

预测性分析能力是大数据分析最重要的应用领域。从大量复杂的数据中挖掘出规律,建立起科学的事件模型,通过将新的数据带入模型,就可以预测未来的事件走向。预测性分析能力常常被应用在金融分析和科学研究领域,用于股票预测或气象预测等。

语义引擎是机器学习的成果之一。过去,计算机对用户输入内容的理解仅仅停留在字符阶段,不能很好的理解输入内容的意思,因此常常不能准确的了解用户的需求。通过对大量复杂的数据进行分析,让计算机从中自我学习,可以使计算机能够尽量精确的了解用户输入内容的意思,从而把握住用户的需求,提供更好的用户体验。苹果的Siri和谷歌的GoogleNow都采用了语义引擎。

数据质量管理是大数据在企业领域的重要应用。为了保证大数据分析结果的准确性,需要将大数据中不真实的数据剔除掉,保留最准确的数据。这就需要建立有效的数据质量管理系统,分析收集到的大量复杂的数据,挑选出真实有效的数据。

分布式计算(DistributedComputing)

对于如何处理大数据,计算机科学界有两大方向:第一个方向是集中式计算,就是通过不断增加处理器的数量来增强单个计算机的计算能力,从而提高处理数据的速度。第二个方向是分布式计算,就是把一组计算机通过网络相互连接组成分散系统,然后将需要处理的大量数据分散成多个部分,交由分散系统内的计算机组同时计算,最后将这些计算结果合并得到最终的结果。尽管分散系统内的单个计算机的计算能力不强,但是由于每个计算机只计算一部分数据,而且是多台计算机同时计算,所以就分散系统而言,处理数据的速度会远高于单个计算机。

过去,分布式计算理论比较复杂,技术实现比较困难,因此在处理大数据方面,集中式计算一直是主流解决方案。IBM的大型机就是集中式计算的典型硬件,很多银行和政府机构都用它处理大数据。不过,对于当时的互联网公司来说,IBM的大型机的价格过于昂贵。因此,互联网公司的把研究方向放在了可以使用在廉价计算机上的分布式计算上。

服务器集群(ServerCluster)

服务器集群是一种提升服务器整体计算能力的解决方案。它是由互相连接在一起的服务器群所组成的一个并行式或分布式系统。服务器集群中的服务器运行同一个计算任务。因此,从外部看,这群服务器表现为一台虚拟的服务器,对外提供统一的服务。

尽管单台服务器的运算能力有限,但是将成百上千的服务器组成服务器集群后,整个系统就具备了强大的运算能力,可以支持大数据分析的运算负荷。Google,Amazon,阿里巴巴的计算中心里的服务器集群都达到了5000台服务器的规模。

大数据的技术基础:MapReduce、GoogleFileSystem和BigTable

2003年到2004年间,Google发表了MapReduce、GFS(GoogleFileSystem)和BigTable三篇技术论文,提出了一套全新的分布式计算理论。

MapReduce是分布式计算框架,GFS(GoogleFileSystem)是分布式文件系统,BigTable是基于GoogleFileSystem的数据存储系统,这三大组件组成了Google的分布式计算模型。

Google的分布式计算模型相比于传统的分布式计算模型有三大优势:首先,它简化了传统的分布式计算理论,降低了技术实现的难度,可以进行实际的应用。其次,它可以应用在廉价的计算设备上,只需增加计算设备的数量就可以提升整体的计算能力,应用成本十分低廉。最后,它被Google应用在Google的计算中心,取得了很好的效果,有了实际应用的证明。

后来,各家互联网公司开始利用Google的分布式计算模型搭建自己的分布式计算系统,Google的这三篇论文也就成为了大数据时代的技术核心。

主流的三大分布式计算系统:Hadoop,Spark和Storm

由于Google没有开源Google分布式计算模型的技术实现,所以其他互联网公司只能根据Google三篇技术论文中的相关原理,搭建自己的分布式计算系统。

Yahoo的工程师DougCutting和MikeCafarella在2005年合作开发了分布式计算系统Hadoop。后来,Hadoop被贡献给了Apache基金会,成为了Apache基金会的开源项目。DougCutting也成为Apache基金会的主席,主持Hadoop的开发工作。

Hadoop采用MapReduce分布式计算框架,并根据GFS开发了HDFS分布式文件系统,根据BigTable开发了HBase数据存储系统。尽管和Google内部使用的分布式计算系统原理相同,但是Hadoop在运算速度上依然达不到Google论文中的标准。

不过,Hadoop的开源特性使其成为分布式计算系统的事实上的国际标准。Yahoo,Facebook,Amazon以及国内的百度,阿里巴巴等众多互联网公司都以Hadoop为基础搭建自己的分布式计算系统。

Spark也是Apache基金会的开源项目,它由加州大学伯克利分校的实验室开发,是另外一种重要的分布式计算系统。它在Hadoop的基础上进行了一些架构上的改良。Spark与Hadoop最大的不同点在于,Hadoop使用硬盘来存储数据,而Spark使用内存来存储数据,因此Spark可以提供超过Hadoop100倍的运算速度。但是,由于内存断电后会丢失数据,Spark不能用于处理需要长期保存的数据。

Storm是Twitter主推的分布式计算系统,它由BackType团队开发,是Apache基金会的孵化项目。它在Hadoop的基础上提供了实时运算的特性,可以实时的处理大数据流。不同于Hadoop和Spark,Storm不进行数据的收集和存储工作,它直接通过网络实时的接受数据并且实时的处理数据,然后直接通过网络实时的传回结果。

Hadoop,Spark和Storm是目前最重要的三大分布式计算系统,Hadoop常用于离线的复杂的大数据处理,Spark常用于离线的快速的大数据处理,而Storm常用于在线的实时的大数据处理。

最近在学习pyspark,有入门指南吗?

Spark提供了一个Python_Shell,即pyspark,从而可以以交互的方式使用Python编写Spark程序。

有关Spark的基本架构介绍参考http://blog.csdn.net/cymy001/article/details/78483614;

有关Pyspark的环境配置参考http://blog.csdn.net/cymy001/article/details/78430892。

pyspark里最核心的模块是SparkContext(简称sc),最重要的数据载体是RDD。RDD就像一个NumPyarray或者一个PandasSeries,可以视作一个有序的item集合。只不过这些item并不存在driver端的内存里,而是被分割成很多个partitions,每个partition的数据存在集群的executor的内存中。

引入Python中pyspark工作模块

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数(比如主节点的URL)。初始化后,就可以使用SparkContext对象所包含的各种方法来创建和操作RDD和共享变量。Sparkshell会自动初始化一个SparkContext(在Scala和Python下可以,但不支持Java)。

#getOrCreate表明可以视情况新建session或利用已有的session

1

2

3

4

5

6

7

SparkSession是Spark2.0引入的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrameAPI的切入点。SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

初始化RDD的方法

(1)本地内存中已经有一份序列数据(比如python的list),可以通过sc.parallelize去初始化一个RDD。当执行这个操作以后,list中的元素将被自动分块(partitioned),并且把每一块送到集群上的不同机器上。

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#(a)利用list创建一个RDD;使用sc.parallelize可以把Pythonlist,NumPyarray或者PandasSeries,PandasDataFrame转成SparkRDD。

rdd=sc.parallelize([1,2,3,4,5])

rdd

#Output:ParallelCollectionRDD[0]atparallelizeatPythonRDD.scala:480

#(b)getNumPartitions()方法查看list被分成了几部分

rdd.getNumPartitions()

#Output:4

1

2

3

4

5

6

7

8

9

10

11

12

13

14

#(c)glom().collect()查看分区状况

rdd.glom().collect()

#Output:[[1],[2],[3],[4,5]]

1

2

3

在这个例子中,是一个4-core的CPU笔记本;Spark创建了4个executor,然后把数据分成4个块。colloect()方法很危险,数据量上BT文件读入会爆掉内存……

(2)创建RDD的另一个方法是直接把文本读到RDD。文本的每一行都会被当做一个item,不过需要注意的一点是,Spark一般默认给定的路径是指向HDFS的,如果要从本地读取文件的话,给一个file://开头(windows下是以file:\\开头)的全局路径。

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#(a)记录当前pyspark工作环境位置

importos

cwd=os.getcwd()

cwd

#Output:'C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark'

#(b)要读入的文件的全路径

rdd=sc.textFile("file:\\\\\\"+cwd+"\\names\yob1880.txt")

rdd

#Output:file:\\\C:\Users\Yu\0JulyLearn\5weekhadoopspark\names\yob1880.txtMapPartitionsRDD[3]attextFileatNativeMethodAccessorImpl.java:0

#(c)first()方法取读入的rdd数据第一个item

rdd.first()

#Output:'Mary,F,7065'

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

甚至可以sc.wholeTextFiles读入整个文件夹的所有文件。但是要特别注意,这种读法,RDD中的每个item实际上是一个形如(文件名,文件所有内容)的元组。读入整个文件夹的所有文件。

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#记录当前pyspark工作环境位置

importos

cwd=os.getcwd()

cwd

#Output:'C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark'

rdd=sc.wholeTextFiles("file:\\\\\\"+cwd+"\\names\yob1880.txt")

rdd

#Output:org.apache.spark.api.java.JavaPairRDD@12bcc15

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

rdd.first()

Output:

('file:/C:/Users/Yu/0JulyLearn/5weekhadoopspark/names/yob1880.txt',

1

2

3

4

5

其余初始化RDD的方法,包括:HDFS上的文件,Hive中的数据库与表,SparkSQL得到的结果。这里暂时不做介绍。

RDDTransformation

(1)RDDs可以进行一系列的变换得到新的RDD,有点类似列表推导式的操作,先给出一些RDD上最常用到的transformation:

map()对RDD的每一个item都执行同一个操作

flatMap()对RDD中的item执行同一个操作以后得到一个list,然后以平铺的方式把这些list里所有的结果组成新的list

filter()筛选出来满足条件的item

distinct()对RDD中的item去重

sample()从RDD中的item中采样一部分出来,有放回或者无放回

sortBy()对RDD中的item进行排序

1

2

3

4

5

6

如果想看操作后的结果,可以用一个叫做collect()的action把所有的item转成一个Pythonlist。数据量大时,collect()很危险……

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

numbersRDD=sc.parallelize(range(1,10+1))

print(numbersRDD.collect())

#map()对RDD的每一个item都执行同一个操作

squaresRDD=numbersRDD.map(lambdax:x**2)#Squareeverynumber

print(squaresRDD.collect())

#filter()筛选出来满足条件的item

filteredRDD=numbersRDD.filter(lambdax:x%2==0)#Onlytheevens

print(filteredRDD.collect())

#Output:

#[1,2,3,4,5,6,7,8,9,10]

#[1,4,9,16,25,36,49,64,81,100]

#[2,4,6,8,10]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#flatMap()对RDD中的item执行同一个操作以后得到一个list,然后以平铺的方式把这些list里所有的结果组成新的list

sentencesRDD=sc.parallelize(['Helloworld','MynameisPatrick'])

wordsRDD=sentencesRDD.flatMap(lambdasentence:sentence.split(""))

print(wordsRDD.collect())

print(wordsRDD.count())

#Output:

#['Hello','world','My','name','is','Patrick']

#6

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

对比一下:

这里如果使用map的结果是[[‘Hello’,‘world’],[‘My’,‘name’,‘is’,‘Patrick’]],

使用flatmap的结果是全部展开[‘Hello’,‘world’,‘My’,‘name’,‘is’,‘Patrick’]。

flatmap即对应Python里的如下操作:

l=['Helloworld','MynameisPatrick']

ll=[]

forsentenceinl:

ll=ll+sentence.split("")#+号作用,twolist拼接

ll

1

2

3

4

5

(2)最开始列出的各个Transformation,可以一个接一个地串联使用,比如:

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

defdoubleIfOdd(x):

ifx%2==1:

return2*x

else:

returnx

numbersRDD=sc.parallelize(range(1,10+1))

resultRDD=(numbersRDD

.map(doubleIfOdd)#map,filter,distinct()

.filter(lambdax:x>6)

.distinct())#distinct()对RDD中的item去重

resultRDD.collect()

#Output:[8,10,18,14]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

(3)当遇到更复杂的结构,比如被称作“pairRDDs”的以元组形式组织的k-v对(key,value),Spark中针对这种item结构的数据,定义了一些transform和action:

reduceByKey():对所有有着相同key的items执行reduce操作

groupByKey():返回类似(key,listOfValues)元组的RDD,后面的valueList是同一个key下面的

sortByKey():按照key排序

countByKey():按照key去对item个数进行统计

collectAsMap():和collect有些类似,但是返回的是k-v的字典

1

2

3

4

5

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

rdd=sc.parallelize(["Hellohello","HelloNewYork","Yorksayshello"])

resultRDD=(rdd

.flatMap(lambdasentence:sentence.split(""))

.map(lambdaword:word.lower())

.map(lambdaword:(word,1))#将word映射成(word,1)

.reduceByKey(lambdax,y:x+y))#reduceByKey对所有有着相同key的items执行reduce操作

resultRDD.collect()

#Output:[('hello',4),('york',2),('says',1),('new',1)]

result=resultRDD.collectAsMap()#collectAsMap类似collect,以k-v字典的形式返回

result

#Output:{'hello':4,'new':1,'says':1,'york':2}

resultRDD.sortByKey(ascending=True).take(2)#sortByKey按键排序

#Output:[('hello',4),('new',1)]

#取出现频次最高的2个词

print(resultRDD

.sortBy(lambdax:x[1],ascending=False)

.take(2))

#Output:[('hello',4),('york',2)]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

RDD间的操作

(1)如果有2个RDD,可以通过下面这些操作,对它们进行集合运算得到1个新的RDD

rdd1.union(rdd2):所有rdd1和rdd2中的item组合(并集)

rdd1.intersection(rdd2):rdd1和rdd2的交集

rdd1.substract(rdd2):所有在rdd1中但不在rdd2中的item(差集)

rdd1.cartesian(rdd2):rdd1和rdd2中所有的元素笛卡尔乘积(正交和)

1

2

3

4

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#初始化两个RDD

numbersRDD=sc.parallelize([1,2,3])

moreNumbersRDD=sc.parallelize([2,3,4])

1

2

3

4

5

6

7

8

9

numbersRDD.union(moreNumbersRDD).collect()#union()取并集

#Output:[1,2,3,2,3,4]

numbersRDD.intersection(moreNumbersRDD).collect()#intersection()取交集

#Output:[2,3]

numbersRDD.subtract(moreNumbersRDD).collect()#substract()取差集

#Output:[1]

numbersRDD.cartesian(moreNumbersRDD).collect()#cartesian()取笛卡尔积

#Output:[(1,2),(1,3),(1,4),(2,2),(2,3),(2,4),(3,2),(3,3),(3,4)]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

(2)在给定2个RDD后,可以通过一个类似SQL的方式去join它们

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

#Homeofdifferentpeople

homesRDD=sc.parallelize([

('Brussels','John'),

('Brussels','Jack'),

('Leuven','Jane'),

('Antwerp','Jill'),

])

#Qualityoflifeindexforvariouscities

lifeQualityRDD=sc.parallelize([

('Brussels',10),

('Antwerp',7),

('RestOfFlanders',5),

])

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

homesRDD.join(lifeQualityRDD).collect()#join

#Output:

#[('Antwerp',('Jill',7)),

#('Brussels',('John',10)),

#('Brussels',('Jack',10))]

homesRDD.leftOuterJoin(lifeQualityRDD).collect()#leftOuterJoin

#Output:

#[('Antwerp',('Jill',7)),

#('Leuven',('Jane',None)),

#('Brussels',('John',10)),

#('Brussels',('Jack',10))]

homesRDD.rightOuterJoin(lifeQualityRDD).collect()#rightOuterJoin

#Output:

#[('Antwerp',('Jill',7)),

#('RestOfFlanders',(None,5)),

#('Brussels',('John',10)),

#('Brussels',('Jack',10))]

homesRDD.cogroup(lifeQualityRDD).collect()#cogroup

#Output:

#[('Antwerp',

#(<pyspark.resultiterable.ResultIterableat0x73d2d68>,

#<pyspark.resultiterable.ResultIterableat0x73d2940>)),

#('RestOfFlanders',

#(<pyspark.resultiterable.ResultIterableat0x73d2828>,

#<pyspark.resultiterable.ResultIterableat0x73d2b70>)),

#('Leuven',

#(<pyspark.resultiterable.ResultIterableat0x73d26a0>,

#<pyspark.resultiterable.ResultIterableat0x7410a58>)),

#('Brussels',

#(<pyspark.resultiterable.ResultIterableat0x73d2b38>,

#<pyspark.resultiterable.ResultIterableat0x74106a0>))]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

#Oops!Those<ResultIterable>sareSpark'swayofreturningalist

#thatwecanwalkover,withoutmaterializingthelist.

#Let'smaterializetheliststomaketheabovemorereadable:

(homesRDD

.cogroup(lifeQualityRDD)

.map(lambdax:(x[0],(list(x[1][0]),list(x[1][1]))))

.collect())

#Output:

#[('Antwerp',(['Jill'],[7])),

#('RestOfFlanders',([],[5])),

#('Leuven',(['Jane'],[])),

#('Brussels',(['John','Jack'],[10]))]

1

2

3

4

5

6

7

8

9

10

11

12

13

惰性计算,actions方法

特别注意:Spark的一个核心概念是惰性计算。当你把一个RDD转换成另一个的时候,这个转换不会立即生效执行!!!Spark会把它先记在心里,等到真的有actions需要取转换结果时,才会重新组织transformations(因为可能有一连串的变换)。这样可以避免不必要的中间结果存储和通信。

常见的action如下,当它们出现的时候,表明需要执行上面定义过的transform了:

collect():计算所有的items并返回所有的结果到driver端,接着collect()会以Pythonlist的形式返回结果

first():和上面是类似的,不过只返回第1个item

take(n):类似,但是返回n个item

count():计算RDD中item的个数

top(n):返回头n个items,按照自然结果排序

reduce():对RDD中的items做聚合

1

2

3

4

5

6

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

rdd=sc.parallelize(range(1,10+1))

rdd.reduce(lambdax,y:x+y)#reduce():对RDD中的items做聚合

#Output:55

1

2

3

4

5

6

7

8

9

10

reduce的原理:先在每个分区(partition)里完成reduce操作,然后再全局地进行reduce。

有时候需要重复用到某个transform序列得到的RDD结果。但是一遍遍重复计算显然是要开销的,所以我们可以通过一个叫做cache()的操作把它暂时地存储在内存中。缓存RDD结果对于重复迭代的操作非常有用,比如很多机器学习的算法,训练过程需要重复迭代。

importpyspark

frompysparkimportSparkContextassc

frompysparkimportSparkConf

conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

sc=SparkContext.getOrCreate(conf)

importnumpyasnp

numbersRDD=sc.parallelize(np.linspace(1.0,10.0,10))

squaresRDD=numbersRDD.map(lambdax:x**2)

squaresRDD.cache()#PreservetheactualitemsofthisRDDinmemory

avg=squaresRDD.reduce(lambdax,y:x+y)/squaresRDD.count()

print(avg)

#Output:38.5

END,本文到此结束,如果可以帮助到大家,还望关注本站哦!

阅读全文