我们可以有很多方式可以把数据导入到hbase当中,比如说用map-reduce,使用TableOutputFormat这个类,但是这种方式不是最优的方式。
Bulk的方式直接生成HFiles,写入到文件系统当中,这种方式的效率很高。
一般的步骤有两步
(1)使用ImportTsv或者import工具或者自己写程序用hive/pig生成HFiles
(2)用completebulkload把HFiles加载到hdfs上
ImportTsv能把用Tab分隔的数据很方便的导入到hbase当中,但还有很多数据不是用Tab分隔的 下面我们介绍如何使用hive来导入数据到hbase当中。
1.准备输入内容
a.创建一个tables.ddl文件
-- pagecounts data comes from http://dumps.wikimedia.org/other/pagecounts-raw/-- documented http://www.mediawiki.org/wiki/Analytics/Wikistats-- define an external table over raw pagecounts dataCREATE TABLE IF NOT EXISTS pagecounts (projectcode STRING, pagenameSTRING, pageviews STRING, bytes STRING)ROW FORMATDELIMITED FIELDS TERMINATED BY ' 'LINES TERMINATED BY '\n'STORED AS TEXTFILELOCATION '/tmp/wikistats';-- create a view, building a custom hbase rowkeyCREATE VIEW IF NOT EXISTS pgc (rowkey, pageviews, bytes) ASSELECT concat_ws('/',projectcode,concat_ws('/',pagename,regexp_extract(INPUT__FILE__NAME, 'pagecounts-(\\d{8}-\\d{6})\\..*$', 1))),pageviews, bytesFROM pagecounts;-- create a table to hold the input split partitionsCREATE EXTERNAL TABLE IF NOT EXISTS hbase_splits(partition STRING)ROW FORMATSERDE 'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'STORED ASINPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'LOCATION '/tmp/hbase_splits_out';-- create a location to store the resulting HFilesCREATE TABLE hbase_hfiles(rowkey STRING, pageviews STRING, bytes STRING)STORED ASINPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'TBLPROPERTIES('hfile.family.path' = '/tmp/hbase_hfiles/w');
b.创建HFils分隔文件,例子:sample.hql
-- prepate range partitioning of hfilesADD JAR /usr/lib/hive/lib/hive-contrib-0.11.0.1.3.0.0-104.jar;SET mapred.reduce.tasks=1;CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';-- input file contains ~4mm records. Sample it so as to produce 5 inputsplits.INSERT OVERWRITE TABLE hbase_splitsSELECT rowkey FROM(SELECT rowkey, row_seq() AS seq FROM pgcTABLESAMPLE(BUCKET 1 OUT OF 10000 ON rowkey) sORDER BY rowkeyLIMIT 400) xWHERE (seq % 100) = 0ORDER BY rowkeyLIMIT 4;-- after this is finished, combined the splits file:dfs -cp /tmp/hbase_splits_out/* /tmp/hbase_splits;
c.创建hfiles.hql
ADD JAR /usr/lib/hbase/hbase-0.94.6.1.3.0.0-104-security.jar;ADD JAR /usr/lib/hive/lib/hive-hbase-handler-0.11.0.1.3.0.0-104.jar;SET mapred.reduce.tasks=5;SET hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;SET total.order.partitioner.path=/tmp/hbase_splits;-- generate hfiles using the splits rangesINSERT OVERWRITE TABLE hbase_hfilesSELECT * FROM pgcCLUSTER BY rowkey;
2.导入数据
注意:/$Path_to_Input_Files_on_Hive_Client是hive客户端的数据存储目录
mkdir /$Path_to_Input_Files_on_Hive_Client/wikistatswget http://dumps.wikimedia.org/other/pagecounts-raw/2008/2008-10/pagecounts-20081001-000000.gz hadoop fs -mkdir /$Path_to_Input_Files_on_Hive_Client/wikistatshadoop fs -put pagecounts-20081001-000000.gz /$Path_to_Input_Files_on_Hive_Client/wikistats/
3.创建必要的表
注意:$HCATALOG_USER是HCatalog服务的用户(默认是hcat)
$HCATALOG_USER-f /$Path_to_Input_Files_on_Hive_Client/tables.ddl
执行之后,我们会看到如下的提示:
OKTime taken: 1.886 secondsOKTime taken: 0.654 secondsOKTime taken: 0.047 secondsOKTime taken: 0.115 seconds
4.确认表已经正确创建
执行以下语句
$HIVE_USER-e "select * from pagecounts limit 10;"
执行之后,我们会看到如下的提示:
...OKaa Main_Page 4 41431aa Special:ListUsers 1 5555aa Special:Listusers 1 1052
再执行
$HIVE_USER-e "select * from pgc limit 10;"
执行之后,我们会看到如下的提示:
...OKaa/Main_Page/20081001-000000 4 41431aa/Special:ListUsers/20081001-000000 1 5555aa/Special:Listusers/20081001-000000 1 1052...
5.生成HFiles分隔文件
$HIVE_USER-f /$Path_to_Input_Files_on_Hive_Client/sample.hqlhadoop fs -ls /$Path_to_Input_Files_on_Hive_Client/hbase_splits
为了确认,执行以下命令
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.3.0.0-104.jar -libjars /usr/lib/hive/lib/hive-exec-0.11.0.1.3.0.0-104.jar -input /tmp/hbase_splits -output /tmp/hbase_splits_txt -inputformatSequenceFileAsTextInputFormat
执行之后,我们会看到如下的提示:
...INFO streaming.StreamJob: Output: /tmp/hbase_splits_txt
再执行这一句
hadoop fs -cat /tmp/hbase_splits_txt/*
执行之后,我们会看到类似这样的结果
1 61 66 2e 71 2f 4d 61 69 6e 5f 50 61 67 65 2f 32 30 30 38 31 30 30 31 2d 3030 30 30 30 30 00 (null)01 61 66 2f 31 35 35 30 2f 32 30 30 38 31 30 30 31 2d 30 30 30 30 30 30 00 (null)01 61 66 2f 32 38 5f 4d 61 61 72 74 2f 32 30 30 38 31 30 30 31 2d 30 30 3030 30 30 00 (null)01 61 66 2f 42 65 65 6c 64 3a 31 30 30 5f 31 38 33 30 2e 4a 50 47 2f 32 3030 38 31 30 30 31 2d 30 30 30 30 30 30 00 (null)
7.生成HFiles
HADOOP_CLASSPATH=/usr/lib/hbase/hbase-0.94.6.1.3.0.0-104-security.jar hive -f /$Path_to_Input_Files_on_Hive_Client/hfiles.hql
以上内容是hdp的用户手册中推荐的方式,然后我顺便也从网上把最后的一步的命令格式给找出来了
hadoop jar hbase-VERSION.jar completebulkload /user/todd/myoutput mytable