Spark之二:Spark-SQL编程指南

/ spark / 200浏览

SparkSQL是Spark的一个模块,它主要用来处理结构化数据。它是相对于SparkCore更高层的一个API,底层实现也是SparkCore的方式来实现。

SparkSQL支持纯SQL的开发,也提供了一种链式编程的API,它的关键是DataFrame。

环境准备

引入spark-sql依赖

在编写spark-sql程序时,需要将spark-sql的依赖加入到项目中。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

创建SQL上下文

在写spark-core程序时,需要一个spark上下文:SparkContext。在写spark-sql时,也需要一个SQL上下文:SqlContext。这个SQL上下文是有SparkSession创建出来。SparkSession中定义了spark的运行环境。

def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("TestApp")
    .getOrCreate()
    val sqlContext = sparkSession.sqlContext
}

SparkConf的配置在spark-core中已经介绍了,它们是一样的,都用来指定spark的运行环境。

ps:在spark2.0时,sqlContext的大部分功能已经完全移到了SparkSession下。意味着我们不再使用sql上下文了,直接从sparkSession中进行sql开发。

在写spark-core程序时,我们的SparkContext是new出来的,现在也可以从sparkSession中获取。后续推荐使用这种方式来获取sparkContext,因为它完美的避免了同一个程序创建多个SparkContext的这种情况,而且它可以创建不冲突的sqlContext和sparkContext。

创建DataFrame

写spark-sql程序,需要先有一个DataFrame。DataFrame可以理解为一个表对象,它是将某个表的结构封装成了一个对象。创建DataFrame有如下几种方式:

spark支持的data source有很多种类型,比如:json、csv、parquet、orc、jdbc、other data base。后面会详细介绍如何从这些数据源来获得DataFrame。

下面是一个从json中创建DataFrame的案例:

def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
    .master("local[*]")
    .appName(" SparkSqlDemo")
    .getOrCreate()
    val dataFrame = sparkSession.read.json("E:\\workspace\\person.json")
    dataFrame.show()
}

这个json文件是spark案例下的一个json文件,可以在example/resource下找到。show之后会输出表信息:

DataFrame操作详解

基本操作方法

spark-sql具有一些基本的一些操作方法,如下(包含但不仅限于)

以person.json这个文件为例:

select

select提供了多种形式。可以指定查询的字段名,也可以使用一个表达式。

dataFrame.select("name","age")

dataFrame.selectExpr("age+2").show()

如果不写select,默认是查询全部字段,一般只有在查询部分字段时才写select。

filter

filter用来过滤不想要的数据。filter里面可以传一个表达式,也可以传一个function。function一般用于更复杂的过滤。两种使用方式如下:

dataFrame.filter("age is not null").show()
dataFrame.filter(row=>{
    val age = row.getAs[java.lang.Long]("age")
     age != null && age > 20
}).show()

groupBy

根据某个字段进行分组,可以添加多个字段。类似于sql中的group by name,age

dataFrame.groupBy("name","age")

orderBy

根据字段进行排序,可以是多个字段,也可以是一个表达式。默认升序,如果需要降序,可以使用表达式的形式来实现。

dataFrame.orderBy("age","name").show()
dataFrame.orderBy(newColumn("age").desc).show()

count

count一般配合groupBy使用,统计每个分组下的数量。

dataFrame.groupBy("name").count()

上面是一些基本的DataFrame方法,可以看出来,和sql基本是一样的。除了这些基本方法之外,还有很多方法,可以参考Spark官网文档。

自定义函数

上面提到,spark在select时可以写表达式,例如要统计age列的总和,可以写

dataFrame.selectExpr("sum(age)")

期中的sum是一个sql函数。如果在某些情况下,sum不能满足条件,可以自定义自己的函数来使用。

自定义函数分为3类:

比较常用的是udf和udaf,udtf几乎不用。例如要解析一些json文件,可以用udf来解析。再比如要求某个指标的上下四分位,可以用udaf来聚合。

自定义udf函数

使用register来注册一个自定义函数,第一个参数为udf函数名,第二个参数为自定义的udf函数。

自定义udaf函数

自定义udaf,需要继承UserDefinedAggregateFunction。下面是一个自定义求平均值的函数。

class MyAvgFunction extends UserDefinedAggregateFunction {
    override def inputSchema: StructType = new StructType().add("input", LongType)

    override def bufferSchema: StructType = new StructType().add("sum", LongType).add("count", IntegerType)

    override def dataType: DataType = StringType

    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0
        buffer(1) = 0
    }

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getAs[Long](0) + input.getAs[java.lang.Long](0)
        buffer(1) = buffer.getAs[Int](1) + 1
    }

    override def merge(bufferl: MutableAggregationBuffer, buffer2: Row): Unit = {
        bufferl(0) = bufferl.getAs[Long](0) + buffer2.getAs[Long](0)
        bufferl(1) = bufferl.getAs[Int](1) + buffer2.getAs[Int](1)
    }

    override def evaluate(buffer: Row): Any = (buffer.getAs[Long](0).toDouble / buffer.getAs[Int](1)).formatted("8.2f")
}

使用该自定义函数,像注册udf函数一样,将它注册进去即可使用。

纯SQL写法

spark不仅仅提供了链式sql编程外,还提供了入门门槛更低的纯sql写法。如下:

def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
    .master("1ocal[*]")
    .appName("SparksqlDemo")
    .getOrCreate()
    val dataFrame = sparkSession.read.json("E:\\workspace\\person.json")
    dataFrame.createTempView("user_info")
    val newDataFrame = sparkSession.sql("select * from user_ info")
    newDataFrame.show()
}

上面的程序是从一个文件中读出一个dataFrame后,将它注册为一个表视图。然后可以通过sql来操作该表视图。spark默认是集成了hive的,如果从hive里面读数据,则只需要在sparkSession中启动hive即可。

sparkSession..enableHiveSupport()

操作hive是无需撞见tempView的。tempView是session级别的。如果要将tempView分享给所有的session,则可以创建一个全局的视图。

dataFrame.createGlobalTempView("user_info")

全局视图在所有的session中都有效,直到spark任务停止。例如:

dataFrame.createGlobalTempView("user_info")
val newDataFrame1=sparkSession.sql("select * from user_info")
val newDataFrame2=sparkSession.newSession().sql("select * from user_info")

需要注意的是,newSession只是用来隔离一些注册表、临时视图、注册函数。在它们的底层依旧是同一个SparkContext。之前也说过,Spark程序只能有一个SparkContext。

从RDD转换DataFrame

上面有一种通过文本文件、数据库中读出来DataFrame。除了这些方式之外,还有一种方式就是从一个已知的RDD来创建DataFrame。

从一个已知的RDD转换为DataFrame又存在两种方式

使用反射形式转换,代码会更加简洁,但某些场景并不适合,例如解析一个文本文件,该文本文件需要输出到列不同的多个表,就需要手动来创建schema。

使用反射形式

object TestApp {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("1ocal[*]")
      .enableHiveSupport()
      .appName("SparksqlDemo")
      .getOrCreate()
    import sparkSession.implicits._
    val lines = sparkSession.sparkContext.textFile("E:\\workspace\lperson.txt")
    val userInfoDataFrame = lines.map(line => {
      val info = line.split(" ")
      UserInfo(info(0).toInt, info(1))
    }).toDF
    userInfoDataFrame.show()
  }
}

case class UserInfo(age: Int, name: String)

可以看到,自定义了一个UserInfo对象,它具有两个属性。通过case class则能使用反射的形式将它转换为对应的DataFrame。需要注意的是,这儿只能是case class,普通class无法转换。case class无需手动序列化,会默认是序列化的。

手动定义Schema

手动创建schema分了三步

def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
    .master("1ocal[*]")
    .enableHiveSupport()
    .appName("SparksqlDemo")
    .getOrCreate()

    val rowRdd = sparkSession.sparkContext.textFile("E:\\workspace\lperson.txt")
    .map(line => {
        val infos = line.split(" ")
        Row(infos(0), infos(1))
    })

    val schema = new StructType(Array(StructField("age", IntegerType), StructField("name", StringType)))
    val userInfoDataFrame = sparkSession.createDataFrame(rowRdd, schema)
    userInfoDataFrame.show()
}

上面便是通过已有的RDD来转换为DataFrame。这种场景一般用于一些非结构化的文件中,通过spark-core对其进行解析后,将它转换为结构化的数据,使其存入数据库中。