3

CMU15445 (Fall 2019) 之 Project#3 - Query Execution 详解 - 之一Yo

 1 year ago
source link: https://www.cnblogs.com/zhiyiYo/p/16466144.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
api.php

CMU15445 (Fall 2019) 之 Project#3 - Query Execution 详解

发表于 2022-07-11 13:50 | 分类 C++ | 阅读量 27

经过前面两个实验的铺垫,终于到了给数据库系统添加执行查询计划功能的时候了。给定一条 SQL 语句,我们可以将其中的操作符组织为一棵树,树中的每一个父节点都能从子节点获取 tuple 并处理成操作符想要的样子,下图的根节点 ππ 会输出最终的查询结果。

2065884-20220711092623489-1931858129.png

对于这样一棵树,我们获取查询结果的方式有许多种,包括:迭代模型、物化模型和向量化模型。本次实验使用的是迭代模型,每个节点都会实现一个 Next() 函数,用于向父节点提供一个 tuple。从根节点开始,每个父节点每次向子节点索取一个 tuple 并处理之后输出:

2065884-20220711094136300-1353836918.png

实验主要有三个任务:目录表、执行器和用线性探测哈希表重新实现 hash join 执行器,下面会一个个介绍这几个任务的完成过程。

目录表可以根据 table_oid 或者 table_name 返回表的元数据,其中最重要的一个字段就是 table_,该字段表示一张表,用于查询、插入、修改和删除 tuple:

复制using table_oid_t = uint32_t;
using column_oid_t = uint32_t;

struct TableMetadata {
  TableMetadata(Schema schema, std::string name, std::unique_ptr<TableHeap> &&table, table_oid_t oid)
      : schema_(std::move(schema)), name_(std::move(name)), table_(std::move(table)), oid_(oid) {}
  Schema schema_;
  std::string name_;
  std::unique_ptr<TableHeap> table_;
  table_oid_t oid_;
};

目录表类 SimpleCatalog 中有三个要求我们实现的方法:CreateTableGetTable(const std::string &table_name)GetTable(table_oid_t table_oid),第一个方法用于创建一个新的表,后面两个方法用于获取表:

复制class SimpleCatalog {
 public:
  SimpleCatalog(BufferPoolManager *bpm, LockManager *lock_manager, LogManager *log_manager)
      : bpm_{bpm}, lock_manager_{lock_manager}, log_manager_{log_manager} {}

  /**
   * Create a new table and return its metadata.
   * @param txn the transaction in which the table is being created
   * @param table_name the name of the new table
   * @param schema the schema of the new table
   * @return a pointer to the metadata of the new table
   */
  TableMetadata *CreateTable(Transaction *txn, const std::string &table_name, const Schema &schema) {
    BUSTUB_ASSERT(names_.count(table_name) == 0, "Table names should be unique!");
    table_oid_t oid = next_table_oid_++;

    auto table = std::make_unique<TableHeap>(bpm_, lock_manager_, log_manager_, txn);
    tables_[oid] = std::make_unique<TableMetadata>(schema, table_name, std::move(table), oid);
    names_[table_name] = oid;

    return tables_[oid].get();
  }

  /** @return table metadata by name */
  TableMetadata *GetTable(const std::string &table_name) {
    auto it = names_.find(table_name);
    if (it == names_.end()) {
      throw std::out_of_range("The table name doesn't exist.");
    }

    return GetTable(it->second);
  }

  /** @return table metadata by oid */
  TableMetadata *GetTable(table_oid_t table_oid) {
    auto it = tables_.find(table_oid);
    if (it == tables_.end()) {
      throw std::out_of_range("The table oid doesn't exist.");
    }

    return it->second.get();
  }

 private:
  [[maybe_unused]] BufferPoolManager *bpm_;
  [[maybe_unused]] LockManager *lock_manager_;
  [[maybe_unused]] LogManager *log_manager_;

  /** tables_ : table identifiers -> table metadata. Note that tables_ owns all table metadata. */
  std::unordered_map<table_oid_t, std::unique_ptr<TableMetadata>> tables_;
  /** names_ : table names -> table identifiers */
  std::unordered_map<std::string, table_oid_t> names_;
  /** The next table identifier to be used. */
  std::atomic<table_oid_t> next_table_oid_{0};
};

测试结果如下:

2065884-20220711100635332-333426595.png

执行器用于执行查询计划,该实验要求我们实现下述四种执行器:

  • SeqScanExecutor:顺序扫描执行器,遍历表并返回符合查询条件的 tuple,比如 SELECT * FROM user where id=1 通过该执行器获取查询结果
  • InsertExecutor:插入执行器,向表格中插入任意数量的 tuple,比如 INSERT INTO user VALUES (1, 2), (2, 3)
  • HashJoinExecutor:哈希连接执行器,用于内连接查询操作,比如 SELECT u.id, c.class FROM u JOIN c ON u.id = c.uid
  • AggregationExecutor:聚合执行器,用于执行聚合操作,比如 SELECT MIN(grade), MAX(grade) from user

每个执行器都继承自抽象类 AbstractExecutor ,有两个纯虚函数 Init()Next(Tuple *tuple) 需要实现,其中 Init() 用于初始化执行器,比如需要在 HashJoinExecutorInit() 中对 left table(outer table) 创建哈希表。AbstractExecutor 还有一个 ExecutorContext 成员,包含一些查询的元数据,比如 BufferPoolManager 和上个任务实现的 SimpleCatalog

复制class AbstractExecutor {
 public:
  /**
   * Constructs a new AbstractExecutor.
   * @param exec_ctx the executor context that the executor runs with
   */
  explicit AbstractExecutor(ExecutorContext *exec_ctx) : exec_ctx_{exec_ctx} {}

  /** Virtual destructor. */
  virtual ~AbstractExecutor() = default;

  /**
   * Initializes this executor.
   * @warning This function must be called before Next() is called!
   */
  virtual void Init() = 0;

  /**
   * Produces the next tuple from this executor.
   * @param[out] tuple the next tuple produced by this executor
   * @return true if a tuple was produced, false if there are no more tuples
   */
  virtual bool Next(Tuple *tuple) = 0;

  /** @return the schema of the tuples that this executor produces */
  virtual const Schema *GetOutputSchema() = 0;

  /** @return the executor context in which this executor runs */
  ExecutorContext *GetExecutorContext() { return exec_ctx_; }

 protected:
  ExecutorContext *exec_ctx_;
};

执行器内部会有一个代表执行计划的 AbstractPlanNode 的子类数据成员,而这些子类内部又会有一个 AbstractExpression 的子类数据成员用于判断查询条件是否成立等操作。

提供的代码中为我们实现了一个 TableIterator 类,用于迭代 TableHeap,我们只要在 Next 函数中判断迭代器所指的 tuple 是否满足查询条件并递增迭代器,如果满足条件就返回该 tuple,不满足就接着迭代:

复制class SeqScanExecutor : public AbstractExecutor {
 public:
  /**
   * Creates a new sequential scan executor.
   * @param exec_ctx the executor context
   * @param plan the sequential scan plan to be executed
   */
  SeqScanExecutor(ExecutorContext *exec_ctx, const SeqScanPlanNode *plan);

  void Init() override;

  bool Next(Tuple *tuple) override;

  const Schema *GetOutputSchema() override { return plan_->OutputSchema(); }

 private:
  /** The sequential scan plan node to be executed. */
  const SeqScanPlanNode *plan_;
  TableMetadata *table_metadata_;
  TableIterator table_iterator_;
};

实现代码如下:

复制SeqScanExecutor::SeqScanExecutor(ExecutorContext *exec_ctx, const SeqScanPlanNode *plan)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      table_metadata_(exec_ctx->GetCatalog()->GetTable(plan->GetTableOid())),
      table_iterator_(table_metadata_->table_->Begin(exec_ctx->GetTransaction())) {}

void SeqScanExecutor::Init() {}

bool SeqScanExecutor::Next(Tuple *tuple) {
  auto predicate = plan_->GetPredicate();
  while (table_iterator_ != table_metadata_->table_->End()) {
    *tuple = *table_iterator_++;
    if (!predicate || predicate->Evaluate(tuple, &table_metadata_->schema_).GetAs<bool>()) {
      return true;
    }
  }

  return false;
}

插入操作分为两种:

  • raw inserts:插入数据直接来自插入执行器本身,比如 INSERT INTO tbl_user VALUES (1, 15), (2, 16)
  • not-raw inserts:插入的数据来自子执行器,比如 INSERT INTO tbl_user1 SELECT * FROM tbl_user2

可以根据插入计划的 IsRawInsert() 判断插入操作的类型,这个函数根据子查询器列表是否为空进行判断:

复制/** @return true if we embed insert values directly into the plan, false if we have a child plan providing tuples */
bool IsRawInsert() const { return GetChildren().empty(); }

如果是 raw inserts,我们直接根据插入执行器中的数据构造 tuple 并插入表中,否则调用子执行器的 Next 函数获取数据并插入表中:

复制class InsertExecutor : public AbstractExecutor {
 public:
  /**
   * Creates a new insert executor.
   * @param exec_ctx the executor context
   * @param plan the insert plan to be executed
   * @param child_executor the child executor to obtain insert values from, can be nullptr
   */
  InsertExecutor(ExecutorContext *exec_ctx, const InsertPlanNode *plan,
                 std::unique_ptr<AbstractExecutor> &&child_executor);

  const Schema *GetOutputSchema() override;

  void Init() override;

  // Note that Insert does not make use of the tuple pointer being passed in.
  // We return false if the insert failed for any reason, and return true if all inserts succeeded.
  bool Next([[maybe_unused]] Tuple *tuple) override;

 private:
  /** The insert plan node to be executed. */
  const InsertPlanNode *plan_;
  std::unique_ptr<AbstractExecutor> child_executor_;
  TableMetadata *table_metadata_;
};

实现代码为:

复制InsertExecutor::InsertExecutor(ExecutorContext *exec_ctx, const InsertPlanNode *plan,
                               std::unique_ptr<AbstractExecutor> &&child_executor)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      child_executor_(std::move(child_executor)),
      table_metadata_(exec_ctx->GetCatalog()->GetTable(plan->TableOid())) {}

const Schema *InsertExecutor::GetOutputSchema() { return plan_->OutputSchema(); }

void InsertExecutor::Init() {}

bool InsertExecutor::Next([[maybe_unused]] Tuple *tuple) {
  RID rid;

  if (plan_->IsRawInsert()) {
    for (const auto &values : plan_->RawValues()) {
      Tuple tuple(values, &table_metadata_->schema_);
      if (!table_metadata_->table_->InsertTuple(tuple, &rid, exec_ctx_->GetTransaction())) {
        return false;
      };
    }
  } else {
    Tuple tuple;
    while (child_executor_->Next(&tuple)) {
      if (!table_metadata_->table_->InsertTuple(tuple, &rid, exec_ctx_->GetTransaction())) {
        return false;
      };
    }
  }

  return true;
}

哈希连接执行器使用的是最基本的哈希连接算法,没有使用布隆过滤器等优化措施。该算法分为两个阶段:

  1. 将 left table 的 join 语句中各个条件所在列的值作为键,tuple 或者 row id 作为值构造哈希表,这一步允许将相同哈希值的 tuple 插入哈希表
  2. 对 right table 的 join 语句中各个条件所在列的值作为键,在哈希表中进行查询获取所以系统哈希值的 left table 中的 tuple,再使用 join 条件进行精确匹配

2065884-20220711110228413-1582143010.png

对 tuple 进行哈希的函数为:

复制/**
 * Hashes a tuple by evaluating it against every expression on the given schema, combining all non-null hashes.
 * @param tuple tuple to be hashed
 * @param schema schema to evaluate the tuple on
 * @param exprs expressions to evaluate the tuple with
 * @return the hashed tuple
 */
hash_t HashJoinExecutor::HashValues(const Tuple *tuple, const Schema *schema, const std::vector<const AbstractExpression *> &exprs) {
  hash_t curr_hash = 0;
  // For every expression,
  for (const auto &expr : exprs) {
    // We evaluate the tuple on the expression and schema.
    Value val = expr->Evaluate(tuple, schema);
    // If this produces a value,
    if (!val.IsNull()) {
      // We combine the hash of that value into our current hash.
      curr_hash = HashUtil::CombineHashes(curr_hash, HashUtil::HashValue(&val));
    }
  }
  return curr_hash;
}

为了方便我们的测试,实验提供了一个简易的哈希表 SimpleHashJoinHashTable 用于插入 (hash, tuple) 键值对,该哈希表直接整个放入内存中,如果 tuple 很多,内存会放不下这个哈希表,所以任务三会替换为上一个实验中实现的 LinearProbeHashTable

复制using HT = SimpleHashJoinHashTable;

class HashJoinExecutor : public AbstractExecutor {
 public:
  /**
   * Creates a new hash join executor.
   * @param exec_ctx the context that the hash join should be performed in
   * @param plan the hash join plan node
   * @param left the left child, used by convention to build the hash table
   * @param right the right child, used by convention to probe the hash table
   */
  HashJoinExecutor(ExecutorContext *exec_ctx, const HashJoinPlanNode *plan, std::unique_ptr<AbstractExecutor> &&left,
                   std::unique_ptr<AbstractExecutor> &&right);

  /** @return the JHT in use. Do not modify this function, otherwise you will get a zero. */
  const HT *GetJHT() const { return &jht_; }

  const Schema *GetOutputSchema() override { return plan_->OutputSchema(); }

  void Init() override;

  bool Next(Tuple *tuple) override;

  hash_t HashValues(const Tuple *tuple, const Schema *schema, const std::vector<const AbstractExpression *> &exprs) { // 省略 }

 private:
  /** The hash join plan node. */
  const HashJoinPlanNode *plan_;
  std::unique_ptr<AbstractExecutor> left_executor_;
  std::unique_ptr<AbstractExecutor> right_executor_;

  /** The comparator is used to compare hashes. */
  [[maybe_unused]] HashComparator jht_comp_{};
  /** The identity hash function. */
  IdentityHashFunction jht_hash_fn_{};

  /** The hash table that we are using. */
  HT jht_;
  /** The number of buckets in the hash table. */
  static constexpr uint32_t jht_num_buckets_ = 2;
};

根据上述的算法过程可以得到实现代码为:

复制HashJoinExecutor::HashJoinExecutor(ExecutorContext *exec_ctx, const HashJoinPlanNode *plan,
                                   std::unique_ptr<AbstractExecutor> &&left, std::unique_ptr<AbstractExecutor> &&right)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      left_executor_(std::move(left)),
      right_executor_(std::move(right)),
      jht_("join hash table", exec_ctx->GetBufferPoolManager(), jht_comp_, jht_num_buckets_, jht_hash_fn_) {}

void HashJoinExecutor::Init() {
  left_executor_->Init();
  right_executor_->Init();

  // create hash table for left child
  Tuple tuple;
  while (left_executor_->Next(&tuple)) {
    auto h = HashValues(&tuple, left_executor_->GetOutputSchema(), plan_->GetLeftKeys());
    jht_.Insert(exec_ctx_->GetTransaction(), h, tuple);
  }
}

bool HashJoinExecutor::Next(Tuple *tuple) {
  auto predicate = plan_->Predicate();
  auto left_schema = left_executor_->GetOutputSchema();
  auto right_schema = right_executor_->GetOutputSchema();
  auto out_schema = GetOutputSchema();
  Tuple right_tuple;

  while (right_executor_->Next(&right_tuple)) {
    // get all tuples with the same hash values in left child
    auto h = HashValues(&right_tuple, right_executor_->GetOutputSchema(), plan_->GetRightKeys());
    std::vector<Tuple> left_tuples;
    jht_.GetValue(exec_ctx_->GetTransaction(), h, &left_tuples);

    // get the exact matching left tuple
    for (auto &left_tuple : left_tuples) {
      if (!predicate || predicate->EvaluateJoin(&left_tuple, left_schema, &right_tuple, right_schema).GetAs<bool>()) {
        // create output tuple
        std::vector<Value> values;
        for (uint32_t i = 0; i < out_schema->GetColumnCount(); ++i) {
          auto expr = out_schema->GetColumn(i).GetExpr();
          values.push_back(expr->EvaluateJoin(&left_tuple, left_schema, &right_tuple, right_schema));
        }

        *tuple = Tuple(values, out_schema);
        return true;
      }
    }
  }

  return false;
}

聚合执行器内部维护了一个哈希表 SimpleAggregationHashTable 以及哈希表迭代器 aht_iterator_,将键值对插入哈希表的时候会立刻更新聚合结果,最终的查询结果也从该哈希表获取:

复制AggregationExecutor::AggregationExecutor(ExecutorContext *exec_ctx, const AggregationPlanNode *plan,
                                         std::unique_ptr<AbstractExecutor> &&child)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      child_(std::move(child)),
      aht_(plan->GetAggregates(), plan->GetAggregateTypes()),
      aht_iterator_(aht_.Begin()) {}

const AbstractExecutor *AggregationExecutor::GetChildExecutor() const { return child_.get(); }

const Schema *AggregationExecutor::GetOutputSchema() { return plan_->OutputSchema(); }

void AggregationExecutor::Init() {
  child_->Init();

  // initialize aggregation hash table
  Tuple tuple;
  while (child_->Next(&tuple)) {
    aht_.InsertCombine(MakeKey(&tuple), MakeVal(&tuple));
  }

  aht_iterator_ = aht_.Begin();
}

bool AggregationExecutor::Next(Tuple *tuple) {
  auto having = plan_->GetHaving();
  auto out_schema = GetOutputSchema();

  while (aht_iterator_ != aht_.End()) {
    auto group_bys = aht_iterator_.Key().group_bys_;
    auto aggregates = aht_iterator_.Val().aggregates_;

    if (!having || having->EvaluateAggregate(group_bys, aggregates).GetAs<bool>()) {
      std::vector<Value> values;
      for (uint32_t i = 0; i < out_schema->GetColumnCount(); ++i) {
        auto expr = out_schema->GetColumn(i).GetExpr();
        values.push_back(expr->EvaluateAggregate(group_bys, aggregates));
      }

      *tuple = Tuple(values, out_schema);
      ++aht_iterator_;
      return true;
    }

    ++aht_iterator_;
  }

  return false;
}

测试结果如下图所示,成功通过所有测试用例:

2065884-20220711130743654-664446826.png

线性探测哈希表

这个任务要求将哈希连接中的 SimpleHashJoinHashTable 更换成 LinearProbeHashTable,这样就能在磁盘中保存 left table 的哈希表。实验还提示我们可以实现 TmpTuplePage,用于保存 left table 的 tuple,其实我们完全可以用代码中写好的 TablePage 来实现该目的,但是 TmpTuplePage 结构更为精简,可以搭配 Tuple::DeserializeFrom 食用,通过实现 TmpTuplePage,我们也能加深对 tuple 存储方式的理解。

TmpTuplePage 的格式如下所示:

复制 ---------------------------------------------------------------------------------------------------------
| PageId (4) | LSN (4) | FreeSpace (4) | (free space) | TupleSize2 | TupleData2 | TupleSize1 | TupleData1 |
 ---------------------------------------------------------------------------------------------------------
 \-----------------V------------------/               ^
                 header                               free space pointer

前 12 个字节是 header,记录了 page id、lsn 和 free space pointer,此处的 free space pointer 是相对 page id 的地址而言的。如果表中一个 tuple 都没有,且表大小为 PAGE_SIZE,那么 free space pointer 的值就是 PAGE_SIZE。tuple 从末尾开始插入,每个 tuple 后面跟着 tuple 的大小(占用 4 字节),也就是说插入一个 tuple 占用的空间大小为 tuple.size_ + 4

理解上述内容后,实现 TmpTupleHeader 就很简单了,模仿 TablePage 的写法即可(需要将 TmpTuplePage 声明为 Tuple 的友元):

复制class TmpTuplePage : public Page {
 public:
  void Init(page_id_t page_id, uint32_t page_size) {
    memcpy(GetData(), &page_id, sizeof(page_id));
    SetFreeSpacePointer(page_size);
  }

  /** @return the page ID of this temp table page */
  page_id_t GetTablePageId() { return *reinterpret_cast<page_id_t *>(GetData()); }

  bool Insert(const Tuple &tuple, TmpTuple *out) {
    // determine whether there is enough space to insert tuple
    if (GetFreeSpaceRemaining() < tuple.size_ + SIZE_TUPLE) {
      return false;
    }

    // insert tuple and its size
    SetFreeSpacePointer(GetFreeSpacePointer() - tuple.size_);
    memcpy(GetData() + GetFreeSpacePointer(), tuple.data_, tuple.size_);
    SetFreeSpacePointer(GetFreeSpacePointer() - SIZE_TUPLE);
    memcpy(GetData() + GetFreeSpacePointer(), &tuple.size_, SIZE_TUPLE);
    out->SetPageId(GetPageId());
    out->SetOffset(GetFreeSpacePointer());
    return true;
  }

 private:
  static_assert(sizeof(page_id_t) == 4);

  static constexpr size_t SIZE_TABLE_PAGE_HEADER = 12;
  static constexpr size_t SIZE_TUPLE = 4;
  static constexpr size_t OFFSET_FREE_SPACE = 8;

  /** @return pointer to the end of the current free space, see header comment */
  uint32_t GetFreeSpacePointer() { return *reinterpret_cast<uint32_t *>(GetData() + OFFSET_FREE_SPACE); }

  /** set the pointer of the end of current free space.
   * @param free_space_ptr the pointer relative to data_
   */
  void SetFreeSpacePointer(uint32_t free_space_ptr) {
    memcpy(GetData() + OFFSET_FREE_SPACE, &free_space_ptr, sizeof(uint32_t));
  }

  /** @return the size of free space */
  uint32_t GetFreeSpaceRemaining() { return GetFreeSpacePointer() - SIZE_TABLE_PAGE_HEADER; }
};

Insert 函数中更新了 TmpTuple 的参数,我们会将 TmpTuple 作为 left table 哈希表的值,而 tuple 放在 TmpTuplePage 中,根据 TmpTuple 中保存的 offset 获取 tuple:

复制class TmpTuple {
 public:
  TmpTuple(page_id_t page_id, size_t offset) : page_id_(page_id), offset_(offset) {}

  inline bool operator==(const TmpTuple &rhs) const { return page_id_ == rhs.page_id_ && offset_ == rhs.offset_; }

  page_id_t GetPageId() const { return page_id_; }
  size_t GetOffset() const { return offset_; }
  void SetPageId(page_id_t page_id) { page_id_ = page_id; }
  void SetOffset(size_t offset) { offset_ = offset; }

 private:
  page_id_t page_id_;
  size_t offset_;
};

接着需要将哈希表更换为 LinearProbeHashTable,在 linear_probe_hash_table.cpp 中需要进行模板特例化:

复制template class LinearProbeHashTable<hash_t, TmpTuple, HashComparator>;

还要对 HashTableBlockPage 进行模板特例化:

复制template class HashTableBlockPage<hash_t, TmpTuple, HashComparator>;

接着更改 HT

复制using HashJoinKeyType = hash_t;
using HashJoinValType = TmpTuple;
using HT = LinearProbeHashTable<HashJoinKeyType, HashJoinValType, HashComparator>;

由于 tuple 可能很多,将 jht_num_buckets_ 设置为 1000 可以减少调整大小的次数,最后是实现代码:

复制void HashJoinExecutor::Init() {
  left_executor_->Init();
  right_executor_->Init();

  // create temp tuple page
  auto buffer_pool_manager = exec_ctx_->GetBufferPoolManager();
  page_id_t tmp_page_id;
  auto tmp_page = reinterpret_cast<TmpTuplePage *>(buffer_pool_manager->NewPage(&tmp_page_id)->GetData());
  tmp_page->Init(tmp_page_id, PAGE_SIZE);

  // create hash table for left child
  Tuple tuple;
  TmpTuple tmp_tuple(tmp_page_id, 0);
  while (left_executor_->Next(&tuple)) {
    auto h = HashValues(&tuple, left_executor_->GetOutputSchema(), plan_->GetLeftKeys());

    // insert tuple to page, creata a new temp tuple page if page if full
    if (!tmp_page->Insert(tuple, &tmp_tuple)) {
      buffer_pool_manager->UnpinPage(tmp_page_id, true);
      tmp_page = reinterpret_cast<TmpTuplePage *>(buffer_pool_manager->NewPage(&tmp_page_id)->GetData());
      tmp_page->Init(tmp_page_id, PAGE_SIZE);

      // try inserting tuple to page again
      tmp_page->Insert(tuple, &tmp_tuple);
    }

    jht_.Insert(exec_ctx_->GetTransaction(), h, tmp_tuple);
  }

  buffer_pool_manager->UnpinPage(tmp_page_id, true);
}

bool HashJoinExecutor::Next(Tuple *tuple) {
  auto buffer_pool_manager = exec_ctx_->GetBufferPoolManager();
  auto left_schema = left_executor_->GetOutputSchema();
  auto right_schema = right_executor_->GetOutputSchema();
  auto predicate = plan_->Predicate();
  auto out_schema = GetOutputSchema();
  Tuple right_tuple;

  while (right_executor_->Next(&right_tuple)) {
    // get all tuples with the same hash values in left child
    auto h = HashValues(&right_tuple, right_executor_->GetOutputSchema(), plan_->GetRightKeys());
    std::vector<TmpTuple> tmp_tuples;
    jht_.GetValue(exec_ctx_->GetTransaction(), h, &tmp_tuples);

    // get the exact matching left tuple
    for (auto &tmp_tuple : tmp_tuples) {
      // convert tmp tuple to left tuple
      auto page_id = tmp_tuple.GetPageId();
      auto tmp_page = buffer_pool_manager->FetchPage(page_id);
      Tuple left_tuple;
      left_tuple.DeserializeFrom(tmp_page->GetData() + tmp_tuple.GetOffset());
      buffer_pool_manager->UnpinPage(page_id, false);

      if (!predicate || predicate->EvaluateJoin(&left_tuple, left_schema, &right_tuple, right_schema).GetAs<bool>()) {
        // create output tuple
        std::vector<Value> values;
        for (uint32_t i = 0; i < out_schema->GetColumnCount(); ++i) {
          auto expr = out_schema->GetColumn(i).GetExpr();
          values.push_back(expr->EvaluateJoin(&left_tuple, left_schema, &right_tuple, right_schema));
        }

        *tuple = Tuple(values, out_schema);
        return true;
      }
    }
  }

  return false;
}

测试结果如下:

2065884-20220711134616186-463516478.png

通过这次实验,可以加深对目录、查询计划、迭代模型和 tuple 页布局的理解,算是收获满满的一次实验了,以上~~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK