33

如何使用MaxCompute Spark读写阿里云Hbase

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzI0NTE4NjA0OQ%3D%3D&%3Bmid=2658365239&%3Bidx=3&%3Bsn=ddd63f3e7dcb0986ad68ede91016b6fa
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Spark on MaxCompute可以访问位于阿里云VPC内的实例(例如ECS、HBase、RDS),默认MaxCompute底层网络和外网是隔离的,Spark on MaxCompute提供了一种方案通过配置spark.hadoop.odps.cupid.vpc.domain.list来访问阿里云的vpc网络环境的Hbase。Hbase标准版和增强版的配置不同,本文通过访问阿里云的标准版和增强版的Hbase简单的描述需要加的配置。

Hbase标准版

环境准备

Hbase的网络环境是存在vpc下的,所以我们首先要添加安全组开放端口2181、10600、16020.同时Hbase有白名单限制我们需要把对应的MaxCompute的IP加入到Hbase的白名单。

设置对应vpc的安全组

zQbiQbE.png!web

找到对应的vpc id然后添加安全组设置端口

eMbU7fr.png!web

添加Hbase的白名单

在hbase的白名单添加

100.104.0.0/16

创建Hbase表

create 'test','cf'

编写Spark程序

需要的Hbase依赖

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.5</version>
</dependency>

编写代码

object App {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("HbaseTest")
.config("spark.sql.catalogImplementation", "odps")
.config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
.getOrCreate()

val sc = spark.sparkContext
val config = HBaseConfiguration.create()
val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
val jobConf = new JobConf(config)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")


try{

import spark._
spark.sql("select '7', 88 ").rdd.map(row => {
val name= row(0).asInstanceOf[String]
val id = row(1).asInstanceOf[Integer]
val put = new Put(Bytes.toBytes(id))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
(new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
} finally {
sc.stop()
}
}
}

提交到DataWorks

由于大于50m通过odps客户端提交

add jar SparkHbase-1.0-SNAPSHOT -f;

进入数据开发新建spark节点

b6bEJjR.png!web

添加配置

需要配置spark.hadoop.odps.cupid.vpc.domain.list

这里的hbase域名需要hbase所有的机器,少一台可能会造成网络不通

{
"regionId":"cn-beijing",
"vpcs":[
{
"vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
"zones":[
{
"urls":[
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
}
]
}
]
}
]
}

7Fby6rE.png!web

 Hbase增强版

环境准备

Hbase增强版的端口是30020、10600、16020.同时Hbase有白名单限制我们需要把对应的MaxCompute的IP加入到Hbase的白名单。

设置对应vpc的安全组

找到对应的vpc id然后添加安全组设置端口

添加Hbase的白名单

100.104.0.0/16

创建Hbase表 

create 'test','cf'

编写Spark程序

需要的Hbase依赖,引用的包必须是阿里云增强版的依赖

<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.8</version>
</dependency>

编写代码

object McToHbase {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("spark_sql_ddl")
.config("spark.sql.catalogImplementation", "odps")
.config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
.getOrCreate()

val sc = spark.sparkContext


try{
spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
val config = HBaseConfiguration.create()
// 集群的连接地址(VPC内网地址)在控制台页面的数据库连接界面获得
config.set("hbase.zookeeper.quorum", ":30020");
import spark._
// xml_template.comment.hbaseue.username_password.default
config.set("hbase.client.username", "");
config.set("hbase.client.password", "");
val tableName = TableName.valueOf( "test")
val conn = ConnectionFactory.createConnection(config)
val table = conn.getTable(tableName);
val puts = new util.ArrayList[Put]()
iter.foreach(
row => {
val id = row(0).asInstanceOf[String]
val name = row(1).asInstanceOf[String]
val put = new Put(Bytes.toBytes(id))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
puts.add(put)
table.put(puts)
}
)
}
} finally {
sc.stop()
}



}
}

注意

hbase clinet会报org.apache.spark.SparkException: Task not serializable

原因是spark会把序列化对象以将其发送给其他的worker

解决方案

- 使类可序列化
- 仅在map中传递的lambda函数中声明实例。
- 将NotSerializable对象设置为静态对象,并在每台计算机上创建一次。
- 调用rdd.forEachPartition并在其中创建

Serializable对象,如下所示:

rdd.forEachPartition(iter-> {NotSerializable notSerializable = new NotSerializable();<br />// ...现在处理iter});

提交到DataWorks

由于大于50m通过odps客户端提交

add jar SparkHbase-1.0-SNAPSHOT -f;

进入数据开发新建spark节点

b6bEJjR.png!web

添加配置

需要配置spark.hadoop.odps.cupid.vpc.domain.list

注意

1.这个里需要添加增强版java api访问地址,这里必须采用ip的形式。 ip通过直接ping该地址获取,这里的ip是172.16.0.10添加端口16000

2.这里的hbase域名需要hbase所有的机器,少一台可能会造成网络不通

{
"regionId":"cn-beijing",
"vpcs":[
{
"vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
"zones":[
{
"urls":[
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
"port":16020
},
{"domain":"172.16.0.10","port":16000}
]
}
]
}
]
}

jAvuai2.png!web

M7nAzyj.png!web

 动动小手指 了解更多详情 !

R3Ufaay.gif


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK