4

openGauss数据库源码解析系列文章——openGauss简介(二)

 3 years ago
source link: https://my.oschina.net/gaussdb/blog/5033658
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

上一篇openGauss数据库源码解析系列文章--openGauss简介(上)中,从openGauss概述、应用场景、系统结构、代码结构四个方面对openGauss进行了初步介绍。其中,openGauss的代码结构介绍了数据库系统通信管理、SQL引擎两方面内容,本篇接着从代码结构第三方面的内容——存储引擎,以及openGauss的价值特性方面展开介绍。

(三)存储引擎

openGauss存储引擎是可插拔、自组装的,支持多个存储引擎来满足不同场景的业务诉求,目前支持行存储引擎、列存储引擎和内存引擎。

早期计算机程序通过文件系统管理数据,到了20世纪60年代这种方式就开始不能满足数据管理要求了,用户逐渐对数据并发写入的完整性、高效检索提出更高的要求。由于机械磁盘的随机读写性能问题,从20世纪80年代开始,大多数数据库一直在围绕着减少随机读写磁盘进行设计。主要思路是把对数据页面的随机写盘转化为对WAL(Write Ahead Log,预写式日志)日志的顺序写盘,WAL日志持久化完成,事务就算提交成功,数据页面异步刷盘。但是随着内存容量变大、保电内存、非易失性内存的发展,以及SSD技术逐渐的成熟,IO性能极大提高,经历了几十年发展的存储引擎需要调整架构来发挥SSD的性能和充分利用大内存计算的优势。随着互联网、移动互联网的发展,数据量剧增,业务场景多样化,一套固定不变的存储引擎不可能满足所有应用场景的诉求。因此现在的DBMS需要设计支持多种存储引擎,根据业务场景来选择合适的存储模型。

1. 数据库存储引擎要解决的问题

  • 存储的数据必须要保证ACID:原子性、一致性、隔离性、持久性。
  • 高并发读写,高性能。
  • 数据高效存储和检索能力。

2. openGauss存储引擎概述

openGauss整个系统设计支持多个存储引擎来满足不同场景的业务诉求。当前openGauss存储引擎有以下3种:

  • 行存储引擎。主要面向OLTP(Online Transaction Processing,在线交易处理)场景设计,例如订货发货,银行交易系统。
  • 列存储引擎。主要面向OLAP(Online Analytical Processing,联机分析处理)场景设计,例如数据统计报表分析。
  • 内存引擎。主要面向极致性能场景设计,例如银行风控场景。

创建表的时候可以指定行存储引擎、列存引擎的表、内存引擎的表,支持一个事务里包含对3种引擎表的DML(Data Manipulation Language,数据操作语言)操作,可以保证事务ACID性质。

1) storage源码组织

storage源码目录为:/src/gausskernel/storage。storage源码文件如表1所示。

表1 storage源码文件 |storage |storage源码文件 | |--|--| |access | 基础行存储引擎方法cbtree、hash、heap、index、... | | buffer | 缓冲区| | freespace |空闲空间管理 | |ipc | 进程内交互 | | large_object |大对象处理 | | remote |远程读 | | replication | 复制备份| |smgr | 存储管理 | | cmgr | 公共缓存方法 | |cstore | 列存储引擎 | |dfs | 分布式文件系统| | file | 文件类 | | lmgr | 锁管理 | | mot | 内存引擎 | | page | 数据页 |

2) storage主流程

storage主流程代码如下。

/* smgr/smgr.cpp, 存储管理 */
...
/* 文件管理函数列表,包含磁盘初始化、开关、同步等操作函数 */
static const f_smgr g_smgrsw[] = {   
 /* 磁盘*/    
 {mdinit, 
        NULL, 
       mdclose, 
       mdcreate, 
       mdexists, 
       mdunlink, 
       mdextend, 
       mdprefetch, 
       mdread, 
       mdwrite, 
       mdwriteback, 
       mdnblocks, 
       mdtruncate,  
       mdimmedsync, 
       mdpreckpt,  
       mdsync, 
       mdpostckpt,
       mdasyncread, 
       mdasyncwrite}};
 /* 
 *  存储管理初始化 * 
 * 当服务器后端启动时调用 */
 * void smgrinit(void)
 * {    int i;
 * /* 初始化所有存储相关管理器 */    
 * for (i = 0; i < SMGRSW_LENGTH; i++) {
 if (g_smgrsw[i].smgr_init) {            (*(g_smgrsw[i].smgr_init))();        }    }    
  /* 登记存储管理终止程序 */  
    if (!IS_THREAD_POOL_SESSION) {        
    on_proc_exit(smgrshutdown, 0);   
     }}/* 
     * 当后端服务关闭时,执行存储管理关闭代码 */
     * static void smgrshutdown(int code, Datum arg){    int i;
     * /* 关闭所有存储关联服务 */
     *     for (i = 0; i < SMGRSW_LENGTH; i++) { 
      if (g_smgrsw[i].smgr_shutdown) {  
     (*(g_smgrsw[i].smgr_shutdown))(); 
     } 
      }
      }

3. 行存储引擎

openGauss的行存储引擎设计上支持MVCC(Multi-Version Concurrency Control,多版本并发控制),采用集中式垃圾版本回收机制,可以提供OLTP业务系统的高并发读写要求。支持存储计算分离架构,存储层异步回放日志。如图1所示。

在这里插入图片描述

|图1 行存储架构|

行存储引擎的关键技术有:

  • 基于CSN(Commit Sequence Number,待提交事务的序列号,它是一个64位递增无符号数)的MVCC并发控制机制,集中式垃圾数据清理。
  • 并行刷日志,并行恢复。传统数据库一般都采用串行刷日志的设计,因为日志有顺序依赖关系,例如一个是事务产生的redo/undo log是有前后依赖关系的。openGauss的日志系统采用多个logwriter线程并行写的机制,充分发挥SSD的多通道IO能力。
  • 基于大内存设计的Buffer manager。

行存储buffer主流程代码如下。

/* buffer/bufmgr.cpp, 基础行存储管理 */
...
/* 查找或创建一个缓冲区 */
Buffer ReadBufferExtended(
    Relation reln, ForkNumber fork_num, BlockNumber block_num, ReadBufferMode mode, BufferAccessStrategy strategy)
{
    bool hit = false;
    Buffer buf;

    if (block_num == P_NEW) {
        STORAGE_SPACE_OPERATION(reln, BLCKSZ);
    }

    /* 以smgr(存储管理器)级别打开一个缓冲区 */
    RelationOpenSmgr(reln);

    /* 拒绝读取非局部临时关系的请求,因为可能会获得监控不到的错误数据 */
    if (RELATION_IS_OTHER_TEMP(reln) && fork_num <= INIT_FORKNUM)
        ereport(ERROR,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot access temporary tables of other sessions")));

    /* 读取缓冲区,更新pgstat 数量反馈cache 命中与否情况 */
    pgstat_count_buffer_read(reln);
    pgstatCountBlocksFetched4SessionLevel();
    buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence, fork_num, block_num, mode, strategy, &hit);
    if (hit) {
        pgstat_count_buffer_hit(reln);
    }
    return buf;
}

/* 释放一个缓冲区 */
void ReleaseBuffer(Buffer buffer)
{
    BufferDesc* buf_desc = NULL;
    PrivateRefCountEntry* ref = NULL;
    /* 错误释放处理 */
    if (!BufferIsValid(buffer)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER), (errmsg("bad buffer ID: %d", buffer))));
    }

    ResourceOwnerForgetBuffer(t_thrd.utils_cxt.CurrentResourceOwner, buffer);

    if (BufferIsLocal(buffer)) {
        Assert(u_sess->storage_cxt.LocalRefCount[-buffer - 1] > 0);
        u_sess->storage_cxt.LocalRefCount[-buffer - 1]--;
        return;
    }
    /* 释放当前缓冲区 */
    buf_desc = GetBufferDescriptor(buffer - 1);

    PrivateRefCountEntry *free_entry = NULL;
    ref = GetPrivateRefCountEntryFast(buffer, free_entry);
    if (ref == NULL) {
        ref = GetPrivateRefCountEntrySlow(buffer, false, false, free_entry);}
    Assert(ref != NULL);
    Assert(ref->refcount > 0);

    if (ref->refcount > 1) {
        ref->refcount--;
    } else {
        UnpinBuffer(buf_desc, false);
    }
}
/* 标记写脏缓冲区 */
void MarkBufferDirty(Buffer buffer)
{
    BufferDesc* buf_desc = NULL;
    uint32 buf_state;
    uint32 old_buf_state;

    if (!BufferIsValid(buffer)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER), (errmsg("bad buffer ID: %d", buffer))));}

    if (BufferIsLocal(buffer)) {
        MarkLocalBufferDirty(buffer);
        return;
    }

    buf_desc = GetBufferDescriptor(buffer - 1);

    Assert(BufferIsPinned(buffer));
    Assert(LWLockHeldByMe(buf_desc->content_lock));

    old_buf_state = LockBufHdr(buf_desc);

    buf_state = old_buf_state | (BM_DIRTY | BM_JUST_DIRTIED);

    /* 将未入队的脏页入队 */
    if (g_instance.attr.attr_storage.enableIncrementalCheckpoint) {
        for (;;) {
            buf_state = old_buf_state | (BM_DIRTY | BM_JUST_DIRTIED);
            if (!XLogRecPtrIsInvalid(pg_atomic_read_u64(&buf_desc->rec_lsn))) {
                break;
            }

            if (!is_dirty_page_queue_full(buf_desc) && push_pending_flush_queue(buffer)) {
                break;
            }

            UnlockBufHdr(buf_desc, old_buf_state);
            pg_usleep(TEN_MICROSECOND);
            old_buf_state = LockBufHdr(buf_desc);
        }
    }

    UnlockBufHdr(buf_desc, buf_state);

    /* 如果缓冲区不是“脏”状态,则更新相关计数 */
    if (!(old_buf_state & BM_DIRTY)) {
        t_thrd.vacuum_cxt.VacuumPageDirty++;
        u_sess->instr_cxt.pg_buffer_usage->shared_blks_dirtied++;

        pgstatCountSharedBlocksDirtied4SessionLevel();

        if (t_thrd.vacuum_cxt.VacuumCostActive) {
            t_thrd.vacuum_cxt.VacuumCostBalance += u_sess->attr.attr_storage.VacuumCostPageDirty;
        }
    }
}

4. 列存储引擎

传统行存储数据压缩率低,必须按行读取,即使读取一列也必须读取整行。openGauss创建表的时候,可以指定行存储还是列存储。列存储表也支持DML操作,也支持MVCC。列存储架构如图2所示。

在这里插入图片描述

|图2 列存储架构|

列存储引擎有以下优势:

  • 列的数据特征比较相似,适合压缩,压缩比很高。
  • 当表列的个数比较多,但是访问的列个数比较少时,列存可以按需读取列数据,大大减少不必要的读IO,提高查询性能。
  • 基于列批量数据Vector(向量)的运算,CPU的cache命中率比较高,性能比较好。列存储引擎更适合OLAP大数据统计分析的场景。

1) 列存储源码组织

列存储源码目录为:/src/gausskernel/storage/cstore。列存储源码文件如表2所示。

表2 列存储源码文件

cstore 列存储源码文件 compression 数据压缩与解压 cstore_allocspace 空间分配 cstore_am 列存储公共API cstore_***_func 支持函数 cstore_psort 列内排序 cu 数据压缩单元 cucache_mgr 缓存管理器 custorage 持久化存储 cstore_delete 删除方法 cstore_update 更新方法 cstore_vector 缓冲区实现 cstore_rewrite SQL重写 cstore_insert 插入方法 cstore_mem_alloc 内存分配

2) 列存储主要API

列存储主要API代码如下。

/*  cstore_am.cpp */
...
/*  扫描 APIs */
    void InitScan(CStoreScanState *state, Snapshot snapshot = NULL);
    void InitReScan();
    void InitPartReScan(Relation rel);
    bool IsEndScan() const;

    /*  延迟读取APIs */
    bool IsLateRead(int id) const;
    void ResetLateRead();

    /*  更新列存储扫描计时标记*/
    void SetTiming(CStoreScanState *state);

    /*  列存储扫描*/
    void ScanByTids(_in_ CStoreIndexScanState *state, _in_ VectorBatch *idxOut, _out_ VectorBatch *vbout);
    void CStoreScanWithCU(_in_ CStoreScanState *state, BatchCUData *tmpCUData, _in_ bool isVerify = false);

    /*  加载数据压缩单元描述信息  */
    bool LoadCUDesc(_in_ int col, __inout LoadCUDescCtl *loadInfoPtr, _in_ bool prefetch_control, _in_ Snapshot snapShot = NULL);

    /*  从描述表中获取数据压缩单元描述*/
    bool GetCUDesc(_in_ int col, _in_ uint32 cuid, _out_ CUDesc *cuDescPtr, _in_ Snapshot snapShot = NULL);

    /*  获取元组删除信息*/
    void GetCUDeleteMaskIfNeed(_in_ uint32 cuid, _in_ Snapshot snapShot);

    bool GetCURowCount(_in_ int col, __inout LoadCUDescCtl *loadCUDescInfoPtr, _in_ Snapshot snapShot);
    /* 获取实时行号。 */
    int64 GetLivedRowNumbers(int64 *deadrows);

    /*  获得数据压缩单元*/
    CU *GetCUData(_in_ CUDesc *cuDescPtr, _in_ int colIdx, _in_ int valSize, _out_ int &slotId);

    CU *GetUnCompressCUData(Relation rel, int col, uint32 cuid, _out_ int &slotId, ForkNumber forkNum = MAIN_FORKNUM,
                            bool enterCache = true) const;

    /*  缓冲向量填充 APIs */
    int FillVecBatch(_out_ VectorBatch *vecBatchOut);

    /*  填充列向量*/
    template <bool hasDeadRow, int attlen>
    int FillVector(_in_ int colIdx, _in_ CUDesc *cu_desc_ptr, _out_ ScalarVector *vec);

    template <int attlen>
    void FillVectorByTids(_in_ int colIdx, _in_ ScalarVector *tids, _out_ ScalarVector *vec);

    template <int attlen>
    void FillVectorLateRead(_in_ int seq, _in_ ScalarVector *tids, _in_ CUDesc *cuDescPtr, _out_ ScalarVector *vec);

    void FillVectorByIndex(_in_ int colIdx, _in_ ScalarVector *tids, _in_ ScalarVector *srcVec, _out_ ScalarVector *destVec);

    /*  填充系统列*/
    int FillSysColVector(_in_ int colIdx, _in_ CUDesc *cu_desc_ptr, _out_ ScalarVector *vec);

    template <int sysColOid>
    void FillSysVecByTid(_in_ ScalarVector *tids, _out_ ScalarVector *destVec);

    template <bool hasDeadRow>
    int FillTidForLateRead(_in_ CUDesc *cuDescPtr, _out_ ScalarVector *vec);

    void FillScanBatchLateIfNeed(__inout VectorBatch *vecBatch);

    /* 设置数据压缩单元范围以支持索引扫描 */
    void SetScanRange();

    /*  判断行是否可用*/
    bool IsDeadRow(uint32 cuid, uint32 row) const;

    void CUListPrefetch();
    void CUPrefetch(CUDesc *cudesc, int col, AioDispatchCUDesc_t **dList, int &count, File *vfdList);

    /* 扫描函数 */
    typedef void (CStore::*ScanFuncPtr)(_in_ CStoreScanState *state, _out_ VectorBatch *vecBatchOut);
    void RunScan(_in_ CStoreScanState *state, _out_ VectorBatch *vecBatchOut);
    int GetLateReadCtid() const;
    void IncLoadCuDescCursor();

5. 内存引擎

openGauss引入了MOT(Memory-Optimized Table,内存优化表)存储引擎,它是一种事务性行存储,针对多核和大内存服务器进行了优化。MOT是openGauss数据库出色的生产级特性(Beta版本),它为事务性工作负载提供更高的性能。MOT完全支持ACID特性,并包括严格的持久性和高可用性支持。企业可以在关键任务、性能敏感的在线事务处理(OLTP)中使用MOT,以实现高性能、高吞吐、可预测低延迟以及多核服务器的高利用率。MOT尤其适合在多路和多核处理器的现代服务器上运行,例如基于ARM(Advanced RISC Machine,高级精简指令集计算机器)/鲲鹏处理器的华为TaiShan服务器,以及基于x86的戴尔或类似服务器。MOT存储引擎如图3所示。

在这里插入图片描述

|图3 openGauss内存引擎|

MOT与基于磁盘的普通表并排创建。MOT的有效设计实现了几乎完全的SQL覆盖,并且支持完整的数据库功能集,如存储过程和自定义函数。通过完全存储在内存中的数据和索引、非统一内存访问感知(NUMA-aware)设计、消除锁和锁存争用的算法以及查询原生编译,MOT可提供更快的数据访问和更高效的事务执行。MOT有效的几乎无锁的设计和高度调优的实现,使其在多核服务器上实现了卓越的近线性吞吐量扩展。

MOT的高性能(查询和事务延迟)、高可扩展性(吞吐量和并发量)等特点,在某些情况下低成本(高资源利用率)方面拥有显著优势。

  • 低延迟(Low Latency):提供快速的查询和事务响应时间。
  • 高吞吐量(High Throughput):支持峰值和持续高用户并发。
  • 高资源利用率(High Resource Utilization):充分利用硬件。

MOT的关键技术如下:

  • 内存优化数据结构:以实现高并发吞吐量和可预测的低延迟为目标,所有数据和索引都在内存中,不使用中间页缓冲区,并使用持续时间最短的锁。数据结构和所有算法都是专门为内存设计而优化的。
  • 免锁事务管理:MOT在保证严格一致性和数据完整性的前提下,采用乐观的策略实现高并发和高吞吐。在事务过程中,MOT不会对正在更新的数据行的任何版本加锁,从而大大降低了一些大内存系统中的争用。
  • 免锁索引:由于内存表的数据和索引完全存储在内存中,因此拥有一个高效的索引数据结构和算法非常重要。MOT索引机制基于领域前沿的树结构Masstree,一种用于多核系统的快速和可扩展的键值(Key Value,KV)存储索引,以B+树的Trie实现。通过这种方式,高并发工作负载在多核服务器上可以获得卓越的性能。同时MOT应用了各种先进的技术以优化性能,如优化锁方法、高速缓存感知和内存预取。
  • NUMA-aware的内存管理:MOT内存访问的设计支持非统一内存访问(NUMA,Non-Uniform Memory Access)感知。NUMA-aware算法增强了内存中数据布局的性能,使线程访问物理上连接到线程运行的核心的内存。这是由内存控制器处理的,不需要通过使用互连(如英特尔QPI(Quick Path Interconnect,快速路径互连))进行额外的跳转。MOT的智能内存控制模块,为各种内存对象预先分配了内存池,提高了性能、减少了锁、保证了稳定性。
  • 高效持久性:日志和检查点是实现磁盘持久化的关键能力,也是ACID的关键要求之一。目前所有的磁盘(包括SSD和NVMe(Non-Volatile Memory express,非易失性高速传输总线))都明显慢于内存,因此持久化是基于内存数据库引擎的瓶颈。作为一个基于内存的存储引擎,MOT的持久化设计必须实现各种各样的算法优化,以确保持久化的同时还能达到设计时的速度和吞吐量目标。
  • 高SQL覆盖率和功能集:MOT通过扩展的openGauss外部数据封装(Foreign Data Wrapper,FDW)以及索引,几乎支持完整的SQL范围,包括存储过程、用户定义函数和系统函数调用。
  • 使用PREPARE语句的查询原生编译:通过使用PREPARE客户端命令,可以以交互方式执行查询和事务语句。这些命令已被预编译成原生执行格式,也称为Code-Gen或即时(Just-in-Time,JIT)编译。这样可以实现平均30%的性能提升。
  • MOT和openGauss数据库的无缝集成:MOT是一个高性能的面向内存优化的存储引擎,已集成在openGauss软件包中。MOT的主内存引擎和基于磁盘的存储引擎并存,以支持多种应用场景,同时在内部重用数据库辅助服务,如WAL重做日志、复制、检查点和恢复高可用性等。

1) 内存引擎源码组织

内存引擎源码目录为:/src/gausskernel/storage/mot。内存引擎源码文件如表3所示。

表3 内存引擎源码文件

mot 内存引擎源码文件 concurrency_control 并发控制管理 infra 辅助与配置函数 memory 内存数据管理 storage 持久化存储 system 全局控制API utils 日志等通用方法

2) 内存引擎主流程

内存引擎主流程代码如下。

/* system/mot_engine.cpp */
...
/* 创建内存引擎实例 */
MOTEngine* MOTEngine::CreateInstance(
    const char* configFilePath /* = nullptr */, int argc /* = 0 */, char* argv[] /* = nullptr */)
{
    if (m_engine == nullptr) {
        if (CreateInstanceNoInit(configFilePath, argc, argv) != nullptr) {
            bool result = m_engine->LoadConfig();
            if (!result) {
                MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "System Startup", "Failed to load Engine configuration");
            } else {
                result = m_engine->Initialize();
                if (!result) {
                    MOT_REPORT_ERROR(MOT_ERROR_INTERNAL, "System Startup", "Engine initialization failed");
                }
            }

            if (!result) {
                DestroyInstance();
                MOT_ASSERT(m_engine == nullptr);
            }
        }
    }
    return m_engine;
}
/* 内存引擎初始化 */
bool MOTEngine::Initialize()
{
    bool result = false;
/* 初始化应用服务,开始后台任务 */
    do {  // instead of goto
        m_initStack.push(INIT_CORE_SERVICES_PHASE);
        result = InitializeCoreServices();
        CHECK_INIT_STATUS(result, "Failed to Initialize core services");

        m_initStack.push(INIT_APP_SERVICES_PHASE);
        result = InitializeAppServices();
        CHECK_INIT_STATUS(result, "Failed to Initialize applicative services");

        m_initStack.push(START_BG_TASKS_PHASE);
        result = StartBackgroundTasks();
        CHECK_INIT_STATUS(result, "Failed to start background tasks");
    } while (0);

    if (result) {
        MOT_LOG_INFO("Startup: MOT Engine initialization finished successfully");
        m_initialized = true;
    } else {
        MOT_LOG_PANIC("Startup: MOT Engine initialization failed!");
        /* 调用方应在失败后调用DestroyInstance() */
    }

    return result;
}
/* 销毁内存引擎实例 */
void MOTEngine::Destroy()
{
    MOT_LOG_INFO("Shutdown: Shutting down MOT Engine");
    while (!m_initStack.empty()) {
        switch (m_initStack.top()) {
            case START_BG_TASKS_PHASE:
                StopBackgroundTasks();
                break;

            case INIT_APP_SERVICES_PHASE:
                DestroyAppServices();
                break;

            case INIT_CORE_SERVICES_PHASE:
                DestroyCoreServices();
                break;

            case LOAD_CFG_PHASE:
                break;

            case INIT_CFG_PHASE:
                DestroyConfiguration();
                break;

            default:
                break;
        }
        m_initStack.pop();
    }
    ClearErrorStack();
    MOT_LOG_INFO("Shutdown: MOT Engine shutdown finished");
}

Gauss松鼠会是汇集数据库爱好者和关注者的大本营,大家共同学习、探索、分享数据库前沿知识和技术,互助解决问题,共建数据库技术交流圈。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK