1. 前言

   本文讨论 InnoDB 的 Purge 子系统的代码实现,建议先阅读 Undo 系统的介绍。

2. Purge 系统

   InnoDB 控制 purge 操作的结构体是 trx_purge_t,其中主要维护了需要被 purge 的回滚段、purge view、purge 状态位置等。全局 trx_purge_t 结构会在 innodb 启动时的trx_sys_init_at_db_start函数通过扫描所有rollback segment 来初始化设定,其内容如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
struct trx_purge_t {
  sess_t *sess; // purge的系统session
  trx_t *trx; // purge的系统trx,不在 trx list

  rw_lock_t latch; // purge view,state的保护锁
  os_event_t event; // state的信号

  ulint n_stop; // 停止追踪器

  volatile bool running; // 是否在运行
  volatile purge_state_t state; // Coordinator状态:INIT、RUN、STOP、EXIT、DISABLED

  que_t *query; // 运行purge用的query graph

  ReadView view; // purge view,大于或出现在这个view的undo不会被purge
  bool view_active; // purge view是否有效

  volatile ulint n_submitted; // 提交的purge任务数目
  std::atomic<ulint> n_completed; // 完成的purge任务数目

  /* 追踪purge的位置,用于history list truncation */
  purge_iter_t iter; // 已经read和parsed的UNDO log位置,一定比limit更新
  purge_iter_t limit; // 已经purge(或已经分配马上要purge)的UNDO log位置

  bool next_stored; // 标记要purge的下一个undo是否存在下面这些变量中
  trx_rseg_t *rseg; // 下一个purge的回滚段
  page_no_t page_no; // 下一个purge的undo的page no
  ulint offset; // 下一个purge的undo的page in-page-offset
  page_no_t hdr_page_no; // 下一个purge的undo的header page
  ulint hdr_offset; // // 下一个purge的undo的header page in-page-offset

  TrxUndoRsegsIterator *rseg_iter; // 用于获取下一个purge的回滚段

  purge_pq_t *purge_queue; // 按trx_no排序的要被purge的(update)回滚段,内存
  PQMutex pq_mutex;

  undo::Truncate undo_trunc; // 标记要truncate的undospace

  mem_heap_t *heap;

  std::vector<trx_rseg_t *> rsegs_queue; // 存储所有的回滚段
};

3. Purge 主流程代码

   在ddl恢复完成(innobase_post_recover),保证 tablespaces 相关元信息状态一致后,系统会启动 srv_purge_coordinator_thread 和 srv_worker_thread 来进行 undo purge。srv_purge_coordinator_thread 是主要控制 purge 流程的任务线程,在运行期间循环调用 srv_do_purge 去尽可能 purge 所有 undo。在srv_do_purge中每次 purge 一批 undo 会根据系统状态自适应调整 purge 系统所使用的线程数目。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void srv_purge_coordinator_thread() {
  // 初始化环境,something...

  do { // 常态循环
    /*
      如果上一次purge没有清理东西,则suspend暂停等待信号,避免空转;
      另外,外部操作像 FLUSH TABLES FOR EXPORT 会静默tablespace等的也会暂停purge
    */
    if (srv_shutdown_state == SRV_SHUTDOWN_NONE &&
        (purge_sys->state == PURGE_STATE_STOP || n_total_purged == 0)) {
      srv_purge_coordinator_suspend(slot, rseg_history_len);
    }

    if (srv_purge_should_exit(n_total_purged)) break;

    n_total_purged = 0;
    rseg_history_len = srv_do_purge(&n_total_purged); // 自己做purge
  } while (!srv_purge_should_exit(n_total_purged));

  // 退出清理undo
  /* 如果不是fast shutdown,确保所有记录被purge,退出阶段也可能有加入undo记录,清理所有后台线程参数的undo */
  ulint n_pages_purged = ULINT_MAX;
  while (srv_fast_shutdown == 0 && n_pages_purged > 0) {
    n_pages_purged = trx_purge(1, srv_purge_batch_size, false);
  }
  n_pages_purged =
      trx_purge(1, ut_min(srv_purge_batch_size, 20), true);
  ut_a(n_pages_purged == 0 || srv_fast_shutdown != 0);

  // 清理环境,something...
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
static ulint srv_do_purge(ulint *n_total_purged) {
  // 初始化环境,something...

  do { // 常态循环尽可能purge
    // S1. 通过当前系统和history状态,调整purge threads数目:...

    // S2. 判断是否需要进行undo truncate:...

    // S3. 调用trx_purge实际做purge
    n_pages_purged =
        trx_purge(n_use_threads, srv_purge_batch_size, do_truncate);
    *n_total_purged += n_pages_purged;

    // S4. 判断是否有需要truncate的undo space以再次进入:...
  } while (purge_sys->state == PURGE_STATE_RUN &&
           (n_pages_purged > 0 || need_explicit_truncate) &&
           !srv_purge_should_exit(n_pages_purged));

  return rseg_history_len; // 上一批purge前的history长度
}

   Purge coordinator 的实际 purge 任务是在 trx_purge 中进行分配和进行的, coordinator 会将 undo record 分配给 srv_sys->tasks 中对应数目的 query thread,Purge worker 直接匹配系统环境的 query thread 拿 query thread node(purge_node_t类型)进行执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
ulint trx_purge(ulint n_purge_threads,
                ulint batch_size,
                bool truncate) {
  // S0. 初始化,something...

  /******************************************************
    S1. 获取(clone)最老 read view 用于为 MVCC 限制 purge 的位置
  *******************************************************/
  rw_lock_x_lock(&purge_sys->latch, UT_LOCATION_HERE);
  purge_sys->view_active = false;
  trx_sys->mvcc->clone_oldest_view(&purge_sys->view);
  purge_sys->view_active = true;
  rw_lock_x_unlock(&purge_sys->latch);

  /******************************************************
    S2. 给 purge_sys 每个 query thread 的执行节点(purge_node_t)分配 undo recs
  *******************************************************/
  /*
    S2.0. 在 trx_commit 时候将 update undo 写入 purge_queue,(insert undo 直接更新成 cache 或者 free);
    S2.1. purge_sys 维护了 next undo rec 的位置(purge_sys->iter),这里从这一位置开始获取batch_size大小的 undo rec
    S2.2. 将获取的 undo rec 分配给不同的work thread,这里官方先按table ID进行分配,再进行平衡尽可能保证各 worker 的 undo 数目均匀
      (查看提交 Bug #32089028 CONCURRENTLY UPDATING MANY JSON DOCUMENTS STEADILY INCREASES IBD FILE SIZE)
    S2.3. 将 undo rec 真正分配挂到 query thread 的执行节点 purge_node_t 上
  */
  n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads, batch_size);

  /******************************************************
    S3. 启动并进行 purge 任务
  *******************************************************/
  // S3.1. 启动所有query thread
  if (n_purge_threads > 1) {
    for (ulint i = 0; i < n_purge_threads - 1; ++i) {
      thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
      // 向后台 srv_sys->tasks 提交任务,以供 purger worker 执行
      srv_que_task_enqueue_low(thr);
    }
    purge_sys->n_submitted += n_purge_threads - 1;

    thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
  } else {
    thr = que_fork_scheduler_round_robin(purge_sys->query, nullptr);
  }
  ++purge_sys->n_submitted;
  // S3.2. 执行purge任务,并等待所有worker完成
  que_run_threads(thr);
  purge_sys->n_completed.fetch_add(1);
  if (n_purge_threads > 1) trx_purge_wait_for_workers_to_complete();

  /******************************************************
    S4. 将所有 blob's first page 延迟到末尾统一释放,避免访问 freed page
  *******************************************************/
  for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs); thr != nullptr;
       thr = UT_LIST_GET_NEXT(thrs, thr)) {
    purge_node_t *node = static_cast<purge_node_t *>(thr->child);
    node->free_lob_pages();
  }

  /******************************************************
    S5. 进行 undospace truncate
  *******************************************************/
  if (truncate || srv_upgrade_old_undo_found) {
    trx_purge_truncate();
  }

  return (n_pages_handled);
}

4. Purge 物理操作

   在row_purge中会对对应 query thread 的 run_node(purge_node_t)中取的 undo record 记录进行 purge,直到 purge 完所有分配的 undo record:

  • row_purge_parse_undo_rec:

  • 首先逻辑类似 rollback 操作的的第一步,解析 undo record 获取 undo type、trxid、tableid 等操作信息(trx_undo_rec_get_parstrx_undo_update_rec_get_sys_cols

  • 通过 tableid 开启 innodb 表并获取 Shared MDL 锁(dd_table_open_on_id

  • 构建 目标主键的 row reference (dtuple_t) ,upd_t,row (dtuple_t)等信息(trx_undo_rec_get_row_reftrx_undo_update_rec_get_update

  • row_purge_record:

  • 对 delete mark (TRX_UNDO_DEL_MARK_REC) 类型的 undo,操作 purge 掉所有二级索 row_purge_remove_sec_if_poss 和主键 row_purge_remove_clust_if_poss 上不再需要的 index record(包括 extern field),这里也是先根据 row reference 来索引 search 到 BTree 到对应的 cursor 上,然后删除走的是 btr_cur 的 delete 接口 btr_cur_optimistic_deletebtr_cur_pessimistic_delete

  • 对于 update existing (TRX_UNDO_UPD_EXIST_REC) 类型的 undo,如果二级索引有修改,对其会通过 row_purge_remove_sec_if_poss 删除对应老版本的二级索引记录,然后走 lob::purge 删除 externally fields;主键记录已被修改为最新,不需要做处理。

5. Purge truncate

   前一个阶段 purge 物理操作只是将索引上的数据删除,但是不会处理 undospace 内的空间。虽然 undo log 可以被 purge,但是类似 ibd 文件(不主动optimize)一旦文件增大那么就无法缩小。Innodb 在一个 undo 表空间没有事务使用时,允许将其 truncate 来回收 undo 表空间。回收动作在 coordinator 一批 purge 任务完成后触发,接口为trx_purge_truncate,其内部主要分为 purge rollback segment 和 truncate space 两个部分。

  • purge rollback segment(trx_purge_truncate_history)会从所有表空间的所有回滚段回收无用的 undo 数据并清理 history list;
  • truncate space(trx_purge_truncate_undo_spaces)会检索是否存在满足truncate要求的 undospace(1. 手动设置为 inactive 或大小超过限制;2. 空间中 undo 记录被 purge 完全且没有被任何事务使用),并进行物理文件 truncate 操作。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/** 清理某个 rollback segment */
static void trx_purge_truncate_rseg_history(
    trx_rseg_t *rseg,          /*!< in: rollback segment */
    const purge_iter_t *limit) /*!< in: truncate offset */ {
  // 变量初始化,something...

  // 从当前 rollback segment 的 history list 上面获取最后位置的 undo log header
  rseg->latch();
  rseg_hdr = trx_rsegf_get(rseg->space_id, rseg->page_no, rseg->page_size, &mtr);
  hdr_addr = trx_purge_get_log_from_hist(flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));

loop:
  if (hdr_addr.page == FIL_NULL) {
    // history 不剩下需要处理的 undo
    rseg->unlatch();
    mtr_commit(&mtr);
    return;
  }

  undo_page = trx_undo_page_get(page_id_t(rseg->space_id, hdr_addr.page),
                                rseg->page_size, &mtr);
  log_hdr = undo_page + hdr_addr.boffset;
  seg_hdr = undo_page + TRX_UNDO_SEG_HDR;

  undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
  if (undo_trx_no >= limit->trx_no) {
    // 当前要处理的 undo 已经到达、超过 purge limit
    if (undo_trx_no == limit->trx_no &&
        rseg->space_id == limit->undo_rseg_space) {
      // 将所有小于 limit 的 undo normal page free,header page empty
      trx_undo_truncate_start(rseg, hdr_addr.page, hdr_addr.boffset,
                              limit->undo_no);
    }
    rseg->unlatch();
    mtr_commit(&mtr);
    return;
  }

  prev_hdr_addr = trx_purge_get_log_from_hist(flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));

  if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE) &&
      (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {
    // 无log剩余,回收整个 undo segment
    rseg->unlatch();
    mtr_commit(&mtr);
    trx_purge_free_segment(rseg, hdr_addr, is_temp); // 内部有trx_purge_remove_log_hdr删除history list节点
  } else {
    // 在history list上删除当前 log header(这里相当于我那次一条history的purge)
    trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);
    rseg->unlatch();
    mtr_commit(&mtr);
  }

  // 转移到 history list 上的下一条log
  mtr_start(&mtr);
  if (is_temp) { mtr.set_log_mode(MTR_LOG_NO_REDO); }
  rseg->latch();
  rseg_hdr = trx_rsegf_get(rseg->space_id, rseg->page_no, rseg->page_size, &mtr);
  hdr_addr = prev_hdr_addr;

  goto loop;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// 进行物理文件 truncate
static bool trx_purge_truncate_marked_undo() {
  // S0. 变量初始化,something...

  /******************************************************
    S1. 获取 MDL 锁
  *******************************************************/
  MDL_ticket *mdl_ticket;
  bool dd_result = dd_tablespace_get_mdl(space_name.c_str(), &mdl_ticket, false);
  // something...

  /******************************************************
    S2. 开始 truncate 操作,切换 undospace
  *******************************************************/
  mutex_enter(&undo::ddl_mutex);
  if (!trx_purge_truncate_marked_undo_low(space_num, space_name)) {
    mutex_exit(&undo::ddl_mutex);
    dd_release_mdl(mdl_ticket);
    MONITOR_INC_TIME(MONITOR_UNDO_TRUNCATE_MICROSECOND, counter_time_truncate);
    return (false);
  }

  /******************************************************
    S3. 删除 undo log file,标志 undo truncate 完成
  *******************************************************/
  undo::spaces->x_lock();
  undo::done_logging(space_num);
  undo::spaces->x_unlock();

  // S4. 清理环境,something...
  return (true);
}

static bool trx_purge_truncate_marked_undo_low(space_id_t space_num,
                                               std::string space_name) {
  /******************************************************
    S2.1. 获取环境
  *******************************************************/
  // someting...

  /******************************************************
    S2.2. 创建 undo 文件,标志 undo truncate 开始
  *******************************************************/
  dberr_t err = undo::start_logging(marked_space);
  // someting...

  /******************************************************
    S2.3. 过滤特殊条件,someting...
  *******************************************************/

  /******************************************************
    S2.4. 实际 undo space 文件裁剪轮转
  *******************************************************/
  /*
    计算新 undo space id 和 undo space no;
    删除+新建 file tablespace(fil_delete_tablespace + fil_ibd_create);
    重新初始化新 tablespace 文件,构建 undo 文件内容;
    重新设置相应 undo space 回滚段内存结构体(Rsegs *m_rsegs)
  */
  bool success = trx_undo_truncate_tablespace(marked_space);
  // someting...

  /******************************************************
    S2.5. 设定此 undo 空间后面的可用状态
  *******************************************************/
  space_id_t new_space_id = marked_space->id();
  dd_space_states next_state;
  undo::spaces->s_lock();
  Rsegs *marked_rsegs = marked_space->rsegs();
  marked_rsegs->x_lock();
  if (marked_rsegs->is_inactive_explicit()) {
    // 由外部手动 inactive
    next_state = DD_SPACE_STATE_EMPTY;
    marked_rsegs->set_empty();
  } else {
    // 由后台 purge 选择,可被再使用
    next_state = DD_SPACE_STATE_ACTIVE;
    marked_rsegs->set_active();
  }

  /******************************************************
    S2.6. 在 DD 中更新 space ID 和 state 信息
  *******************************************************/
  if (DD_FAILURE == dd_tablespace_set_id_and_state(space_name.c_str(),
                                                   new_space_id, next_state))
    return (false);

  return (true);
}

  • 版权声明:如需转载或引用,请附加本文链接并注明来源。