Java操作HDFS

/ 随笔 / 50浏览

背景

最近需要写一个FlinkSource组件,从HDFS读取文件,由于很久没有写过这些基础代码了,有些东西基本都忘了,所以为了后期方便使用,在这儿记录一下。

引入依赖

操作HDFS文件,需要先引入hadoop-client依赖。

compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.2'

初始化

操作HDFS文件,需要先初始化FileSystem。初始化FileSystem有两种方式。

引入xml文件

引入xml文件是最简单的一种方式。将hdfs相关的文件放在classpath下即可。

resource:
    -- core-site.xml
    -- hdfs-site.xml

关于这两个文件,如果是CDH管理的Hadoop,可以在CDH上下载客户端配置。如果是手动管理的Hadoop,从conf目录下找到即可。

操作HDFS主要依靠FileSystem来操作。创建FileSystem的方式也极为简便。

// Minimal example: build a FileSystem from the *-site.xml configuration
// files found on the classpath (core-site.xml / hdfs-site.xml).
public class HdfsOperateDemo {

    private static Configuration configuration;
    private static FileSystem fileSystem;

    // Initialize: an empty Configuration automatically loads core-site.xml
    // and hdfs-site.xml from the classpath.
    private static void init() throws IOException {
        configuration = new Configuration();
        fileSystem = FileSystem.get(configuration);
    }

    public static void main(String[] args) {

    }

}

手动设置配置参数

除了引入xml文件来创建FileSystem之外,通过configuration设置必备参数也可以创建。其中一个必须的参数是:fs.defaultFS

/**
 * Builds the FileSystem by setting the required parameters on the
 * Configuration by hand, instead of shipping XML files on the classpath.
 */
public class HdfsOperateDemo {

    private static Configuration configuration;
    private static FileSystem fileSystem;

    // fs.defaultFS is the one parameter that must be present to reach the NameNode.
    private static void init() throws IOException {
        configuration = new Configuration();
        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://cos6-data1.test.allsenseww.com:8020");
        fileSystem = FileSystem.get(configuration);
    }

    // Prints whether the given HDFS path exists.
    private static void checkExist(String path) throws IOException {
        Path target = new Path(path);
        System.out.println(fileSystem.exists(target));
    }

    public static void main(String[] args) throws IOException {
        init();
        checkExist("/user/lizy/");
    }

}

操作HDFS

查看文件或文件目录是否存在

/**
 * Checks whether a file or directory exists on HDFS and prints the result.
 *
 * @param path absolute HDFS path to probe
 */
private static void checkExist(String path) throws IOException {
    Path target = new Path(path);
    System.out.println(fileSystem.exists(target));
}

创建目录

/**
 * Creates a directory (including any missing parents) at the given HDFS path.
 *
 * @param path absolute HDFS directory path to create
 */
private static void mkdir(String path) throws IOException {
    fileSystem.mkdirs(new Path(path));
}

/**
 * Backward-compatible no-arg overload; creates the original demo directory.
 */
private static void mkdir() throws IOException {
    mkdir("/user/lizy/test_mkdir");
}

创建文件

/**
 * Creates (or overwrites) a file at the given HDFS path and writes a sample line.
 *
 * <p>try-with-resources guarantees the stream is closed even if the write
 * fails; the explicit UTF-8 charset avoids depending on the platform default.
 *
 * @param path absolute HDFS file path to create
 */
private static void createFile(String path) throws IOException {
    try (FSDataOutputStream out = fileSystem.create(new Path(path))) {
        out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
        out.flush();
    }
}

读取文本文件

/**
 * Reads a plain-text HDFS file line by line and prints its full contents.
 *
 * <p>The original version never closed the stream/reader (a leak per call);
 * try-with-resources fixes that. UTF-8 is specified explicitly instead of
 * relying on the platform default charset.
 *
 * @param path absolute HDFS path of a text file
 */
private static void readTxtFile(String path) throws IOException {
    StringBuilder result = new StringBuilder();
    try (FSDataInputStream in = fileSystem.open(new Path(path));
         BufferedReader br = new BufferedReader(
                 new InputStreamReader(in, StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
            result.append(line).append("\n");
        }
    }
    System.out.println(result);
}

读取gz压缩文件

/**
 * Reads a gzip-compressed HDFS file line by line and prints its contents.
 *
 * <p>The original version leaked all three streams; closing the outermost
 * reader in try-with-resources closes the whole chain. UTF-8 is specified
 * explicitly instead of the platform default.
 *
 * @param path absolute HDFS path of a .gz file
 */
private static void readGzipFile(String path) throws IOException {
    StringBuilder result = new StringBuilder();
    try (FSDataInputStream in = fileSystem.open(new Path(path));
         BufferedReader reader = new BufferedReader(
                 new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            result.append(line).append("\n");
        }
    }
    System.out.println(result);
}

通用读取多种格式压缩文件

/**
 * Reads an HDFS file whose compression codec is inferred from its file
 * extension (via {@link CompressionCodecFactory}) and prints its contents.
 * Falls back to plain-text reading when no codec matches the extension.
 *
 * <p>The original version never closed the reader; try-with-resources on the
 * outermost reader closes the whole stream chain. UTF-8 is explicit.
 *
 * @param path absolute HDFS path; extension must match the codec (.gz, .bz2, ...)
 */
private static void commonReadCompressionFile(String path) throws IOException {
    StringBuilder result = new StringBuilder();
    Path file = new Path(path);
    CompressionCodecFactory factory = new CompressionCodecFactory(configuration);
    CompressionCodec codec = factory.getCodec(file);

    FSDataInputStream in = fileSystem.open(file);
    // No codec matched the extension: treat the file as uncompressed text.
    InputStream decoded = (codec == null) ? in : codec.createInputStream(in);
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(decoded, StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            result.append(line).append("\n");
        }
    }
    System.out.println(result);
}

注意,压缩类型一定要正确。因为解压缩走的是Hadoop的codec机制,所以压缩时也必须使用Hadoop支持的压缩格式。比如flume收集日志到HDFS上,使用gzip压缩、后缀是gz,解析时不能随意指定其他codec,必须使用:org.apache.hadoop.io.compress.GzipCodec。

Hadoop支持的压缩格式:

压缩类型 压缩算法 后缀
org.apache.hadoop.io.compress.DefaultCodec deflate .deflate
org.apache.hadoop.io.compress.GzipCodec gzip .gz
org.apache.hadoop.io.compress.BZip2Codec bzip2 .bz2
org.apache.hadoop.io.compress.DeflateCodec deflate .deflate
org.apache.hadoop.io.compress.SnappyCodec snappy .snappy
org.apache.hadoop.io.compress.Lz4Codec lz4 .lz4

读取snappy压缩文件

有了通用的压缩文件读取方式,为啥还要单独说明snappy压缩文件的读取呢?主要原因是,snappy和lz4压缩文件需要native库支持。如果本机并未安装native库,则会提示:native snappy library not available 的错误。

这个问题很好解决:将已经编译好的native库路径加入到动态库加载路径(LD_LIBRARY_PATH)中即可。

vim /etc/profile

export LD_LIBRARY_PATH=~/snappy/native

source /etc/profile

删除目录及文件

/**
 * Recursively deletes the directory (or file) at the given HDFS path.
 *
 * @param path absolute HDFS path to remove
 */
private static void deleteDir(String path) throws IOException {
    Path target = new Path(path);
    fileSystem.delete(target, true); // true = recursive
}

完整的代码程序

package com.wdzaslzy.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

/**
 * Demonstrates basic HDFS operations via {@link FileSystem}: existence checks,
 * directory/file creation, reading plain and compressed files, and recursive
 * deletion.
 *
 * <p>All operation methods assume {@link #init()} has been called first. This
 * is demo code: the mutable static state is not thread-safe.
 */
public class HdfsOperateDemo {

    private static Configuration configuration;
    private static FileSystem fileSystem;

    public static void main(String[] args) throws IOException {
        init();
        // Example invocations — uncomment the one you want to try.
//        checkExist("/user/lizy/");
//        mkdir("/user/lizy/test_mkdir");
//        createFile("/user/lizy/java_demo.txt");
//        readTxtFile("/user/lizy/demo_flume.1589245099068.txt");
//        readGzipFile("/user/lizy/read_test/flume-0.flume-headless.dev.svc.cluster.local_flume.1589956906232.gz");
//        commonReadCompressionFile("/user/lizy/demo_flume.1589289624325.snappy");
        deleteDir("/user/lizy/read_test");
        stop();
    }

    /**
     * Initializes the shared Configuration and FileSystem. fs.defaultFS points
     * at the NameNode; the native-lib flag enables codecs (snappy/lz4) that
     * require native library support.
     */
    private static void init() throws IOException {
        configuration = new Configuration();
        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://cos6-data1.test.allsenseww.com:8020");
        configuration.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
        fileSystem = FileSystem.get(configuration);
    }

    /** Releases the FileSystem handle; call once when done. */
    private static void stop() throws IOException {
        fileSystem.close();
    }

    /**
     * Checks whether a file or directory exists on HDFS and prints the result.
     *
     * @param path absolute HDFS path to probe
     */
    private static void checkExist(String path) throws IOException {
        System.out.println(fileSystem.exists(new Path(path)));
    }

    /**
     * Creates a directory (including any missing parents) at the given path.
     *
     * @param path absolute HDFS directory path
     */
    private static void mkdir(String path) throws IOException {
        fileSystem.mkdirs(new Path(path));
    }

    /**
     * Creates (or overwrites) a file and writes a sample line. try-with-resources
     * closes the stream even on failure; UTF-8 is explicit rather than relying
     * on the platform default charset.
     *
     * @param path absolute HDFS file path
     */
    private static void createFile(String path) throws IOException {
        try (FSDataOutputStream out = fileSystem.create(new Path(path))) {
            out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    /**
     * Reads a plain-text file line by line and prints its contents.
     * The stream/reader chain is closed via try-with-resources (the original
     * leaked it on every call).
     *
     * @param path absolute HDFS path of a text file
     */
    private static void readTxtFile(String path) throws IOException {
        StringBuilder result = new StringBuilder();
        try (FSDataInputStream in = fileSystem.open(new Path(path));
             BufferedReader br = new BufferedReader(
                     new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                result.append(line).append("\n");
            }
        }
        System.out.println(result);
    }

    /**
     * Reads a gzip-compressed file line by line and prints its contents.
     * For other codecs, prefer {@link #commonReadCompressionFile(String)}.
     *
     * @param path absolute HDFS path of a .gz file
     */
    private static void readGzipFile(String path) throws IOException {
        StringBuilder result = new StringBuilder();
        try (FSDataInputStream in = fileSystem.open(new Path(path));
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                result.append(line).append("\n");
            }
        }
        System.out.println(result);
    }

    /**
     * Reads a file whose compression codec is inferred from its extension via
     * {@link CompressionCodecFactory}; falls back to plain text when no codec
     * matches. Closing the outermost reader closes the whole stream chain.
     *
     * @param path absolute HDFS path; extension must match the codec (.gz, .bz2, ...)
     */
    private static void commonReadCompressionFile(String path) throws IOException {
        StringBuilder result = new StringBuilder();
        Path file = new Path(path);
        CompressionCodecFactory factory = new CompressionCodecFactory(configuration);
        CompressionCodec codec = factory.getCodec(file);

        FSDataInputStream in = fileSystem.open(file);
        // No codec matched the extension: treat the file as uncompressed text.
        InputStream decoded = (codec == null) ? in : codec.createInputStream(in);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(decoded, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                result.append(line).append("\n");
            }
        }
        System.out.println(result);
    }

    /**
     * Recursively deletes the directory (or file) at the given path.
     *
     * @param path absolute HDFS path to remove
     */
    private static void deleteDir(String path) throws IOException {
        fileSystem.delete(new Path(path), true);
    }

}