58

一文详解大规模数据计算处理原理及操作重点

 5 years ago
source link: http://bigdata.51cto.com/art/201808/580738.htm?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一、RAID技术

大数据技术主要针对的是大规模数据的计算处理问题,那么要想解决的这一问题,首先要解决的就是大规模数据的存储问题。大规模数据存储要解决的核心问题有三个方面:

  • 数据存储容量的问题,既然大数据要解决的是数以PB计的数据计算问题,而一般的服务器磁盘容量通常1-2TB,那么如何存储这么大规模的数据?
  • 数据读写速度的问题,一般磁盘的连续读写速度为几十MB,以这样的速度,几十PB的数据恐怕要读写到天荒地老……
  • 数据可靠性的问题,磁盘大约是计算机设备中最易损坏的硬件了,在网站一块磁盘使用寿命大概是一年,如果磁盘损坏了,数据怎么办?

MFZrIrV.jpg!web

在大数据技术出现之前,人们就需要面对这些关于存储的问题,对应的解决方案就是RAID技术。

RAID(独立磁盘冗余阵列)技术主要是为了改善磁盘的存储容量、读写速度,增强磁盘的可用性和容错能力。目前服务器级别的计算机都支持插入多块磁盘(8块或者更多),通过使用RAID技术,实现数据在多块磁盘上的并发读写和数据备份。

常用RAID技术有以下几种,如图所示:

nuIRRj7.jpg!web

常用RAID技术原理图

假设服务器有N块磁盘:

RAID0

数据在从内存缓冲区写入磁盘时,根据磁盘数量将数据分成N份,这些数据同时并发写入N块磁盘,使得数据整体写入速度是一块磁盘的N倍,读取的时候也一样,因此RAID0具有极快的数据读写速度。但是RAID0不做数据备份,N块磁盘中只要有一块损坏,数据完整性就被破坏,所有磁盘的数据都会损坏。

RAID1

数据在写入磁盘时,将一份数据同时写入两块磁盘,这样任何一块磁盘损坏都不会导致数据丢失,插入一块新磁盘就可以通过复制数据的方式自动修复,具有极高的可靠性。

RAID10

结合RAID0和RAID1两种方案,将所有磁盘平均分成两份,数据同时在两份磁盘写入,相当于RAID1,但是在每一份磁盘里面的N/2块磁盘上,利用RAID0技术并发读写,既提高可靠性又改善性能,不过RAID10的磁盘利用率较低,有一半的磁盘用来写备份数据。

RAID3

一般情况下,一台服务器上不会出现同时损坏两块磁盘的情况,在只损坏一块磁盘的情况下,如果能利用其它磁盘的数据恢复损坏磁盘的数据,就能在保证可靠性和性能的同时,大幅提升磁盘利用率。

在数据写入磁盘的时候,将数据分成N-1份,并发写入N-1块磁盘,并在第N块磁盘记录校验数据,任何一块磁盘损坏(包括校验数据磁盘),都可以利用其它N-1块磁盘的数据修复。

但是在数据修改较多的场景中,任何磁盘修改数据都会导致第N块磁盘重写校验数据,频繁写入的后果是第N块磁盘比其它磁盘容易损坏,需要频繁更换,所以RAID3很少在实践中使用。

RAID5

相比RAID3,更多被使用的方案是RAID5。

RAID5和RAID3很相似,但是校验数据不是写入第N块磁盘,而是螺旋式地写入所有磁盘中。这样校验数据的修改也被平均到所有磁盘上,避免RAID3频繁写坏一块磁盘的情况。

RAID6

如果数据需要很高的可靠性,在出现同时损坏两块磁盘的情况下(或者运维管理水平比较落后,坏了一块磁盘但是迟迟没有更换,导致又坏了一块磁盘),仍然需要修复数据,这时候可以使用RAID6。

RAID6和RAID5类似,但是数据只写入N-2块磁盘,并螺旋式地在两块磁盘中写入校验信息(使用不同算法生成)。

在相同磁盘数目(N)的情况下,各种RAID技术的比较如下表所示:

RjEj2qi.jpg!web

几种RAID技术比较

RAID技术有硬件实现,比如专用的RAID卡或者主板直接支持,也可以通过软件实现,在操作系统层面将多块磁盘组成RAID,在逻辑视作一个访问目录。RAID技术在传统关系数据库及文件系统中应用比较广泛,是改善计算机存储特性的重要手段。

RAID技术只是在单台服务器的多块磁盘上组成阵列,大数据需要更大规模的存储空间和访问速度。将RAID技术原理应用到分布式服务器集群上,就形成了Hadoop分布式文件系统HDFS的架构思想。

二、HDFS架构思想

1、HDFS架构原理

和RAID在多个磁盘上进行文件存储及并行读写一样思路,HDFS在一个大规模分布式服务器集群上,对数据进行并行读写及冗余存储。因为HDFS可以部署在一个比较大的服务器集群上,集群中所有服务器的磁盘都可以供HDFS使用,所以整个HDFS的存储空间可以达到PB级容量。HDFS架构如图:

3Eb2im6.jpg!web

HDFS架构

HDFS中关键组件有两个,一个是NameNode,一个是DataNode。

DataNode负责文件数据的存储和读写操作,HDFS将文件数据分割成若干块(block),每个DataNode存储一部分block,这样文件就分布存储在整个HDFS服务器集群中。应用程序客户端(Client)可以并行对这些数据块进行访问,从而使得HDFS可以在服务器集群规模上实现数据并行访问,极大地提高访问速度。实践中HDFS集群的DataNode服务器会有很多台,一般在几百台到几千台这样的规模,每台服务器配有数块磁盘,整个集群的存储容量大概在几PB到数百PB。

NameNode负责整个分布式文件系统的元数据(MetaData)管理,也就是文件路径名,数据block的ID以及存储位置等信息,承担着操作系统中文件分配表(FAT)的角色。HDFS为了保证数据的高可用,会将一个block复制为多份(缺省情况为3份),并将三份相同的block存储在不同的服务器上。这样当有磁盘损坏或者某个DataNode服务器宕机导致其存储的block不能访问的时候,Client会查找其备份的block进行访问。

block多份复制存储如下图所示:

n2aqYvQ.jpg!web

HDFS的block复制备份策略

对于文件/users/sameerp/data/part-0,其复制备份数设置为2,存储的block ID为1,3,block1的两个备份存储在DataNode0和DataNode2两个服务器上,block3的两个备份存储DataNode4和DataNode6两个服务器上,上述任何一台服务器宕机后,每个block都至少还有一个备份存在,不会影响对文件/users/sameerp/data/part-0的访问。

事实上,DataNode会通过心跳和NameNode保持通信,如果DataNode超时未发送心跳,NameNode就会认为这个DataNode已经失效,立即查找这个DataNode上存储的block有哪些,以及这些block还存储在哪些服务器上,随后通知这些服务器再复制一份block到其它服务器上,保证HDFS存储的block备份数符合用户设置的数目,即使再有服务器宕机,也不会丢失数据。

2、HDFS应用

Hadoop分布式文件系统可以像一般的文件系统那样进行访问:使用命令行或者编程语言API进行文件读写操作。我们以HDFS写文件为例看HDFS处理过程,如下图:

yARbqeb.jpg!web

HDFS写文件操作
  • 应用程序Client调用HDFS API,请求创建文件,HDFS API包含在Client进程中;
  • HDFS API将请求参数发送给NameNode服务器,NameNode在meta信息中创建文件路径,并查找DataNode中空闲的block,然后将空闲block的id、对应的DataNode服务器信息返回给Client。因为数据块需要多个备份,所以即使Client只需要一个block的数据量,NameNode也会返回多个NameNode信息;
  • Client调用HDFS API,请求将数据流写出;
  • HDFS API连接第一个DataNode服务器,将Client数据流发送给DataNode,该DataNode一边将数据写入本地磁盘,一边发送给第二个DataNode,同理第二个DataNode记录数据并发送给第三个DataNode;
  • Client通知NameNode文件写入完成,NameNode将文件标记为正常,可以进行读操作了。

HDFS虽然提供了API,但是在实践中,我们很少自己编程直接去读取HDFS中的数据,原因正如开篇提到,在大数据场景下,移动计算比移动数据更划算。

与其写程序去读取分布在这么多DataNode上的数据,不如将程序分发到DataNode上去访问其上的block数据。但是如何对程序进行分发?分发出去的程序又如何访问HDFS上的数据?计算的结果如何处理,如果结果需要合并,该如何合并?

Hadoop提供了对存储在HDFS上的大规模数据进行并行计算的框架,就是MapReduce。

三、MapReduce

Hadoop解决大规模数据分布式计算的方案是MapReduce。MapReduce既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于MapReduce编程模型进行编程开发,然后将程序通过MapReduce计算框架分发到Hadoop集群中运行。我们先看一下作为编程模型的MapReduce。

1、MapReduce编程模型

MapReduce是一种非常简单又非常强大的编程模型。

简单在于其编程模型只包含map和reduce两个过程,map的主要输入是一对值,经过map计算后输出一对值;然后将相同key合并,形成;再将这个输入reduce,经过计算输出零个或多个对。

但是MapReduce同时又是非常强大的,不管是关系代数运算(SQL计算),还是矩阵运算(图计算),大数据领域几乎所有的计算需求都可以通过MapReduce编程来实现。

我们以WordCount程序为例。WordCount主要解决文本处理中的词频统计问题,就是统计文本中每一个单词出现的次数。如果只是统计一篇文章的词频,几十K到几M的数据,那么写一个程序,将数据读入内存,建一个Hash表记录每个词出现的次数就可以了,如下图:

IFviMfn.jpg!web

小数据量的词频统计

但是如果想统计全世界互联网所有网页(数万亿计)的词频数(这正是google这样的搜索引擎典型需求),你不可能写一个程序把全世界的网页都读入内存,这时候就需要用MapReduce编程来解决。

public class WordCount { 
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{ 
    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 
    public void map(Object key, Text value, Context context 
                    ) throws IOException, InterruptedException { 
      StringTokenizer itr = new StringTokenizer(value.toString()); 
      while (itr.hasMoreTokens()) { 
        word.set(itr.nextToken()); 
        context.write(word, one); 
      } 
    } 
  } 
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> { 
    private IntWritable result = new IntWritable(); 
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context 
                       ) throws IOException, InterruptedException { 
      int sum = 0; 
      for (IntWritable val : values) { 
        sum += val.get(); 
      } 
      result.set(sum); 
      context.write(key, result); 
    } 
  }} 

其核心是一个map函数,一个reduce函数。

map函数的输入主要是一个对,在这个例子里,value是要统计的所有文本中的一行数据,key在这里不重要,我们忽略。

public void map(Object key, Text value, Context context 
                    ) 

map函数的计算过程就是,将这行文本中的单词提取出来,针对每个单词输出一个这样的对。

MapReduce计算框架会将这些收集起来,将相同的word放在一起,形成>这样的数据,然后将其输入给reduce函数。

public void reduce(Text key, Iterable<IntWritable> values, 
                      Context context 
                      ) 

这里的reduce的输入参数values就是由很多个1组成的集合,而key就是具体的单词word。

reduce函数的计算过程就是,将这个集合里的1求和,再将单词(word)和这个和(sum)组成一个()输出。每一个输出就是一个单词和它的词频统计总和。

假设有两个block的文本数据需要进行词频统计,MapReduce计算过程如下图:

qmIb2mb.jpg!web

MapReduce计算过程

一个map函数可以针对一部分数据进行运算,这样就可以将一个大数据切分成很多块(这也正是HDFS所做的),MapReduce计算框架为每个块分配一个map函数去计算,从而实现大数据的分布式计算。

2、MapReduce计算框架架构原理

前面提到MapReduce编程模型将大数据计算过程切分为map和reduce两个阶段,在map阶段为每个数据块分配一个map计算任务,然后将所有map输出的key进行合并,相同的key及其对应的value发送给同一个reduce任务去处理。

这个过程有两个关键问题需要处理:

  1. 如何为每个数据块分配一个map计算任务,代码是如何发送数据块所在服务器的,发送过去是如何启动的,启动以后又如何知道自己需要计算的数据在文件什么位置(数据块id是什么)?
  2. 处于不同服务器的map输出的 ,如何把相同的key聚合在一起发送给reduce任务?

这两个关键问题正好对应前面文章中“MapReduce计算过程”一图中两处“MapReduce框架处理”:

IzqeA3j.jpg!web

MapReduce计算过程中两处MapReduce框架处理

我们先看下MapReduce是如何启动处理一个大数据计算应用作业的:

MapReduce作业启动和运行机制

我们以Hadoop1为例,MapReduce运行过程涉及以下几类关键进程:

  • 大数据应用进程: 启动用户MapReduce程序的主入口,主要指定Map和Reduce类、输入输出文件路径等,并提交作业给Hadoop集群。
  • JobTracker进程: 根据要处理的输入数据量启动相应数量的map和reduce进程任务,并管理整个作业生命周期的任务调度和监控。JobTracker进程在整个Hadoop集群全局唯一。
  • TaskTracker进程: 负责启动和管理map进程以及reduce进程。因为需要每个数据块都有对应的map函数,TaskTracker进程通常和HDFS的DataNode进程启动在同一个服务器,也就是说,Hadoop集群中绝大多数服务器同时运行DataNode进程和TaskTacker进程。

如下图所示:

z673e2a.jpg!web

MapReduce作业启动和运行机制

具体作业启动和计算过程如下:

  • 应用进程将用户作业jar包存储在HDFS中,将来这些jar包会分发给Hadoop集群中的服务器执行MapReduce计算;
  • 应用程序提交job作业给JobTracker;
  • JobTacker根据作业调度策略创建JobInProcess树,每个作业都会有一个自己的JobInProcess树;
  • JobInProcess根据输入数据分片数目(通常情况就是数据块的数目)和设置的reduce数目创建相应数量的TaskInProcess;
  • TaskTracker进程和JobTracker进程进行定时通信;
  • 如果TaskTracker有空闲的计算资源(空闲CPU核),JobTracker就会给它分配任务。分配任务的时候会根据TaskTracker的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据,以实现我们一开始就提到的“移动计算比移动数据更划算”;
  • TaskRunner收到任务后根据任务类型(map还是reduce),任务参数(作业jar包路径,输入数据文件路径,要处理的数据在文件中的起始位置和偏移量,数据块多个备份的DataNode主机名等)启动相应的map或者reduce进程;
  • map或者reduce程序启动后,检查本地是否有要执行任务的jar包文件,如果没有,就去HDFS上下载,然后加载map或者reduce代码开始执行;
  • 如果是map进程,从HDFS读取数据(通常要读取的数据块正好存储在本机);如果是reduce进程,将结果数据写出到HDFS。

通过以上过程,MapReduce可以将大数据作业计算任务分布在整个Hadoop集群中运行,每个map计算任务要处理的数据通常都能从本地磁盘上读取到,而用户要做的仅仅是编写一个map函数和一个reduce函数就可以了,根本不用关心这两个函数是如何被分布启动到集群上的,数据块又是如何分配给计算任务的。这一切都由MapReduce计算框架完成。

MapReduce数据合并与连接机制

在WordCount例子中,要统计相同单词在所有输入数据中出现的次数,而一个map只能处理一部分数据,一个热门单词几乎会出现在所有的map中,这些单词必须要合并到一起进行统计才能得到正确的结果。

事实上,几乎所有的大数据计算场景都需要处理数据关联的问题,简单如WordCount只要对key进行合并就可以了,复杂如数据库的join操作,需要对两种类型(或者更多类型)的数据根据key进行连接。

MapReduce计算框架处理数据合并与连接的操作就在map输出与reduce输入之间,这个过程有个专门的词汇来描述,叫做shuffle。

nMbyEnM.jpg!web

MapReduce shuffle过程

每个map任务的计算结果都会写入到本地文件系统,等map任务快要计算完成的时候,MapReduce计算框架会启动shuffle过程,在map端调用一个Partitioner接口,对map产生的每个进行reduce分区选择,然后通过http通信发送给对应的reduce进程。这样不管map位于哪个服务器节点,相同的key一定会被发送给相同的reduce进程。reduce端对收到的进行排序和合并,相同的key放在一起,组成一个传递给reduce执行。

MapReduce框架缺省的Partitioner用key的哈希值对reduce任务数量取模,相同的key一定会落在相同的reduce任务id上,实现上,这样的Partitioner代码只需要一行,如下所示:

/** Use {@link Object#hashCode()} to partition. */ 
public int getPartition(K2 key, V2 value, int numReduceTasks) { 
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
 } 

shuffle是大数据计算过程中发生奇迹的地方,不管是MapReduce还是Spark,只要是大数据批处理计算,一定会有shuffle过程,让数据关联起来,数据的内在关系和价值才会呈现出来。不理解shuffle,就会在map和reduce编程中产生困惑,不知道该如何正确设计map的输出和reduce的输入。shuffle也是整个MapReduce过程中最难最消耗性能的地方,在MapReduce早期代码中,一半代码都是关于shuffle处理的。

3、工具——Hive

既然MapReduce计算模型可以解决绝大多数的数据分析与数据挖掘任务,那么对于如下我们常见的一条SQL分析语句,MapReduce如何编程实现?

SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age; 

这是一条非常常见的SQL统计分析语句,统计不同年龄的用户访问不同网页的兴趣偏好,对于产品运营和设计很有价值。具体数据输入和执行结果如下图示例:

aYZ3QvR.jpg!web

group by输入输出示例

左边是要分析的数据表,右边是分析结果。实际上把左边表相同的行累计求和,就得到右边的表了,看起来跟WordCount的计算很一样。确实也是这样,我们看下这条SQL语句的MapReduce的计算过程,map和reduce函数的输入输出以及函数处理过程分别是什么样。

首先,看下map函数的输入key和value,key不重要,忽略掉,value就是左边表中每一行的数据,<1, 25>这样。map函数的输出就是以输入的value作为key,value统一设为1,<<1, 25>, 1>这样。

map函数的输出经过shuffle以后,相同的key及其对应的value被放在一起组成一个,作为输入交给reduce函数处理。如<<2, 25>, 1>被map函数输出两次,那么到了reduce这里,就变成输入<<2, 25>, <1, 1>>,key是<2, 25>, value集合是<1, 1>。在reduce函数内部,value集合里所有的数字被相加,然后输出。reduce的输出就是<<2, 25>, 2>。

计算过程如下图示例:

vAVNzyE.jpg!web

group by的MapReduce计算过程示例

这样一条很有实用价值的SQL就这样被很简单的MapReduce计算过程处理好了。在数据仓库中,SQL是最常用的分析工具,那么有没有能够自动将SQL生成MapReduce代码的工具呢?这个工具就是Hadoop大数据仓库Hive。

自动将SQL生成MapReduce代码的工具——Hive

Hive能够直接处理用户输入的SQL语句(Hive的SQL语法和数据库标准SQL略有不同),调用MapReduce计算框架完成数据分析操作。具体架构如下图:

MRZbQzr.jpg!web

Hive架构

用户通过Hive的Client(Hive的命令行工具,JDBC等)向Hive提交SQL命令。如果是创建数据表的DDL语句,Hive就会通过执行引擎Driver将数据表的信息记录在Metastore组件中,这个组件通常用一个关系数据库实现,记录表名、字段名、字段类型、关联HDFS文件路径等这些数据库的meta信息(元信息)。

如果用户提交的是查询分析数据的DQL语句,Driver就会将该语句提交给自己的编译器Compiler进行语法分析、语法解析、语法优化等一系列操作,最后生成一个MapReduce执行计划。然后根据该执行计划生成一个MapReduce的作业,提交给Hadoop MapReduce计算框架处理。

对于一个较简单的SQL命令,比如:

SELECT * FROM status_updates WHERE status LIKE ‘michael jackson’; 

其对应的Hive执行计划如下图:

YnqEbyv.jpg!web

Hive执行计划示例

Hive内部预置了很多函数,Hive的执行计划就是根据SQL语句生成这些函数的DAG(有向无环图),然后封装进MapReduce的map和reduce函数中。这个例子中,map函数调用了三个Hive内置函数TableScanOpoerator、FilterOperator、FileOutputOperator,就完成了map计算,而且无需reduce函数。

除了上面这些简单的聚合(group by)、过滤(where)操作,Hive还能执行连接(join on)操作。上面例子中,pv_users表的数据在实际中是无法直接得到的,因为pageid数据来自用户访问日志,每个用户进行一次页面浏览,就会生成一条访问记录,保存在page_view表中。而年龄age信息则记录在用户表user中。如下图:

RFraimr.jpg!web

page_view表和user表示例

这两张表都有一个相同的字段userid,根据这个字段可以将两张表连接起来,生成前面的pv_users表,SQL命令如下:

SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid); 

同样,这个SQL命令也可以转化为MapReduce计算,如下图:

yQVfieq.jpg!web

join的MapReduce计算过程示例

join的MapReduce计算过程和前面的group by稍有不同,因为join涉及两张表,来自两个文件(夹),所以需要在map输出的时候进行标记,比如来自第一张表的输出value就记录为<1, X>,这里的1表示数据来自第一张表。这样经过shuffle以后,相同的key被输入到同一个reduce函数,就可以根据表的标记对value数据求笛卡尔积,输出就join的结果。

在实践中,工程师并不需要经常编写MapReduce程序,因为网站最主要的大数据处理就是SQL分析,在Facebook,据说90%以上的MapReduce任务都是Hive产生的。Hive在大数据应用中的作用非常重要。

作者介绍

李智慧,《大型网站技术架构:核心原理与案例分析》作者。曾供职于阿里巴巴与英特尔亚太研发中心,从事大型网站与大数据方面的研发工作,目前在做企业级区块链方面的开发工作。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK