`

Spark SQL 例子

阅读更多

Spark SQL:在大数据查询时,使用 SQL 让我们方便了许多。

1. pom

    <!-- Libraries needed to compile and run the Spark SQL example. -->
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.7</version>
        </dependency>

        <!-- The _2.11 suffix on the Spark artifacts is the Scala binary version;
             it has to agree with the scala-library version declared above. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>

 

<!-- Build section: compiles the Scala sources as part of the Maven build. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <!-- Run the plugin's compile and testCompile goals. -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <!-- Extra command-line arguments handed to the Scala compiler. -->
                <args>
                    <arg>-feature</arg>
                </args>
            </configuration>
        </plugin>
    </plugins>
</build>

 2.sparkSQL.scala

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType }

/**
 * Minimal Spark SQL example: loads two comma-separated text files, registers each
 * as a temporary table, joins them on the `age` column and prints the matches.
 */
object sparkSQL {

  /** Space-separated column names shared by both input files. */
  private val ColumnNames = "name age"

  /**
   * Reads a comma-separated text file and registers it as a temporary table.
   *
   * Every column is loaded as a nullable string. Lines with fewer than two
   * comma-separated fields (for example a blank trailing line) are skipped so
   * that indexing the second field cannot throw.
   *
   * @param sc         context used to read the text file
   * @param sqlContext SQL context in which the table is registered
   * @param path       location of the text file
   * @param tableName  name under which the table becomes visible to SQL queries
   */
  private def registerCsvTable(
      sc: SparkContext,
      sqlContext: org.apache.spark.sql.SQLContext,
      path: String,
      tableName: String): Unit = {
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{ StringType, StructField, StructType }

    // Build the schema from the string of column names.
    val schema = StructType(
      ColumnNames.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))

    // Convert each well-formed record of the text file into a Row.
    val rows = sc.textFile(path)
      .map(_.split(","))
      .filter(_.length >= 2)
      .map(fields => Row(fields(0), fields(1).trim))

    // Apply the schema to the rows and expose the result as a table.
    sqlContext.createDataFrame(rows, schema).registerTempTable(tableName)
  }

  /**
   * Entry point: runs the join in a local Spark context and prints one line per match.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      registerCsvTable(sc, sqlContext, "i:/4/people.txt", "people")
      registerCsvTable(sc, sqlContext, "i:/4/dog.txt", "dog")

      // SQL statements are run through the sql method of the SQLContext.
      val results = sqlContext.sql("SELECT p.name,d.name,p.age,d.age FROM people p,dog d where p.age=d.age")

      // Format each result row, bring the lines back to the driver and print them.
      results
        .map(t => s"perName: ${t(0)} dogName: ${t(1)} perAge:${t(2)} dogAge:${t(3)}")
        .collect()
        .foreach(println)
    } finally {
      // Stop the context even when the job fails, so it is not leaked.
      sc.stop()
    }
  }

}

 people.txt

aaa,11
bbb,22
ccc,33
ddd,44

 dog.txt

eee,11
fff,22
ggg,33
ddd,44

 print

perName: aaa dogName: eee perAge:11 dogAge:11
perName: bbb dogName: fff perAge:22 dogAge:22
perName: ccc dogName: ggg perAge:33 dogAge:33
perName: ddd dogName: ddd perAge:44 dogAge:44

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics