HBase Data Migration

Background

HBase was originally deployed in Docker and now needs to be migrated into the HBase cluster on our big data platform.

HBase version in Docker: 2.1.2

HBase version in the big data cluster: 2.1.0-cdh6.3.2

Method 1: distcp

distcp is the simplest option.

Applicable scenarios: a one-off, offline migration where the table does not yet exist on the target cluster and writes to the source table can be stopped during the copy.

Not applicable scenarios: online migration where the source table keeps taking writes, since anything written after the copy starts is not carried over.

The principle of distcp is to copy the source table's directory on HDFS to the target cluster's directory.

For example, the source table directory is: hdfs://nameservice-0/hbase/data/default/t_test_lizy

and the target table directory is: hdfs://nameservice-1/hbase/data/default/t_test_lizy

Right now the target HBase has no t_test_lizy table. Make sure the two clusters can communicate with each other, then run the following command on the target cluster:

hadoop distcp -i hdfs://nameservice-0/hbase/data/default/t_test_lizy hdfs://nameservice-1/hbase/data/default/t_test_lizy

The command launches a MapReduce job that performs the distributed copy and runs until the copy is complete.
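
Once the job finishes, you can sanity-check the copied directory on the target cluster, for example:

hdfs dfs -du -h hdfs://nameservice-1/hbase/data/default/t_test_lizy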

Once the copy is done, open the target HBase shell:

hbase shell

List the HBase tables:

hbase(main):001:0> list

You can see that the t_test_lizy table now exists in the target HBase. Scan it:

hbase(main):002:0> scan 't_test_lizy'

The scan, however, returns: ERROR: Unknown table t_test_lizy!

The reason is that the target HBase's hbase:meta table has no record of t_test_lizy, so the table has to be repaired into meta.
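
You can confirm this from the HBase shell; for example, a prefix scan of hbase:meta for the table returns no rows here:

scan 'hbase:meta', {ROWPREFIXFILTER => 't_test_lizy'}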

For the repair procedure, see: https://github.com/darkphoenixs/hbase-meta-repair

Note:

Before HBase 2.0, this could be repaired with: hbase hbck -fixAssignments -fixMeta

From 2.0 onwards, HBase deprecated this repair path, but the HBase community provides the hbck2 tool instead.

GitHub address: https://github.com/apache/hbase-operator-tools.git

We did not use it because it is somewhat troublesome to compile, which is why we went with the repair approach above.
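
For reference, if you do build hbase-operator-tools, the hbck2 repair would look roughly like the following. The jar version is only illustrative, and the encoded region names come from the output of the first command:

hbase hbck -j hbase-hbck2-1.1.0.jar addFsRegionsMissingInMeta default:t_test_lizy

hbase hbck -j hbase-hbck2-1.1.0.jar assigns <ENCODED_REGION_NAME>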

After the repair is complete, you can see:

[screenshot: repair result]

Now restart HBase and scan again; the migrated data can be queried.

Method 2: Bulkload

Bulkload is a very good option. The idea is to read the data from the source HBase, generate HFiles with a program, and then load the generated HFiles into the target table.

Advantages: the HFiles are loaded directly into the table, bypassing the normal write path (WAL and MemStore), so it is fast and puts little pressure on the RegionServers.

Disadvantages: you have to write and maintain a program to read the source data and generate the HFiles.

Bulkload code:

package com.xinyan.bigdata.common.utils


import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.{JSON, JSONObject}
import com.xinyan.bigdata.common.constant.{CommonConstant, HiveConstant, KeyConstant}
import com.xinyan.bigdata.common.logger.KafkaLogger
import com.xinyan.bigdata.common.service.CommonService
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

class Bulkload(@transient spark: SparkSession) {


  /**
    * Watch the number of partitions: a single region loads at most 2048 HFiles.
    * The column family defaults to info.
    * A date field is always added.
    *
    * @param hbaseHost       HBase ZooKeeper quorum address
    * @param hbaseTableName  target HBase table name, including the namespace
    * @param tableDf         Hive data source produced by spark.table or spark.sql
    * @param fileSavePath    save path for the bulkload data, defaults to /user/xy_app_spark/bulkload/${db-table}; pass it as db.table
    * @param defRowkeyRule   rowkey generation function, e.g. row.getAs[String]("rowkey")
    * @param load2HBase      whether to load into HBase directly from this program; for large volumes the hbase shell tool is recommended; defaults to false
    * @param updateDate      value used for the force-added date field
    * @param loadPath        optional explicit HDFS path for the generated HFiles; overrides the default save path
    * @param isRemoveEmpty   whether to drop fields with empty values; defaults to false
    * @param removeColumnOpt column to remove: used to drop the column that backs the rowkey
    *
    *
    * Modified: 2018-12-20
    * Do not filter out empty fields inside bulkload, as that would affect downstream programs (e.g. the real-time streaming gateway).
    * If a field's value is empty, keep the field for now (filtering empty-valued fields later is acceptable, as long as the rules are agreed on and downstream jobs are not affected).
    **/
  def bulkload2HBase(hbaseHost: String, hbaseTableName: String, tableDf: DataFrame,
                     fileSavePath: String, defRowkeyRule: Row => String,
                     load2HBase: Boolean = false, updateDate: String, loadPath: Option[String] = None, isRemoveEmpty: Boolean = false,
                     removeColumnOpt:Option[String] = None) = {
    val conf = HBaseConfiguration.create()
    //    conf.addResource(new Path("/etc/hbase/hbase-conf/hbase-site.xml"))
    conf.set(HConstants.ZOOKEEPER_QUORUM, hbaseHost)
    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 2048)
    conf.set("dfs.replication", "2")
    // set HFile compression
    conf.set("hfile.compression", "snappy")
    conf.set("hbase.defaults.for.version.skip", "true")
    conf.set("hbase.regionserver.codecs", "snappy")
    conf.set("hbase.mapreduce.hfileoutputformat.table.name", hbaseTableName)
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, "/user/xy_app_spark/hbase-staging")
    println(s"$hbaseTableName set config done.")

    // create an HTable
    // use the new Table API
    val connection: Connection = ConnectionFactory.createConnection(conf)
    val table: Table = connection.getTable(TableName.valueOf(hbaseTableName))

    println(s"hbase getTable $hbaseTableName done.")

    val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor, connection.getRegionLocator(TableName.valueOf(hbaseTableName)))
    //    HFileOutputFormat2.configureIncrementalLoadMap(job, table.getDescriptor)

    println(s"$hbaseTableName job HFileOutputFormat2 configureIncrementalLoad done.")

    val originColumns = tableDf.columns

    //    val df = new DecimalFormat("#.0000")
    val beforeHFile = tableDf.rdd.map {
      row =>
        // the table already contains the rowkey field;
        // zip the original column names with the row values first, then drop the
        // removed column by name, so that names and values stay aligned
        val rowValues = row.mkString("####").split("####", -1)
        val allPairs = if (originColumns.contains("date")) originColumns.zip(rowValues)
        else (Array[String]("date") ++ originColumns).zip(Array[String](updateDate) ++ rowValues)
        // if a column to remove is specified (e.g. the column backing the rowkey), drop it
        val value = if (removeColumnOpt.isEmpty) allPairs else allPairs.filter(_._1 != removeColumnOpt.get)

        val key = defRowkeyRule(row)
        if(isRemoveEmpty) {
          // drop fields with empty values before they go into the table
          (key, value.filter(entry => CommonUtil.isNotNull(entry._2)))
        } else {
          (key, value)
        }
    }

    val rdd: RDD[(ImmutableBytesWritable, KeyValue)] = beforeHFile.sortByKey()
      //form the data as (ImmutableBytesWritable, KeyValue) to get ready for bulkloading afterward
      .flatMap {
      t =>
        val rk2Bytes = Bytes.toBytes(t._1)
        val immutableBytesWritable = new ImmutableBytesWritable(rk2Bytes)
        val values = t._2
        //sort the columnFamily + qualifier
        values.sortBy(_._1).map {
          value =>
            val qualifier = value._1
            val cellValue2Bytes = Bytes.toBytes(value._2)
            val kv = new KeyValue(rk2Bytes, Bytes.toBytes("info"), Bytes.toBytes(qualifier), cellValue2Bytes)
            (immutableBytesWritable, kv)
        }
    }

    println(s"$hbaseTableName rdd got.")

    /**
      * 1:Generate Hfiles
      * 2:Bulkload Hfiles to HBase(org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles)
      */
    val savePath = if (loadPath.isEmpty) {
      s"/user/xy_app_spark/bulkload/${fileSavePath.replace(".", "-")}"
    } else loadPath.get
    // if the output path already exists, delete it
    val hdfsPath = new Path(savePath)
    val hdfs = FileSystem.get(new org.apache.hadoop.conf.Configuration())
    if (hdfs.exists(hdfsPath)) hdfs.delete(hdfsPath, true)
    rdd.saveAsNewAPIHadoopFile(savePath, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], conf)

    println(s"$hbaseTableName saveAsNewAPIHadoopFile done.")

    val all = FsAction.ALL
    val fsPermission = new FsPermission(all, all, all)
    hdfs.setPermission(hdfsPath, fsPermission)
    hdfs.setVerifyChecksum(true)

    if (load2HBase) {
      //todo
      // Bulkload HFiles into HBase. The in-JVM doBulkLoad path below is disabled;
      // instead, distcp the HFiles to the remote cluster and run LoadIncrementalHFiles there over ssh.
      //      val bulkLoader = new LoadIncrementalHFiles(conf)
      //      val admin = connection.getAdmin
      //      bulkLoader.doBulkLoad(hdfsPath, admin, table, connection.getRegionLocator(TableName.valueOf(hbaseTableName)))
      val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      import sys.process._
      println(s"${df.format(new Date())} $hbaseTableName start distcp...")
      s"/home/xy_app_spark/shell/tool-distcp.sh ${savePath.substring(19)}".!
      println(s"${df.format(new Date())} $hbaseTableName distcp done!")
      println(s"${df.format(new Date())} $hbaseTableName start remote bulkload...")
      s"ssh cdh85-55 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=2048 ${savePath.substring(19)} $hbaseTableName".!
      println(s"${df.format(new Date())} $hbaseTableName remote bulkload done!")
    }
  }

  def bulkloadHFiles(hbaseHost: String, hbaseTableName: String, hdfsPath: String) = {
    val conf = HBaseConfiguration.create()
    conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
    conf.set(HConstants.ZOOKEEPER_QUORUM, hbaseHost)
    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 2048)
    conf.set("dfs.replication", "2")
    // set HFile compression
    conf.set("hfile.compression", "snappy")
    conf.set("hbase.defaults.for.version.skip", "true")
    conf.set("hbase.regionserver.codecs", "snappy")
    // create an HTable
    // use the new Table API
    val connection: Connection = ConnectionFactory.createConnection(conf)
    val table: Table = connection.getTable(TableName.valueOf(hbaseTableName))

    //Bulkload Hfiles to Hbase
    val bulkLoader = new LoadIncrementalHFiles(conf)
    val admin = connection.getAdmin
    bulkLoader.doBulkLoad(new Path(hdfsPath), admin, table, connection.getRegionLocator(TableName.valueOf(hbaseTableName)))
  }

  /**
    * Remove redundant records before bulkload; by default keep the most recently uploaded one.
    * Sort and deduplicate within each partition first to reduce the shuffle volume.
    * @param dataFrame
    * @param defRowkeyRule
    * @param distinctAttr
    */
  def removeReduances(dataFrame: DataFrame, defRowkeyRule: Row => String, distinctAttr: String): DataFrame = {

    val rows: RDD[Row] = dataFrame.rdd.map {
      row =>
        (defRowkeyRule(row), row.getAs[String](distinctAttr), row)
    }.filter(x => StringUtils.isNotBlank(x._2)).mapPartitions {
      // sort and deduplicate within each partition
      singlePartDataSet =>
        singlePartDataSet.toSeq.groupBy(_._1).map(_._2.maxBy(_._2)).toIterator
    }.groupBy {
      // group by rowkey
      data =>
        data._1
    }.map(
      // global deduplication over the per-partition results, keeping the latest record
      dataSet =>
        dataSet._2.toList.maxBy(_._2)._3
    )
    spark.createDataFrame(rows, dataFrame.schema)
  }

  /**
    * Remove redundant records before bulkload; by default keep the most recently uploaded one.
    * Sort and deduplicate within each partition first to reduce the shuffle volume.
    * @param dataFrame
    * @param defRowkeyRule
    * @param distinctAttr
    * @param defParse
    */
  def removeReduances(dataFrame: DataFrame, defRowkeyRule: Row => String, distinctAttr: String, defParse: JSONObject => JSONObject): DataFrame = {
    val fieldNames = dataFrame.schema.fields.map(_.name)
    val resultRdd: RDD[JSONObject] = dataFrame.rdd.map {
      row =>
        (defRowkeyRule(row), row.getAs[String](distinctAttr), row)
    }.filter(x => StringUtils.isNotBlank(x._2)).mapPartitions {
      // sort and deduplicate within each partition
      singlePartDataSet =>
        singlePartDataSet.toSeq.groupBy(_._1).map(_._2.maxBy(_._2)).toIterator
    }.groupBy {
      // group by rowkey
      data =>
        data._1
    }.map(
      // global deduplication over the per-partition results, keeping the latest record
      dataSet =>
        dataSet._2.toList.maxBy(_._2)._3
    ).mapPartitions {
      data =>
        // parse each row into a JSON object
        val result: ArrayBuffer[JSONObject] = ArrayBuffer[JSONObject]()
        while (data.hasNext) {
          try {
            result.append(defParse(SparkUtil.row2Object(fieldNames, data.next())))
          } catch {
            case ex: Exception =>
              println(s"Bulkload | removeReduances | Exception: $ex")
          }
        }
        result.toIterator
    }
    spark.read.json(resultRdd.map(_.toJSONString))
  }

}

object Bulkload {
  implicit def convert2BulkLoad(spark: SparkSession) = new Bulkload(spark)
}
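
For completeness, here is a minimal sketch of how the class above might be driven from a Spark job. The Hive table, ZooKeeper quorum, rowkey column, and date below are illustrative placeholders, not values from the original migration:

import org.apache.spark.sql.{Row, SparkSession}
import com.xinyan.bigdata.common.utils.Bulkload

object BulkloadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bulkload-t_test_lizy").enableHiveSupport().getOrCreate()

    // read the source data from Hive (illustrative table name)
    val df = spark.sql("select * from ods.t_test_lizy")

    // rowkey rule: here simply the value of an existing "rowkey" column (illustrative)
    val rowkeyRule: Row => String = row => row.getAs[String]("rowkey")

    // generate HFiles under the default save path; they are loaded afterwards
    // with the hbase shell tool (LoadIncrementalHFiles)
    new Bulkload(spark).bulkload2HBase(
      hbaseHost = "zk-host-1,zk-host-2,zk-host-3", // illustrative ZooKeeper quorum
      hbaseTableName = "default:t_test_lizy",
      tableDf = df,
      fileSavePath = "ods.t_test_lizy",
      defRowkeyRule = rowkeyRule,
      load2HBase = false,
      updateDate = "2020-08-10",                   // illustrative date
      removeColumnOpt = Some("rowkey")
    )

    spark.stop()
  }
}

With load2HBase = false the HFiles are only written to HDFS; you then load them with the command shown below.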

For example, suppose the generated HFiles are under: hdfs://nameservice-1/bulkload/t_test_lizy/

Then run the bulkload:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://nameservice-1/bulkload/t_test_lizy/ t_test_lizy