1. 前言#
本文讨论 InnoDB 的 Purge 子系统的代码实现,建议先阅读 Undo 系统的介绍。
2. Purge 系统#
InnoDB 控制 purge 操作的结构体是 trx_purge_t,其中主要维护了需要被 purge 的回滚段、purge view、purge 状态位置等。全局 trx_purge_t 结构会在 innodb 启动时的trx_sys_init_at_db_start
函数通过扫描所有rollback segment 来初始化设定,其内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
struct trx_purge_t {
sess_t *sess; // purge的系统session
trx_t *trx; // purge的系统trx,不在 trx list
rw_lock_t latch; // purge view,state的保护锁
os_event_t event; // state的信号
ulint n_stop; // 停止追踪器
volatile bool running; // 是否在运行
volatile purge_state_t state; // Coordinator状态:INIT、RUN、STOP、EXIT、DISABLED
que_t *query; // 运行purge用的query graph
ReadView view; // purge view,大于或出现在这个view的undo不会被purge
bool view_active; // purge view是否有效
volatile ulint n_submitted; // 提交的purge任务数目
std::atomic<ulint> n_completed; // 完成的purge任务数目
/* 追踪purge的位置,用于history list truncation */
purge_iter_t iter; // 已经read和parsed的UNDO log位置,一定比limit更新
purge_iter_t limit; // 已经purge(或已经分配马上要purge)的UNDO log位置
bool next_stored; // 标记要purge的下一个undo是否存在下面这些变量中
trx_rseg_t *rseg; // 下一个purge的回滚段
page_no_t page_no; // 下一个purge的undo的page no
ulint offset; // 下一个purge的undo的page in-page-offset
page_no_t hdr_page_no; // 下一个purge的undo的header page
ulint hdr_offset; // // 下一个purge的undo的header page in-page-offset
TrxUndoRsegsIterator *rseg_iter; // 用于获取下一个purge的回滚段
purge_pq_t *purge_queue; // 按trx_no排序的要被purge的(update)回滚段,内存
PQMutex pq_mutex;
undo::Truncate undo_trunc; // 标记要truncate的undospace
mem_heap_t *heap;
std::vector<trx_rseg_t *> rsegs_queue; // 存储所有的回滚段
};
|
3. Purge 主流程代码#
在ddl恢复完成(innobase_post_recover
),保证 tablespaces 相关元信息状态一致后,系统会启动 srv_purge_coordinator_thread 和 srv_worker_thread 来进行 undo purge。srv_purge_coordinator_thread 是主要控制 purge 流程的任务线程,在运行期间循环调用 srv_do_purge
去尽可能 purge 所有 undo。在srv_do_purge
中每次 purge 一批 undo 会根据系统状态自适应调整 purge 系统所使用的线程数目。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
void srv_purge_coordinator_thread() {
// 初始化环境,something...
do { // 常态循环
/*
如果上一次purge没有清理东西,则suspend暂停等待信号,避免空转;
另外,外部操作像 FLUSH TABLES FOR EXPORT 会静默tablespace等的也会暂停purge
*/
if (srv_shutdown_state == SRV_SHUTDOWN_NONE &&
(purge_sys->state == PURGE_STATE_STOP || n_total_purged == 0)) {
srv_purge_coordinator_suspend(slot, rseg_history_len);
}
if (srv_purge_should_exit(n_total_purged)) break;
n_total_purged = 0;
rseg_history_len = srv_do_purge(&n_total_purged); // 自己做purge
} while (!srv_purge_should_exit(n_total_purged));
// 退出清理undo
/* 如果不是fast shutdown,确保所有记录被purge,退出阶段也可能有加入undo记录,清理所有后台线程参数的undo */
ulint n_pages_purged = ULINT_MAX;
while (srv_fast_shutdown == 0 && n_pages_purged > 0) {
n_pages_purged = trx_purge(1, srv_purge_batch_size, false);
}
n_pages_purged =
trx_purge(1, ut_min(srv_purge_batch_size, 20), true);
ut_a(n_pages_purged == 0 || srv_fast_shutdown != 0);
// 清理环境,something...
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
static ulint srv_do_purge(ulint *n_total_purged) {
// 初始化环境,something...
do { // 常态循环尽可能purge
// S1. 通过当前系统和history状态,调整purge threads数目:...
// S2. 判断是否需要进行undo truncate:...
// S3. 调用trx_purge实际做purge
n_pages_purged =
trx_purge(n_use_threads, srv_purge_batch_size, do_truncate);
*n_total_purged += n_pages_purged;
// S4. 判断是否有需要truncate的undo space以再次进入:...
} while (purge_sys->state == PURGE_STATE_RUN &&
(n_pages_purged > 0 || need_explicit_truncate) &&
!srv_purge_should_exit(n_pages_purged));
return rseg_history_len; // 上一批purge前的history长度
}
|
Purge coordinator 的实际 purge 任务是在 trx_purge
中进行分配和进行的, coordinator 会将 undo record 分配给 srv_sys->tasks 中对应数目的 query thread,Purge worker 直接匹配系统环境的 query thread 拿 query thread node(purge_node_t类型)进行执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
ulint trx_purge(ulint n_purge_threads,
ulint batch_size,
bool truncate) {
// S0. 初始化,something...
/******************************************************
S1. 获取(clone)最老 read view 用于为 MVCC 限制 purge 的位置
*******************************************************/
rw_lock_x_lock(&purge_sys->latch, UT_LOCATION_HERE);
purge_sys->view_active = false;
trx_sys->mvcc->clone_oldest_view(&purge_sys->view);
purge_sys->view_active = true;
rw_lock_x_unlock(&purge_sys->latch);
/******************************************************
S2. 给 purge_sys 每个 query thread 的执行节点(purge_node_t)分配 undo recs
*******************************************************/
/*
S2.0. 在 trx_commit 时候将 update undo 写入 purge_queue,(insert undo 直接更新成 cache 或者 free);
S2.1. purge_sys 维护了 next undo rec 的位置(purge_sys->iter),这里从这一位置开始获取batch_size大小的 undo rec
S2.2. 将获取的 undo rec 分配给不同的work thread,这里官方先按table ID进行分配,再进行平衡尽可能保证各 worker 的 undo 数目均匀
(查看提交 Bug #32089028 CONCURRENTLY UPDATING MANY JSON DOCUMENTS STEADILY INCREASES IBD FILE SIZE)
S2.3. 将 undo rec 真正分配挂到 query thread 的执行节点 purge_node_t 上
*/
n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads, batch_size);
/******************************************************
S3. 启动并进行 purge 任务
*******************************************************/
// S3.1. 启动所有query thread
if (n_purge_threads > 1) {
for (ulint i = 0; i < n_purge_threads - 1; ++i) {
thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
// 向后台 srv_sys->tasks 提交任务,以供 purger worker 执行
srv_que_task_enqueue_low(thr);
}
purge_sys->n_submitted += n_purge_threads - 1;
thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
} else {
thr = que_fork_scheduler_round_robin(purge_sys->query, nullptr);
}
++purge_sys->n_submitted;
// S3.2. 执行purge任务,并等待所有worker完成
que_run_threads(thr);
purge_sys->n_completed.fetch_add(1);
if (n_purge_threads > 1) trx_purge_wait_for_workers_to_complete();
/******************************************************
S4. 将所有 blob's first page 延迟到末尾统一释放,避免访问 freed page
*******************************************************/
for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs); thr != nullptr;
thr = UT_LIST_GET_NEXT(thrs, thr)) {
purge_node_t *node = static_cast<purge_node_t *>(thr->child);
node->free_lob_pages();
}
/******************************************************
S5. 进行 undospace truncate
*******************************************************/
if (truncate || srv_upgrade_old_undo_found) {
trx_purge_truncate();
}
return (n_pages_handled);
}
|
4. Purge 物理操作#
在row_purge
中会对对应 query thread 的 run_node(purge_node_t)中取的 undo record 记录进行 purge,直到 purge 完所有分配的 undo record:
-
row_purge_parse_undo_rec:
-
首先逻辑类似 rollback 操作的的第一步,解析 undo record 获取 undo type、trxid、tableid 等操作信息(trx_undo_rec_get_pars
,trx_undo_update_rec_get_sys_cols
)
-
通过 tableid 开启 innodb 表并获取 Shared MDL 锁(dd_table_open_on_id
)
-
构建 目标主键的 row reference (dtuple_t) ,upd_t,row (dtuple_t)等信息(trx_undo_rec_get_row_ref
,trx_undo_update_rec_get_update
)
-
row_purge_record:
-
对 delete mark (TRX_UNDO_DEL_MARK_REC) 类型的 undo,操作 purge 掉所有二级索 row_purge_remove_sec_if_poss
和主键 row_purge_remove_clust_if_poss
上不再需要的 index record(包括 extern field),这里也是先根据 row reference 来索引 search 到 BTree 到对应的 cursor 上,然后删除走的是 btr_cur 的 delete 接口 btr_cur_optimistic_delete
和 btr_cur_pessimistic_delete
;
-
对于 update existing (TRX_UNDO_UPD_EXIST_REC) 类型的 undo,如果二级索引有修改,对其会通过 row_purge_remove_sec_if_poss
删除对应老版本的二级索引记录,然后走 lob::purge 删除 externally fields;主键记录已被修改为最新,不需要做处理。
5. Purge truncate#
前一个阶段 purge 物理操作只是将索引上的数据删除,但是不会处理 undospace 内的空间。虽然 undo log 可以被 purge,但是类似 ibd 文件(不主动optimize)一旦文件增大那么就无法缩小。Innodb 在一个 undo 表空间没有事务使用时,允许将其 truncate 来回收 undo 表空间。回收动作在 coordinator 一批 purge 任务完成后触发,接口为trx_purge_truncate
,其内部主要分为 purge rollback segment 和 truncate space 两个部分。
- purge rollback segment(
trx_purge_truncate_history
)会从所有表空间的所有回滚段回收无用的 undo 数据并清理 history list;
- truncate space(
trx_purge_truncate_undo_spaces
)会检索是否存在满足truncate要求的 undospace(1. 手动设置为 inactive 或大小超过限制;2. 空间中 undo 记录被 purge 完全且没有被任何事务使用),并进行物理文件 truncate 操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
/** 清理某个 rollback segment */
static void trx_purge_truncate_rseg_history(
trx_rseg_t *rseg, /*!< in: rollback segment */
const purge_iter_t *limit) /*!< in: truncate offset */ {
// 变量初始化,something...
// 从当前 rollback segment 的 history list 上面获取最后位置的 undo log header
rseg->latch();
rseg_hdr = trx_rsegf_get(rseg->space_id, rseg->page_no, rseg->page_size, &mtr);
hdr_addr = trx_purge_get_log_from_hist(flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
loop:
if (hdr_addr.page == FIL_NULL) {
// history 不剩下需要处理的 undo
rseg->unlatch();
mtr_commit(&mtr);
return;
}
undo_page = trx_undo_page_get(page_id_t(rseg->space_id, hdr_addr.page),
rseg->page_size, &mtr);
log_hdr = undo_page + hdr_addr.boffset;
seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
if (undo_trx_no >= limit->trx_no) {
// 当前要处理的 undo 已经到达、超过 purge limit
if (undo_trx_no == limit->trx_no &&
rseg->space_id == limit->undo_rseg_space) {
// 将所有小于 limit 的 undo normal page free,header page empty
trx_undo_truncate_start(rseg, hdr_addr.page, hdr_addr.boffset,
limit->undo_no);
}
rseg->unlatch();
mtr_commit(&mtr);
return;
}
prev_hdr_addr = trx_purge_get_log_from_hist(flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE) &&
(mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {
// 无log剩余,回收整个 undo segment
rseg->unlatch();
mtr_commit(&mtr);
trx_purge_free_segment(rseg, hdr_addr, is_temp); // 内部有trx_purge_remove_log_hdr删除history list节点
} else {
// 在history list上删除当前 log header(这里相当于我那次一条history的purge)
trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);
rseg->unlatch();
mtr_commit(&mtr);
}
// 转移到 history list 上的下一条log
mtr_start(&mtr);
if (is_temp) { mtr.set_log_mode(MTR_LOG_NO_REDO); }
rseg->latch();
rseg_hdr = trx_rsegf_get(rseg->space_id, rseg->page_no, rseg->page_size, &mtr);
hdr_addr = prev_hdr_addr;
goto loop;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
// 进行物理文件 truncate
static bool trx_purge_truncate_marked_undo() {
// S0. 变量初始化,something...
/******************************************************
S1. 获取 MDL 锁
*******************************************************/
MDL_ticket *mdl_ticket;
bool dd_result = dd_tablespace_get_mdl(space_name.c_str(), &mdl_ticket, false);
// something...
/******************************************************
S2. 开始 truncate 操作,切换 undospace
*******************************************************/
mutex_enter(&undo::ddl_mutex);
if (!trx_purge_truncate_marked_undo_low(space_num, space_name)) {
mutex_exit(&undo::ddl_mutex);
dd_release_mdl(mdl_ticket);
MONITOR_INC_TIME(MONITOR_UNDO_TRUNCATE_MICROSECOND, counter_time_truncate);
return (false);
}
/******************************************************
S3. 删除 undo log file,标志 undo truncate 完成
*******************************************************/
undo::spaces->x_lock();
undo::done_logging(space_num);
undo::spaces->x_unlock();
// S4. 清理环境,something...
return (true);
}
static bool trx_purge_truncate_marked_undo_low(space_id_t space_num,
std::string space_name) {
/******************************************************
S2.1. 获取环境
*******************************************************/
// someting...
/******************************************************
S2.2. 创建 undo 文件,标志 undo truncate 开始
*******************************************************/
dberr_t err = undo::start_logging(marked_space);
// someting...
/******************************************************
S2.3. 过滤特殊条件,someting...
*******************************************************/
/******************************************************
S2.4. 实际 undo space 文件裁剪轮转
*******************************************************/
/*
计算新 undo space id 和 undo space no;
删除+新建 file tablespace(fil_delete_tablespace + fil_ibd_create);
重新初始化新 tablespace 文件,构建 undo 文件内容;
重新设置相应 undo space 回滚段内存结构体(Rsegs *m_rsegs)
*/
bool success = trx_undo_truncate_tablespace(marked_space);
// someting...
/******************************************************
S2.5. 设定此 undo 空间后面的可用状态
*******************************************************/
space_id_t new_space_id = marked_space->id();
dd_space_states next_state;
undo::spaces->s_lock();
Rsegs *marked_rsegs = marked_space->rsegs();
marked_rsegs->x_lock();
if (marked_rsegs->is_inactive_explicit()) {
// 由外部手动 inactive
next_state = DD_SPACE_STATE_EMPTY;
marked_rsegs->set_empty();
} else {
// 由后台 purge 选择,可被再使用
next_state = DD_SPACE_STATE_ACTIVE;
marked_rsegs->set_active();
}
/******************************************************
S2.6. 在 DD 中更新 space ID 和 state 信息
*******************************************************/
if (DD_FAILURE == dd_tablespace_set_id_and_state(space_name.c_str(),
new_space_id, next_state))
return (false);
return (true);
}
|
- 版权声明:如需转载或引用,请附加本文链接并注明来源。