`

用Apache Spark进行大数据处理

阅读更多

from:http://www.infoq.com/cn/articles/apache-spark-sql

Spark SQL,作为Apache Spark大数据框架的一部分,主要用于结构化数据处理和对Spark数据执行类SQL的查询。通过Spark SQL,可以针对不同格式的数据执行ETL操作(如JSON,Parquet,数据库)然后完成特定的查询操作。

在这一文章系列的第二篇中,我们将讨论Spark SQL库,如何使用Spark SQL库对存储在批处理文件、JSON数据集或Hive表中的数据执行SQL查询。

 

Spark大数据处理框架目前最新的版本是上个月发布的Spark 1.3。这一版本之前,Spark SQL模块一直处于“Alpha”状态,现在该团队已经从Spark SQL库上将这一标签移除。这一版本中包含了许多新的功能特性,其中一部分如下:

  • 数据框架(DataFrame):Spark新版本中提供了可以作为分布式SQL查询引擎的程序化抽象DataFrame。
  • 数据源(Data Sources):随着数据源API的增加,Spark SQL可以便捷地处理以多种不同格式存储的结构化数据,如Parquet,JSON以及Apache Avro库。
  • JDBC服务器(JDBC Server):内置的JDBC服务器可以便捷地连接到存储在关系型数据库表中的结构化数据并利用传统的商业智能(BI)工具进行大数据分析。

Spark SQL组件

使用Spark SQL时,最主要的两个组件就是DataFrame和SQLContext。

首先,我们来了解一下DataFrame。

 

DataFrame

DataFrame是一个分布式的,按照命名列的形式组织的数据集合。DataFrame基于R语言中的data frame概念,与关系型数据库中的数据库表类似。

之前版本的Spark SQL API中的SchemaRDD已经更名为DataFrame。

通过调用将DataFrame的内容作为行RDD(RDD of Rows)返回的rdd方法,可以将DataFrame转换成RDD。

可以通过如下数据源创建DataFrame:

  • 已有的RDD
  • 结构化数据文件
  • JSON数据集
  • Hive表
  • 外部数据库

Spark SQL和DataFrame API已经在下述几种程序设计语言中实现:

  • Scala(https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.package)
  • Java(https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/sql/api/java/package-summary.html)
  • Python(https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html)

本文中所涉及的Spark SQL代码示例均使用Spark Scala Shell程序。

SQLContext

Spark SQL提供SQLContext封装Spark中的所有关系型功能。可以用之前的示例中的现有SparkContext创建SQLContext。下述代码片段展示了如何创建一个SQLContext对象。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 

此外,Spark SQL中的HiveContext可以提供SQLContext所提供功能的超集。可以在用HiveQL解析器编写查询语句以及从Hive表中读取数据时使用。

在Spark程序中使用HiveContext无需既有的Hive环境。

JDBC数据源

Spark SQL库的其他功能还包括数据源,如JDBC数据源。

JDBC数据源可用于通过JDBC API读取关系型数据库中的数据。相比于使用JdbcRDD,应该将JDBC数据源的方式作为首选,因为JDBC数据源能够将结果作为DataFrame对象返回,直接用Spark SQL处理或与其他数据源连接。

Spark SQL示例应用

在上一篇文章中,我们学习了如何在本地环境中安装Spark框架,如何启动Spark框架并用Spark Scala Shell与其交互。如需安装最新版本的Spark,可以从Spark网站下载该软件。

对于本文中的代码示例,我们将使用相同的Spark Shell执行Spark SQL程序。这些代码示例适用于Windows环境。

为了确保Spark Shell程序有足够的内存,可以在运行spark-shell命令时,加入driver-memory命令行参数,如下所示:

spark-shell.cmd --driver-memory 1G
 

Spark SQL应用

Spark Shell启动后,就可以用Spark SQL API执行数据分析查询。

在第一个示例中,我们将从文本文件中加载用户数据并从数据集中创建一个DataFrame对象。然后运行DataFrame函数,执行特定的数据选择查询。

文本文件customers.txt中的内容如下:

100, John Smith, Austin, TX, 78727
200, Joe Johnson, Dallas, TX, 75201
300, Bob Jones, Houston, TX, 77028
400, Andy Davis, San Antonio, TX, 78227
500, James Williams, Austin, TX, 78727
 

下述代码片段展示了可以在Spark Shell终端执行的Spark SQL命令。

// 首先用已有的Spark Context对象创建SQLContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 导入语句,可以隐式地将RDD转化成DataFrame
import sqlContext.implicits._

// 创建一个表示客户的自定义类
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)

// 用数据集文本文件创建一个Customer对象的DataFrame
val dfCustomers = sc.textFile("data/customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()

// 将DataFrame注册为一个表
dfCustomers.registerTempTable("customers")

// 显示DataFrame的内容
dfCustomers.show()

// 打印DF模式
dfCustomers.printSchema()

// 选择客户名称列
dfCustomers.select("name").show()

// 选择客户名称和城市列
dfCustomers.select("name", "city").show()

// 根据id选择客户
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()

// 根据邮政编码统计客户数量
dfCustomers.groupBy("zip_code").count().show()
 

在上一示例中,模式是通过反射而得来的。我们也可以通过编程的方式指定数据集的模式。这种方法在由于数据的结构以字符串的形式编码而无法提前定义定制类的情况下非常实用。

如下代码示例展示了如何使用新的数据类型类StructType,StringType和StructField指定模式。

//
// 用编程的方式指定模式
//

// 用已有的Spark Context对象创建SQLContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建RDD对象
val rddCustomers = sc.textFile("data/customers.txt")

// 用字符串编码模式
val schemaString = "customer_id name city state zip_code"

// 导入Spark SQL数据类型和Row
import org.apache.spark.sql._

import org.apache.spark.sql.types._;

// 用模式字符串生成模式对象
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将RDD(rddCustomers)记录转化成Row。
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))

// 将模式应用于RDD对象。
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)

// 将DataFrame注册为表
dfCustomers.registerTempTable("customers")

// 用sqlContext对象提供的sql方法执行SQL语句。
val custNames = sqlContext.sql("SELECT name FROM customers")

// SQL查询的返回结果为DataFrame对象,支持所有通用的RDD操作。
// 可以按照顺序访问结果行的各个列。
custNames.map(t => "Name: " + t(0)).collect().foreach(println)

// 用sqlContext对象提供的sql方法执行SQL语句。
val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")

// SQL查询的返回结果为DataFrame对象,支持所有通用的RDD操作。
// 可以按照顺序访问结果行的各个列。
customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
 

除了文本文件之外,也可以从其他数据源中加载数据,如JSON数据文件,Hive表,甚至可以通过JDBC数据源加载关系型数据库表中的数据。

如上所示,Spark SQL提供了十分友好的SQL接口,可以与来自多种不同数据源的数据进行交互,而且所采用的语法也是团队熟知的SQL查询语法。这对于非技术类的项目成员,如数据分析师以及数据库管理员来说,非常实用。

 

总结

本文中,我们了解到Apache Spark SQL如何用熟知的SQL查询语法提供与Spark数据交互的SQL接口。Spark SQL是一个功能强大的库,组织中的非技术团队成员,如业务分析师和数据分析师,都可以用Spark SQL执行数据分析。

下一篇文章中,我们将讨论可用于处理实时数据或流数据的Spark Streaming库。Spark Streaming库是任何一个组织的整体数据处理和管理生命周期中另外一个重要的组成部分,因为流数据处理可为我们提供对系统的实时观察。这对于欺诈检测、在线交易系统、事件处理解决方案等用例来说至关重要。

分享到:
评论

相关推荐

    基于Apache Spark的分布式数据处理Scala设计源码

    本项目是基于Apache Spark开发的分布式数据处理Scala设计源码,主要使用Scala进行开发。项目共包含25个文件,其中Scala源代码文件10个,XML配置文件9个,文本文件3个,Git忽略配置文件2个,以及Properties配置文件1...

    用ApacheSpark进行大数据处理

    ApacheSpark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark...

    基于Apache Spark的分布式数据处理设计源码

    本项目是基于Apache Spark开发的分布式数据处理设计源码,主要使用Scala进行开发。项目共包含12518个文件,其中Scala源代码文件3515个,Q脚本文件1559个,Java源代码文件1120个,SQL文件368个,文本文件357个,...

    Apache Spark:大数据处理统一引擎.pdf

    #资源达人分享计划#

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    基于Scala的Apache Spark 3.1.2大数据处理工具设计源码

    基于Scala的Apache Spark 3.1.2大数据处理工具设计源码,该项目包含14251个文件,主要文件类型有4018个scala源文件,以及1559个q脚本文件。此外,还包括1009个txt文本文件,984个java源文件,以及488个python源文件...

    基于Scala的Apache Spark大数据处理框架设计源码

    本源码提供了一个基于Scala的Apache Spark大数据处理框架设计。项目包含12914个文件,其中包括3697个Scala文件、1559个Q文件、1150个Java文件、388个SQL文件、372个TXT文件、362个Python文件、200个Markdown文档、...

    Apache Spark的设计与实现 PDF中文版

    本文主要讨论 Apache Spark 的设计与实现,重点关注其设计思想、运行原理、实现架构及性能调优,附带讨论与 Hadoop MapReduce 在设计与实现上的区别。不喜欢将该文档称之为“源码分析”,因为本文的主要目的不是去...

    基于Scala的Apache Spark大数据处理引擎设计源码

    本源码提供了一个基于Scala的Apache Spark大数据处理引擎设计。项目包含13628个文件,其中包括3835个Scala文件、1559个Q文件、999个TXT文件、933个Java源文件、395个SQL文件、369个Python文件、206个Markdown文档、...

    基于Apache Spark的大规模数据处理设计源码

    本资源提供了一套基于Apache Spark的大规模数据处理的设计源码,包含13640个文件,其中包括3846个Scala源代码文件,1559个Q文件,以及999个文本文件。此外,还包括933个Java源代码文件,395个SQL数据库文件,以及369...

    Apache Spark:大数据处理统一引擎

    Spark的通用性有几个重要的好处。(1)应用程序更容易开发,因为它们使用统一的API。(2)结合处理任务更有效;而先前的系统需要将数据写入存储以将其传递给...自2010年发布以来,Spark已经发展为活跃的开源项目或大数据处理。

Global site tag (gtag.js) - Google Analytics