44

Hive 如何快速拉取大批量数据

 3 years ago
source link: http://www.cnblogs.com/yougewe/p/13909575.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

用hive来做数仓类操作,或者大数据的运算,是没有疑问的,至少在你没有更多选择之前。

当我们要hive来做类似于大批量数据的select时,也许问题就会发生了变化。

1. 通用解决方案之分页

首先,我们要基于一个事实,就是没有哪个数据库可以无限制的提供我们select任意数据量的数据。比如常用的 mysql, oracle, 一般你select 10w左右的数据量时已经非常厉害了。而我们的解决方法也比较简单,那就是分页获取,比如我一页取1w条,直到取完为止。同样,因为hive基于都支持sql92协议,所以你也可以同样的方案去解决大数据量的问题。

分页的解决方案会有什么问题?首先,我们要明白分页是如何完成的,首先数据库server会根据条件运算出所有或部分符合条件的数据(取决是否有额外的排序),然后再根据分页偏移信息,获取相应的数据。所以,一次次的分页,则必定涉及到一次次的数据运算。这在小数据量的情况下是可以接受的,因为计算机的高速运转能力。但是当数据量大到一定程序时,就不行了。比如我们停滞了许多年的大数据领域解决方案就是很好的证明。

本文基于hive处理数据,也就是说数据量自然也是大到了一定的级别,那么用分页也许就不好解决问题了。比如,单次地运算也许就是3-5分钟(基于分布式并行计算系统能力),当你要select 100w数据时,如果用一页1w的运算,那么就是100次来回,1次3-5分钟,100次就是5-8小时的时间,这就完全jj了。谁能等这么长时间?这样处理的最终结果就是,业务被砍掉,等着财务结账了。

所以,我们得改变点什么!

2. 使用hive-jdbc

jdbc本身不算啥,只是一个连接协议。但它的好处在于,可以维持长连接。这个连接有个好处,就是server可以随时输出数据,而client端则可以随时处理数据。这就给了我们一个机会,即比如100w的数据运算好之后,server只需源源不断的输出结果,而client端则源源不断地接收处理数据。

所以,我们解决方案是,基于hive-jdbc, 不使用分页,而全量获取数据即可。这给我们带来莫大的好处,即一次运算即可。比如1次运算3-5分钟,那么总共的运算也就是3-5分钟。

看起来不错,解决了重复运算的问题。好似万事大吉了。

具体实现就是引入几个hive-jdbc的依赖,然后提交查询,依次获取结果即可。样例如下:

<!-- pom 依赖 -->
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.3.4</version>
</dependency>

--

// 测试hive-jdbc
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import java.sql.DriverManager;
 
public class HiveJdbcTest {
    private static Connection conn = getConnnection();
    private static PreparedStatement ps;
    private static ResultSet rs;
    // 获取所有数据
    public static void getAll(String tablename) {
        String sql="select * from " + tablename;
        System.out.println(sql);
        try {
            ps = prepare(conn, sql);
            rs = ps.executeQuery();
            int columns = rs.getMetaData().getColumnCount();
            while(rs.next()) {
                for(int i=1;i<=columns;i++) {
                    System.out.print(rs.getString(i));  
                    System.out.print("\t\t");
                }
                System.out.println();
            }
        } 
        catch (SQLException e) {
            e.printStackTrace();
        }
 
    }
    // 测试
    public static void main(String[] args) { 
        String tablename="t1";
        HiveJdbcTest.getAll(tablename);
    }
 
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://127.0.0.1:10000/";
    private static Connection conn;
    // 连接hive库
    public static Connection getConnnection() {
        try {
            Class.forName(driverName);
            conn = DriverManager.getConnection(url, "hive", "123");
        }
        catch(ClassNotFoundException e) {
            e.printStackTrace();
        }
        catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }
    public static PreparedStatement prepare(Connection conn, String sql) {
        PreparedStatement ps = null;
        try {
            ps = conn.prepareStatement(sql);
        } 
        catch (SQLException e) {
            e.printStackTrace();
        }
        return ps;
    }
}

样例代码,无需纠结。简单的jdbc操作样板。总体来说就是,不带分页的接收全量数据。

但是,这个会有什么问题?同样,小数据量时无任何疑问,但当数据量足够大时,每一次的数据接收,都需要一次网络通信请示,且都是单线程的。我们假设接受一条数据花费1ms, 那么接收1000条数就是1s, 6k条数据就是1min。36w条数据就是1h, 额,后面就无需再算了。同样是不可估量的时间消耗。(实际情况也许会好点,因为会有buffer缓冲的存在)

为什么会这样呢?运算量已经减小了,但是这网络通信量,我们又能如何?实际上,问题不在于网络通信问题,而在于我们使用这种方式,使我们从并行计算转到了串行计算的过程了。因为只有单点的数据接收,所以只能将数据汇集处理。从而就是一个串行化的东西了。

所以,我们更多应该从并行这一层面去解决问题。

3. 基于临时表实现

要解决并行变串行的问题,最根本的办法就是避免一条条读取数据。而要避免这个问题,一个很好想到的办法就是使用临时表,绕开自己代码的限制。让大数据集群自行处理并行计算问题,这是个不错的想法。

但具体如何做呢?我们面临至少这么几个问题:

1. 如何将数据写入临时表?

2. 写入临时表的数据如何取回?是否存在瓶颈问题?

3. 临时表后续如何处理?

我们一个个问题来,第1个,如何写临时表问题:我们可以选择先创建一个临时表,然后再使用insert into select ... from ... 的方式写入,但这种方式非常费力,首先你得固化下临时表的数据结构,其次你要处理多次写入问题。看起来不是最好的办法。幸好,hive中或者相关数据库产品都提供了另一种更方便的建临时表的方法: create table xxx as select ... from ...  你只需要使用一个语句就可以将结果写入到临时表了。但需要注意的是,我们创建时,需要指定好我们需要的格式,否则最终结果也许不是我们想要的,比如我们需要使用','分隔数据而非tab, 我们需要使用 text 形式的数据,而非压缩的二进制格式。

以下是个使用样例:

-- 外部使用 create table 包裹
CREATE TABLE tmp_2020110145409001 
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE as 
        -- 具体的业务select sql
        select t1.*, t2.* from test t1 left join test2 t2 on t1.id = t2.t_id
    ;

如此,我们就得到所需的结果了。以上结果,在hive中表现为一个临时表。而其背后则是一个个切分的文件,以','号分隔的文本文件,且会按照hive的默认存储目录存放。(更多具体语法请查询官网资料)

接下来,我们要解决第2个问题:如何将数据取回?这个问题也不难,首先,现在结果已经有了,我们可以一行行地读取返回,就像前面一样。但这时已经没有了数据运算,应该会好很多。但明显还是不够好,我们仍然需要反复的网络通信。我们知道,hive存储的背后,是一个个切分的文件,如果我们能够将该文件直接下载下来,那将会是非常棒的事。不错,最好的办法就是,直接下载hive的数据文件,hive会存储目录下,以类似于 part_0000, part_0001... 之类的文件存放。

那么,我们如何才能下载到这些文件呢?hive是基于hadoop的,所以,很明显我们要回到这个问题,基于hadoop去获取这些文件。即 hdfs 获取,命令如下:

// 查看所有分片数据文件列表
hdfs dfs -ls hdfs://xx/hive/mydb.db/*
// 下载所有数据文件到 /tmp/local_hdfs 目录
hdfs dfs -get hdfs://xx/hive/mydb.db/* /tmp/local_hdfs

我们可以通过以上命令,将数据文件下载到本地,也可以hdfs的jar包,使用 hdfs-client 进行下载。优缺点是:使用cli的方式简单稳定但依赖于服务器环境,而使用jar包的方式则部署方便但需要自己写更多代码保证稳定性。各自选择即可。

最后,我们还剩下1个问题:如何处理临时表的问题?hive目前尚不支持设置表的生命周期(阿里云的maxcompute则只是一个 lifecycle 选项的问题),所以,需要自行清理文件。这个问题的实现方式很多,比如你可以自行记录这些临时表的创建时间、位置、过期时间,然后再每天运行脚本清理表即可。再简单点就是你可以直接通过表名进行清理,比如你以年月日作为命令开头,那么你可以根据这日期删除临时表即可。如:

-- 列举表名
show tables like 'dbname.tmp_20201101*';
-- 删除具体表名
drop table dbname.tmp_2020110100001 ; 

至此,我们的所有问题已解决。总结下:首先使用临时表并行地将结果写入;其次通过hdfs将文件快速下载到本地即可;最后需要定时清理临时表;这样,你就可以高效,无限制的为用户拉取大批量数据了。

不过需要注意的是,我们的步骤从1个步骤变成了3个步骤,增加了复杂度。(实际上你可能还会处理更多的问题,比如元数据信息的对应问题)复杂度增加的最大问题就在于,它会带来更多的问题,所以我们一定要善于处理好这些问题,否则将会带来一副作用。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK