34

HBase BulkLoad批量写入数据实战

 5 years ago
source link: https://mp.weixin.qq.com/s/0Eej-xzBVq3_Vw1y4tA4Dw
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

HBase BulkLoad批量写入数据实战

Original 哥不是小萝莉 哥不是小萝莉 2018-08-19

在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等)。今天这篇博客笔者将为大家分享使用HBase BulkLoad的方式来进行海量数据批量写入到HBase集群。

在使用BulkLoad之前,我们先来了解一下HBase的存储机制。HBase存储数据其底层使用的是HDFS来作为存储介质,HBase的每一张表对应的HDFS目录上的一个文件夹,文件夹名以HBase表进行命名(如果没有使用命名空间,则默认在default目录下),在表文件夹下存放在若干个Region命名的文件夹,Region文件夹中的每个列簇也是用文件夹进行存储的,每个列簇中存储就是实际的数据,以HFile的形式存在。路径格式如下:

1/hbase/data/default/<tbl_name>/<region_id>/<cf>/<hfile_id>

2.1 实现原理

按照HBase存储数据按照HFile格式存储在HDFS的原理,使用MapReduce直接生成HFile格式的数据文件,然后在通过RegionServer将HFile数据文件移动到相应的Region上去。流程如下图所示:

640?wx_fmt=png

2.2. 生成HFile文件

HFile文件的生成,可以使用MapReduce来进行实现,将数据源准备好,上传到HDFS进行存储,然后在程序中读取HDFS上的数据源,进行自定义封装,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。实现代码如下:

 1/**
2 * Read DataSource from hdfs & Gemerator hfile.
3 * 
4 * @author smartloli.
5 *
6 *         Created by Aug 19, 2018
7 */
8public class GemeratorHFile2 {
9    static class HFileImportMapper2 extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
10
11        protected final String CF_KQ = "cf";
12
13        @Override
14        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
15            String line = value.toString();
16            System.out.println("line : " + line);
17            String[] datas = line.split(" ");
18            String row = new Date().getTime() + "_" + datas[1];
19            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(row));
20            KeyValue kv = new KeyValue(Bytes.toBytes(row), this.CF_KQ.getBytes(), datas[1].getBytes(), datas[2].getBytes());
21            context.write(rowkey, kv);
22        }
23    }
24
25    public static void main(String[] args) {
26        if (args.length != 1) {
27            System.out.println("<Usage>Please input hbase-site.xml path.</Usage>");
28            return;
29        }
30        Configuration conf = new Configuration();
31        conf.addResource(new Path(args[0]));
32        conf.set("hbase.fs.tmp.dir", "partitions_" + UUID.randomUUID());
33        String tableName = "person";
34        String input = "hdfs://nna:9000/tmp/person.txt";
35        String output = "hdfs://nna:9000/tmp/pres";
36        System.out.println("table : " + tableName);
37        HTable table;
38        try {
39            try {
40                FileSystem fs = FileSystem.get(URI.create(output), conf);
41                fs.delete(new Path(output), true);
42                fs.close();
43            } catch (IOException e1) {
44                e1.printStackTrace();
45            }
46
47            Connection conn = ConnectionFactory.createConnection(conf);
48            table = (HTable) conn.getTable(TableName.valueOf(tableName));
49            Job job = Job.getInstance(conf);
50            job.setJobName("Generate HFile");
51
52            job.setJarByClass(GemeratorHFile2.class);
53            job.setInputFormatClass(TextInputFormat.class);
54            job.setMapperClass(HFileImportMapper2.class);
55            FileInputFormat.setInputPaths(job, input);
56            FileOutputFormat.setOutputPath(job, new Path(output));
57
58            HFileOutputFormat2.configureIncrementalLoad(job, table);
59            try {
60                job.waitForCompletion(true);
61            } catch (InterruptedException e) {
62                e.printStackTrace();
63            } catch (ClassNotFoundException e) {
64                e.printStackTrace();
65            }
66        } catch (Exception e) {
67            e.printStackTrace();
68        }
69
70    }
71}

在HDFS目录/tmp/person.txt中,准备数据源如下:

11 smartloli 100
22 smartloli2 101
33 smartloli3 102

然后,将上述代码编译打包成jar,上传到Hadoop集群进行执行,执行命令如下:

1hadoop jar GemeratorHFile2.jar /data/soft/new/apps/hbaseapp/hbase-site.xml

如果在执行命令的过程中,出现找不到类的异常信息,可能是本地没有加载HBase依赖JAR包,在当前用户中配置如下环境变量信息:

1export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:classpath

然后,执行source命令使配置的内容立即生生效。

2.3. 执行预览

在成功提交任务后,Linux控制台会打印执行任务进度,也可以到YARN的资源监控界面查看执行进度,结果如下所示:

640?wx_fmt=png

等待任务的执行,执行完成后,在对应HDFS路径上会生成相应的HFile数据文件,如下图所示:

640?wx_fmt=png

2.4 使用BulkLoad导入到HBase

然后,在使用BulkLoad的方式将生成的HFile文件导入到HBase集群中,这里有2种方式。一种是写代码实现导入,另一种是使用HBase命令进行导入。

2.4.1 代码实现导入

通过LoadIncrementalHFiles类来实现导入,具体代码如下:

 1/**
2* Use BulkLoad inport hfile from hdfs to hbase.
3* 
4* @author smartloli.
5*
6* Created by Aug 19, 2018
7*/
8public class BulkLoad2HBase {
9
10    public static void main(String[] args) throws Exception {
11        if (args.length != 1) {
12            System.out.println("<Usage>Please input hbase-site.xml path.</Usage>");
13            return;
14        }
15        String output = "hdfs://cluster1/tmp/pres";
16        Configuration conf = new Configuration();
17        conf.addResource(new Path(args[0]));
18        HTable table = new HTable(conf, "person");
19        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
20        loader.doBulkLoad(new Path(output), table);
21    }
22
23}

执行上述代码,运行结果如下:

640?wx_fmt=png

2.4.2 使用HBase命令进行导入

先将生成好的HFile文件迁移到目标集群(即HBase集群所在的HDFS上),然后在使用HBase命令进行导入,执行命令如下:

1# 先使用distcp迁移hfile
2hadoop distcp -Dmapreduce.job.queuename=queue_1024_01 -update -skipcrccheck -m 10 /tmp/pres hdfs://nns:9000/tmp/pres
3
4# 使用bulkload方式导入数据
5hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/pres person

最后,我们可以到指定的RegionServer节点上查看导入的日志信息,如下所示为导入成功的日志信息:

12018-08-19 16:30:34,969 INFO  [B.defaultRpcServer.handler=7,queue=1,port=16020] regionserver.HStore: Successfully loaded store file hdfs://cluster1/tmp/pres/cf/7b455535f660444695589edf509935e9 into store cf (new location: hdfs://cluster1/hbase/data/default/person/2d7483d4abd6d20acdf16533a3fdf18f/cf/d72c8846327d42e2a00780ac2facf95b_SeqId_4_)

2.5 验证

使用BulkLoad方式导入数据后,可以进入到HBase集群,使用HBase Shell来查看数据是否导入成功,预览结果如下:

640?wx_fmt=png

本篇博客为了演示实战效果,将生成HFile文件和使用BulkLoad方式导入HFile到HBase集群的步骤进行了分解,实际情况中,可以将这两个步骤合并为一个,实现自动化生成与HFile自动导入。如果在执行的过程中出现RpcRetryingCaller的异常,可以到对应RegionServer节点查看日志信息,这里面记录了出现这种异常的详细原因。

4.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

640?wx_fmt=png

另外,博主出书了《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以去购买博主的书进行学习,在此感谢大家的支持。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK