Spark之三:DataSource操作介绍

/ spark / 150浏览

Spark可以接入多种数据源,常见的有:多格式的文本文件、数据库。下面主要介绍一下有哪几种方式来加载不同 格式的文本文件。

万能的load

load是最简单、最通用的一种加载方式。spark默认是加载parquet格式的。如果需要其他格式,需要通过format指定格式,spark内部也提供了一些常用的格式,如果你的文件不满足这些格式,你可以自定义format方式。

加载parquet格式

val userInfoFrame = sparkSession.read.load("examples/src/main/resources/users.parquet")

由于spark默认的加载方式是加载parquet,所以直接load即可。

加载其他非parquet格式

spark已经定义了一些常用的格式,如json、csv、orc等。使用如下操作来指定具体的格式:

加载json

val dataFrame = sparkSession.read.format("json").load("examples/src/main/resources/users.json")

加载csv

val dataFrame = sparkSession.read
    .format("csv")
    .option("sep",";")
    .option("inferSchema","true")
    .option("header","true")
    .load("examples/src/main/resources/users.csv")

通过这两种加载方式可以看出,需要加载什么样的格式,就在format中执行。需要注意的是option中的参数。option中主要定义了一些操作该格式文件的一些条件,可以到DataFrameReader下面来看都有哪些关键条件,官方给每个值都提供了说明。如下图:

下面是一些比较关键的:

option_key 说明
sep 设置分割符,默认是","
encoding csv文件的编码格式,默认"utf-8",如果你的文件是gbk,可以通过这个选项设置
comment 用来设置跳过该字符开头的行,默认是空的,即所有的行都读取。常用语跳过csv文件中的第一行
header 表示是否让第一行作为列名,默认是false。如果你的csv就是从某个表读取的,则设置为true
inferSchema 是否开启自动推断schema,比如推断类型。默认是false。
nullValue 如果值为null,设置一个默认值,一般情况下不会设置。默认即是空字符串。
dateFormat 设置时间格式化方式,默认是:yyyy-MM-dd。如果你需要带上时分秒,可以重新设置

注意的是,不仅仅是csv有这些选项可以设置,json、orc、txt都有选项设置,具体的选项,可以到该类下参考,比较详细。

加载其他

上面是json、csv两种常见的格式的加载方式,其他的格式也可以用该形式来加载。如果是spark未支持的格式,可以使用自定义形式来加载。上面的加载方式都是使用的spark-sql中的形式。加载后的结果是一个DataFrame。

与spark-context中的textFile()方法不同的是,read方法读取的是格式化文件,textFile可以读取格式化文件也可以读取非格式化文件,每次只读一行,需要读完后再进行解析。

纯SQL加载

除万能的load之外,还有万能的sql。加载方式如下:

//加载parquet
sparkSession.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
//加载json
sparkSession.sql("SELECT * FROM json.`examples/src/main/resources/users.json`")
//加载csv
sparkSession.sql("SELECT * FROM csv.`examples/src/main/resources/users.csv`")

输出DataFrame

spark默认是和hive进行了对接。因此,如果要将DataFrame里面的数据持久化至hive表,直接使用saveAsTable即可。

如果hive是基于文件的,例如text、parquet、csv等,可以通过选项path来进行输出。例如:df.write.option("path", "/some/path").saveAsTable("tableName")。

两者的区别是内部表和外部表的区别。第二种将文件写入到某个目录下,即使你的hive表已经删除了,数据还是在的,不过在写入表时,需要手动的刷新一下hive表,因为hive的元数据并没有被写进去。使用MSCK REPAIR TABLE来刷新。

如果需要对输出进行分桶和分区,可以使用bucketBy和partitionBy

saveAsTable

saveAsTable方法,如果原始表存在,则先删除原始表、再创建新表,每次写入的后数据不包含原来的数据。一般用于一次性数据表。即只使用一次。例如常见的一些测试,重复操作都需要新的一个结果。

insertInto

insertInto是保持原来的表不变,在原来的表的基础上进行插入。如果有分区,可以选择分区内追加或分区内覆盖。这种方式一般用于增量数据。