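OceanBase Performance Tuning
This article describes how to debug the OceanBase source code and outlines an approach to optimizing Nested-Loop-Join performance.
Building and Deploying OceanBase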
-
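Building OceanBase from Source
To debug or optimize the OceanBase source code, the first step is to build OceanBase from source. I recommend cloning the competition branch of the OceanBase Database Contest and then following the official guide to build the database from source. The debug build supports breakpoint debugging, and the release build is the one to use for performance testing.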
-
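Installing OBD, the OceanBase Deployment Tool
Next, install OBD, the OceanBase deployment tool, by following the official documentation. Once it is installed, change into the build directory of the OceanBase source tree (for example build_release) and create an OBD mirror tagged obcompetition, based on oceanbase-ce v3.1.0.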
obd mirror create -n oceanbase-ce -V 3.1.0 -p ./usr/local -t obcompetition
-
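Deploying the Database
Once the mirror has been created, the database can be deployed from a configuration file. Official example configuration files are available; the ob.yaml used in this article is shown below:
oceanbase-ce:
  # Set tag to the tag of the obcompetition mirror created above
  tag: obcompetition
  servers:
    - name: test
      ip: 127.0.0.1
  global:
    # Change home_path to the directory you want to deploy into
    home_path: *****
    devname: lo
    mysql_port: 2881
    rpc_port: 2882
    zone: zone1
    cluster_id: 1
    datafile_size: 10G
    appname: obcompetition
  test:
    syslog_level: INFO
    enable_syslog_recycle: true
    enable_syslog_wf: true
    max_syslog_file_count: 4
    memory_limit: 12G
    system_memory: 6G
    cpu_count: 16
Before deploying, make sure the home_path directory is empty, then deploy a cluster named obcompetition from the configuration file.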
obd cluster deploy obcompetition -c ob.yaml
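Start the database and create a test tenant named test, which is given all resources not used by the sys tenant.
obd cluster start obcompetition
obd cluster tenant create obcompetition --tenant-name test
Once the tenant has been created, you can connect to either the test tenant or the sys tenant with the mysql client.
mysql --host 127.0.0.1 --port 2881 -uroot@test
mysql --host 127.0.0.1 --port 2881 -uroot@sys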
-
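Redeploying the Database After Changing the Code
After modifying the source code, first rebuild it, then replace the running observer with the newly built one. To do so, find where the running observer lives, that is, the location of bin/observer. ls -l shows that this observer is a symbolic link, so replacing it only requires pointing the link at the freshly built observer binary.
ps -ef | grep observer
ls -l ***/bin/observer
Finally, restart obcompetition.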
obd cluster restart obcompetition
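The relinking step described above can be scripted. The following is a minimal sketch, not an OBD feature: the function name relink_observer is hypothetical, and it assumes (as the text states) that home_path/bin/observer is a symbolic link.

```shell
# relink_observer HOME_PATH NEW_BINARY
# Point HOME_PATH/bin/observer at a freshly built observer binary.
# Hypothetical helper for illustration; it is not part of OBD.
relink_observer() {
  local home_path=$1 new_binary=$2
  local link="$home_path/bin/observer"

  if [ ! -x "$new_binary" ]; then
    echo "error: $new_binary is missing or not executable" >&2
    return 1
  fi
  # Refuse to clobber a regular file: only an existing symlink is replaced.
  if [ -e "$link" ] && [ ! -L "$link" ]; then
    echo "error: $link exists but is not a symlink" >&2
    return 1
  fi
  # -s: symbolic link, -f: replace an existing link,
  # -n: treat an existing link as a file instead of following it
  ln -sfn "$(readlink -f "$new_binary")" "$link"
}
```

A typical call would be `relink_observer /path/to/home_path /path/to/new/observer` followed by `obd cluster restart obcompetition`; both paths here are placeholders for your own deployment and build directories.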
Debugging Tools
-
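Remote Debugging with VS Code
While reading and modifying the OceanBase source you will need to debug it. OceanBase has fairly high hardware requirements and is usually deployed on a server, so remote debugging with VS Code is a convenient option.
First install the Remote - SSH extension in VS Code and open the OceanBase source directory on the server, then create a new launch.json file from the Debug view.
Creating a new launch.json
Replace the contents of launch.json with the configuration below. The "configurations" → "program" entry must be set according to your OceanBase configuration file.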
{
  "configurations": [
    {
      "name": "(gdb) Attach",
      "type": "cppdbg",
      "request": "attach",
      // Replace program with /home_path/bin/observer,
      // where home_path is the value from the OceanBase configuration file
      "program": "****",
      "processId": "${input:FindPID}",
      "MIMode": "gdb",
      "sudo": true,
      "miDebuggerPath": "gdb",
      "setupCommands": [
        {
          "description": "Enable pretty-printing for gdb",
          "text": "-enable-pretty-printing",
          "ignoreFailures": true
        }
      ],
      // Some directory mappings are set up here.
      // If the debugger reports that a source file cannot be found,
      // add the corresponding mapping yourself.
      "sourceFileMap": {
        "./build_debug/src/observer/./src/observer/omt": {
          "editorPath": "${workspaceFolder}/src/observer/omt"
        },
        "./build_debug/src/sql/parser/./src/sql/parser": {
          "editorPath": "${workspaceFolder}/src/sql/parser",
          "useForBreakpoints": true
        },
        "./build_debug/src/sql/./src/sql": {
          "editorPath": "${workspaceFolder}/src/sql",
          "useForBreakpoints": true
        },
        "./build_debug/src/sql/engine/join/./src/sql/engine/join": {
          "editorPath": "${workspaceFolder}/src/sql/engine/join",
          "useForBreakpoints": true
        },
        "./build_debug/src/storage/./src/storage": {
          "editorPath": "${workspaceFolder}/src/storage",
          "useForBreakpoints": true
        },
        "./build_debug/src/rootserver/./src/rootserver": {
          "editorPath": "${workspaceFolder}/src/rootserver",
          "useForBreakpoints": true
        },
        "./build_debug/src/share/./src/share": {
          "editorPath": "${workspaceFolder}/src/share",
          "useForBreakpoints": true
        }
      }
    }
  ],
  "inputs": [
    {
      "id": "FindPID",
      "type": "command",
      "command": "shellCommand.execute",
      "args": {
        "command": "ps -aux | grep /bin/observer | awk '{print $2}' | head -1",
        "description": "Select your observer PID",
        "useFirstResult": true
      }
    }
  ]
}
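Then install the Tasks Shell Input extension on the server; it obtains the observer process ID dynamically by running a script.
Installing the Tasks Shell Input extension
With this in place, gdb can attach successfully once observer has been started.
Successful gdb attach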
-
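Debugging with Logs
Debugging in VS Code still has some problems. For example, the code at a breakpoint may be reached by many system threads (especially in the storage layer), so after a SQL statement is entered in the mysql client, the thread that hits the breakpoint is not necessarily the worker thread executing that statement, and the call stack may not be the one you want. In such cases, debugging by adding log statements works better.
OceanBase's log types are defined in deps/oblib/src/lib/oblog/ob_log_module.h, and the logs are written to /home_path/log. Each log entry has the following format: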
[time]log_level[module_name]function_name(filename:file_no)[thread_id] [Ytrace_id0_trace_id1][log=last_log_print_time]log_data
# time: time the log entry was written
# log_level: log level
# module_name: module name
# filename:file_no: file name and line number
# thread_id: thread ID
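Because many threads write to the same log file, it helps to narrow the output to one thread or one trace. The sketch below relies only on the bracketed fields in the format above. The function names are hypothetical, and the pattern used to recognise a trace id (a bracketed field starting with Y) is an assumption derived from that format.

```shell
# log_with_field VALUE LOGFILE
# Print entries containing the literal bracketed field "[VALUE]",
# for example a thread id or a trace id.
log_with_field() {
  grep -F -- "[$1]" "$2"
}

# log_trace_of_marker MARKER LOGFILE
# Find the first entry containing MARKER (e.g. text from a log statement
# you added), extract its trace id, and print every entry of that trace.
log_trace_of_marker() {
  local trace
  trace=$(grep -F -- "$1" "$2" | grep -o '\[Y[^]]*\]' | head -n 1)
  [ -n "$trace" ] || return 1
  grep -F -- "$trace" "$2"
}
```

For example, `log_trace_of_marker "my debug marker" /home_path/log/observer.log` would show everything logged under the same trace as your own log line; the file name observer.log is an assumption about the contents of the log directory.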
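The official documentation also describes some additional debugging techniques.
Performance Testing Tool: SysBench
The OceanBase Database Contest uses SysBench for performance testing. First install sysbench on the test machine (the client).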
-
subplan.lua
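The performance test uses sysbench's subplan.lua script, which is located in the sysbench installation directory. The schema in the script consists of two tables, t1 and t2.
CREATE TABLE t1(
  c1 int primary key, c2 int, c3 int,
  v1 CHAR(60), v2 CHAR(60), v3 CHAR(60),
  v4 CHAR(60), v5 CHAR(60), v6 CHAR(60),
  v7 CHAR(60), v8 CHAR(60), v9 CHAR(60)
);
CREATE TABLE t2(
  c1 int primary key, c2 int, c3 int,
  v1 CHAR(60), v2 CHAR(60), v3 CHAR(60),
  v4 CHAR(60), v5 CHAR(60), v6 CHAR(60),
  v7 CHAR(60), v8 CHAR(60), v9 CHAR(60)
);
After t1 and t2 have been created, insert the data.
INSERT INTO t1 (c1, c2, c3, v1, v2, v3, v4, v5, v6, v7, v8, v9) VALUES(...);
INSERT INTO t2 (c1, c2, c3, v1, v2, v3, v4, v5, v6, v7, v8, v9) VALUES(...);
After the data has been inserted, create two indexes on t2. Neither index key is the primary key, so both are secondary indexes. A lookup on the inner table therefore involves a table lookup (index back): the primary key obtained from the index is used to fetch the full row from the primary table.
create index t2_i1 on t2(c2) local;
create index t2_i2 on t2(c3) local;
The SELECT restricts the outer table to a range query covering 200 rows, forces a Nested-Loop-Join (an Index Nested-Loop-Join) with a hint, and performs an equi-join on columns c2 and c3.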
select /*+ordered use_nl(A,B)*/ * from t1 A, t2 B where A.c1 >= ? and A.c1 < ? and A.c2 = B.c2 and A.c3 = B.c3
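Looking at OceanBase's execution plan with explain: table A is scanned on the primary table over the primary-key range [200, 400), which yields 200 rows, and serves as the outer (left) table. Within one query, for each row of the outer table, table B, the inner (right) table, uses index t2_i2 to quickly locate the matching rows.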
===============================================
|ID|OPERATOR        |NAME    |EST. ROWS|COST  |
-----------------------------------------------
|0 |NESTED-LOOP JOIN|        |193      |138185|
|1 | TABLE SCAN     |A       |200      |188   |
|2 | TABLE SCAN     |B(t2_i2)|1        |690   |
===============================================

Outputs & filters:
-------------------------------------
  0 - output([A.c1], [A.c2], [A.c3], [A.v1], [A.v2], [A.v3], [A.v4], [A.v5], [A.v6], [A.v7], [A.v8], [A.v9], [B.c1], [B.c2], [B.c3], [B.v1], [B.v2], [B.v3], [B.v4], [B.v5], [B.v6], [B.v7], [B.v8], [B.v9]), filter(nil),
      conds(nil), nl_params_([A.c2], [A.c3]), batch_join=false
  1 - output([A.c1], [A.c2], [A.c3], [A.v1], [A.v2], [A.v3], [A.v4], [A.v5], [A.v6], [A.v7], [A.v8], [A.v9]), filter(nil),
      access([A.c1], [A.c2], [A.c3], [A.v1], [A.v2], [A.v3], [A.v4], [A.v5], [A.v6], [A.v7], [A.v8], [A.v9]), partitions(p0),
      is_index_back=false,
      range_key([A.c1]), range[200 ; 400),
      range_cond([A.c1 >= 200], [A.c1 < 400])
  2 - output([B.c2], [B.c3], [B.c1], [B.v1], [B.v2], [B.v3], [B.v4], [B.v5], [B.v6], [B.v7], [B.v8], [B.v9]), filter([? = B.c2]),
      access([B.c2], [B.c3], [B.c1], [B.v1], [B.v2], [B.v3], [B.v4], [B.v5], [B.v6], [B.v7], [B.v8], [B.v9]), partitions(p0),
      is_index_back=true, filter_before_indexback[false],
      range_key([B.c3], [B.c1]), range(MIN ; MAX),
      range_cond([? = B.c3])
-
Test Script
USER=root@test
DB=test
HOST=127.0.0.1
PORT=2881
THREADS=128
TABLE_SIZE=100000
TABLES=3
TIME=300
REPORT_INTERVAL=10

sysbench --db-ps-mode=disable --mysql-host=$HOST --mysql-port=$PORT \
    --rand-type=uniform --rand-seed=26765 \
    --mysql-user=$USER --mysql-db=$DB \
    --threads=$THREADS \
    --tables=$TABLES --table_size=$TABLE_SIZE \
    --time=$TIME --report-interval=$REPORT_INTERVAL \
    subplan cleanup

sysbench --db-ps-mode=disable --mysql-host=$HOST --mysql-port=$PORT \
    --rand-type=uniform --rand-seed=26765 \
    --mysql-user=$USER --mysql-db=$DB \
    --threads=$THREADS \
    --tables=$TABLES --table_size=$TABLE_SIZE \
    --time=$TIME --report-interval=$REPORT_INTERVAL \
    subplan prepare

sysbench --db-ps-mode=disable --mysql-host=$HOST --mysql-port=$PORT \
    --rand-type=uniform --rand-seed=26765 \
    --mysql-user=$USER --mysql-db=$DB \
    --threads=$THREADS \
    --tables=$TABLES --table_size=$TABLE_SIZE \
    --time=$TIME --report-interval=$REPORT_INTERVAL \
    subplan run
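The three sysbench invocations above differ only in their final argument, so the shared options can be factored into one place. This is a possible restructuring rather than the contest's official script: the function names sysbench_opts and run_phase are hypothetical, and the connection variables are renamed with an OB_ prefix so that the shell's own USER variable is not overwritten.

```shell
# Same values as in the test script above.
OB_USER=root@test
OB_DB=test
OB_HOST=127.0.0.1
OB_PORT=2881
THREADS=128
TABLE_SIZE=100000
TABLES=3
TIME=300
REPORT_INTERVAL=10

# Print the options shared by all three phases, one per line.
sysbench_opts() {
  printf '%s\n' \
    --db-ps-mode=disable \
    "--mysql-host=$OB_HOST" "--mysql-port=$OB_PORT" \
    --rand-type=uniform --rand-seed=26765 \
    "--mysql-user=$OB_USER" "--mysql-db=$OB_DB" \
    "--threads=$THREADS" \
    "--tables=$TABLES" "--table_size=$TABLE_SIZE" \
    "--time=$TIME" "--report-interval=$REPORT_INTERVAL"
}

# run_phase cleanup|prepare|run
run_phase() {
  case $1 in
    cleanup|prepare|run) ;;
    *) echo "usage: run_phase cleanup|prepare|run" >&2; return 2 ;;
  esac
  # Word splitting of the option list is intentional: no option contains spaces.
  sysbench $(sysbench_opts) subplan "$1"
}
```

Running `run_phase cleanup`, `run_phase prepare` and `run_phase run` in that order reproduces the sequence of the original script.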
Performance Tuning
-
iotop
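First, look at observer's disk I/O during the sysbench test. iotop is used here; it reads per-process I/O information from the system's /proc directory and aggregates it.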
sudo iotop -o
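observer disk I/O during the sysbench run
After sysbench has been running for a while, the only write I/O comes from asynchronous log flushing and transaction redo logs. There are few read requests, presumably because the data is cached in memory. Since the official optimization suggestions also start from memory, we can focus on memory optimization.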
-
perf
perf is a lightweight profiling tool. perf top prints the sampled functions in real time and shows which functions consume the most CPU time.
sudo perf top -p observer_pid
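Viewing hot functions with perf top
The perf top screen is interactive: annotate steps into a function and shows the share of time spent on each instruction. However, the output is disassembly, which is hard to relate back to the source code.
Using annotate to view the share of execution time per instruction
perf stat can also show the program's branch-misses.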
sudo perf stat -p observer_pid
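Without further options, perf stat keeps counting until it is interrupted. A bounded measurement with an explicit event list is often easier to compare between runs. The sketch below is one way to do that: the function name perf_stat_events is hypothetical, and the cache events are an addition of this example (the text above only mentions branch-misses).

```shell
# perf_stat_events PID [SECONDS]
# Count branch and cache events of one process for a fixed duration.
perf_stat_events() {
  local pid=$1 secs=${2:-10}
  case $pid in
    ''|*[!0-9]*)
      echo "error: PID must be a number, got '$pid'" >&2
      return 2
      ;;
  esac
  # "-- sleep N" bounds the measurement: perf stops counting when sleep exits.
  perf stat -e branches,branch-misses,cache-references,cache-misses \
    -p "$pid" -- sleep "$secs"
}
```

It would be invoked with the real process ID in place of the observer_pid placeholder, and usually needs root privileges (for example from a root shell).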
-
FlameGraph
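With perf alone, you have to expand functions one by one in the terminal to see call-stack information, which makes it hard to get an overall picture of how the code executes. FlameGraph helps by visualizing perf's sampling results.
The three main parts of the NestedLoopJoin operator
Generating a flame graph while sysbench is running shows that, under the NLJ workload, the execution of OceanBase's NestedLoopJoin physical operator consists of three main parts (marked by the three blue arrows). The middle part is the scan of the left table. The right part handles each left-table row: it first looks up index t2_i2 by column B.c3 (in effect the value of A.c3) and, once it has the rowkey, fetches the row from t2. The left part runs after a left-table row has been matched and joined with the right table, and resets the scan state of the right table.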
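A flame graph like the one discussed above can be produced by sampling call stacks with perf record and feeding the result through the stackcollapse-perf.pl and flamegraph.pl scripts from the FlameGraph repository. The following is a minimal sketch: the function name make_flamegraph, the 99 Hz sampling rate and the location of the FlameGraph checkout are assumptions of this example, not settings taken from the article.

```shell
# make_flamegraph PID SECONDS FLAMEGRAPH_DIR OUT_SVG
# Sample the call stacks of one process and render them as an SVG flame graph.
make_flamegraph() {
  local pid=$1 secs=$2 fg_dir=$3 out=$4
  local workdir status

  if [ ! -f "$fg_dir/stackcollapse-perf.pl" ] || [ ! -f "$fg_dir/flamegraph.pl" ]; then
    echo "error: FlameGraph scripts not found in $fg_dir" >&2
    return 1
  fi
  workdir=$(mktemp -d) || return 1

  # Record call graphs (-g) of the process at 99 samples per second
  # for as long as "sleep" runs.
  if ! perf record -F 99 -g -p "$pid" -o "$workdir/perf.data" -- sleep "$secs"; then
    rm -rf "$workdir"
    return 1
  fi

  # Fold each sampled stack into one line, then draw the graph.
  perf script -i "$workdir/perf.data" \
    | "$fg_dir/stackcollapse-perf.pl" \
    | "$fg_dir/flamegraph.pl" > "$out"
  status=$?
  rm -rf "$workdir"
  return "$status"
}
```

Started while sysbench is in its run phase, for example as `make_flamegraph <observer pid> 30 /path/to/FlameGraph observer.svg` from a root shell, this writes an SVG that can be opened in a browser.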
Some Optimization Points
-
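Optimizing the Right-Table Lookup Logic
The right table fetches a batch of rowkeys from the index table at a time and then queries the primary table through ObMultipleGetMerge using that rowkeys array. The flame graph shows that this step accounts for a large share of the time. In fact, if only one rowkey comes back from the index table, the primary table can be queried with ObSingleMerge, which is more efficient.
Index lookup and table lookup on the right table in the original code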
diff --git a/src/storage/ob_index_merge.cpp b/src/storage/ob_index_merge.cpp
index e6386773..82c59ba4 100644
--- a/src/storage/ob_index_merge.cpp
+++ b/src/storage/ob_index_merge.cpp
@@ -30,7 +30,9 @@ ObIndexMerge::ObIndexMerge()
       rowkeys_(),
       rowkey_allocator_(ObModIds::OB_SSTABLE_GET_SCAN),
       rowkey_range_idx_(),
-      index_range_array_cursor_(0)
+      index_range_array_cursor_(0),
+      table_iter_single_(),
+      is_single_(0)
 {}
 
 ObIndexMerge::~ObIndexMerge()
@@ -49,12 +51,20 @@ void ObIndexMerge::reset()
   rowkey_allocator_.reset();
   rowkey_range_idx_.reset();
   index_range_array_cursor_ = 0;
+  if (is_single_) {
+    table_iter_single_.reset();
+    is_single_ = 0;
+  }
 }
 
 void ObIndexMerge::reuse()
 {
   table_iter_.reuse();
   index_range_array_cursor_ = 0;
+  // if (is_single_) {
+  //   table_iter_single_.reuse();
+  //   is_single_ = 0;
+  // }
 }
 
 int ObIndexMerge::open(ObQueryRowIterator& index_iter)
@@ -71,11 +81,14 @@ int ObIndexMerge::init(const ObTableAccessParam& param, const ObTableAccessParam
   int ret = OB_SUCCESS;
   if (OB_FAIL(table_iter_.init(param, context, get_table_param))) {
     STORAGE_LOG(WARN, "Fail to init table iter, ", K(ret));
+  } else if (OB_FAIL(table_iter_single_.init(param, context, get_table_param))) {
+    STORAGE_LOG(WARN, "Fail to init table single iter, ", K(ret));
   } else {
     index_param_ = &index_param;
     access_ctx_ = &context;
     rowkey_cnt_ = param.iter_param_.rowkey_cnt_;
   }
+  is_single_ = 0;
   return ret;
 }
 
@@ -107,7 +120,6 @@ int ObIndexMerge::get_next_row(ObStoreRow*& row)
         ObExtStoreRowkey dest_key;
         rowkeys_.reuse();
         rowkey_allocator_.reuse();
-        table_iter_.reuse();
         access_ctx_->allocator_->reuse();
         for (int64_t i = 0; OB_SUCC(ret) && i < MAX_NUM_PER_BATCH; ++i) {
           if (OB_FAIL(index_iter_->get_next_row(index_row))) {
@@ -139,10 +151,21 @@ int ObIndexMerge::get_next_row(ObStoreRow*& row)
         }
 
         if (OB_SUCC(ret)) {
-          if (OB_FAIL(table_iter_.open(rowkeys_))) {
-            STORAGE_LOG(WARN, "fail to open iterator", K(ret));
+          if (1 == rowkeys_.count()) {
+            table_iter_single_.reuse();
+            is_single_ = 1;
+            if (OB_FAIL(table_iter_single_.open(rowkeys_[0]))) {
+              STORAGE_LOG(WARN, "fail to open iterator", K(ret));
+            } else {
+              main_iter_ = &table_iter_single_;
+            }
           } else {
-            main_iter_ = &table_iter_;
+            table_iter_.reuse();
+            if (OB_FAIL(table_iter_.open(rowkeys_))) {
+              STORAGE_LOG(WARN, "fail to open iterator", K(ret));
+            } else {
+              main_iter_ = &table_iter_;
+            }
           }
         }
       }
diff --git a/src/storage/ob_index_merge.h b/src/storage/ob_index_merge.h
index e7a6cde5..cdeb0da1 100644
--- a/src/storage/ob_index_merge.h
+++ b/src/storage/ob_index_merge.h
@@ -19,6 +19,7 @@
 #include "storage/ob_multiple_get_merge.h"
 #include "storage/ob_query_iterator_util.h"
 #include "storage/blocksstable/ob_block_sstable_struct.h"
+#include "storage/ob_single_merge.h"
 
 namespace oceanbase {
 namespace storage {
@@ -50,6 +51,8 @@ class ObIndexMerge : public ObQueryRowIterator {
   common::ObArenaAllocator rowkey_allocator_;
   ObArray<int64_t> rowkey_range_idx_;
   int64_t index_range_array_cursor_;
+  ObSingleMerge table_iter_single_;
+  int is_single_;
 
 private:
   DISALLOW_COPY_AND_ASSIGN(ObIndexMerge);
diff --git a/src/storage/ob_single_merge.cpp b/src/storage/ob_single_merge.cpp
index 42a34425..3c76aa0f 100644
--- a/src/storage/ob_single_merge.cpp
+++ b/src/storage/ob_single_merge.cpp
@@ -141,9 +141,10 @@ int ObSingleMerge::inner_get_next_row(ObStoreRow& row)
   int64_t end_table_idx = 0;
   int64_t row_cache_snapshot_version = 0;
   const ObIArray<ObITable*>& tables = tables_handle_.get_tables();
-  const bool enable_fuse_row_cache = is_x86() && access_ctx_->use_fuse_row_cache_ &&
-                                     access_param_->iter_param_.enable_fuse_row_cache() &&
-                                     access_ctx_->fuse_row_cache_hit_rate_ > 6;
+  // const bool enable_fuse_row_cache = is_x86() && access_ctx_->use_fuse_row_cache_ &&
+  //                                    access_param_->iter_param_.enable_fuse_row_cache() &&
+  //                                    access_ctx_->fuse_row_cache_hit_rate_ > 6;
+  const bool enable_fuse_row_cache = false;
   access_ctx_->query_flag_.set_not_use_row_cache();
   const int64_t table_cnt = tables.count();
   ObITable* table = NULL;
Using SingleMerge for the table lookup when there is exactly one rowkey
-
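Destructing as Few Objects as Possible During rescan
The flame graph shows that reuse_row_iters destructs many objects. The optimization idea is to keep the memory of these objects valid for the entire query and to reset only some of their state on each rescan.
What reuse row iters does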
diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 3a4d28f7..ba7d295d 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -1033,7 +1033,7 @@ int ObMemtable::get(const storage::ObTableIterParam& param, storage::ObTableAcce TRANS_LOG(WARN, "invalid argument, ", K(ret), K(param), K(context)); } else if (OB_FAIL(context.store_ctx_->mem_ctx_->get_trans_status())) { TRANS_LOG(WARN, "trans already end", K(ret)); - } else if (NULL == (get_iter_buffer = context.allocator_->alloc(sizeof(ObMemtableGetIterator))) || + } else if (NULL == (get_iter_buffer = context.stmt_allocator_->alloc(sizeof(ObMemtableGetIterator))) || NULL == (get_iter_ptr = new (get_iter_buffer) ObMemtableGetIterator())) { TRANS_LOG(WARN, "construct ObMemtableGetIterator fail"); ret = OB_ALLOCATE_MEMORY_FAILED; @@ -1082,7 +1082,7 @@ int ObMemtable::scan(const storage::ObTableIterParam& param, storage::ObTableAcc } else { if (param.is_multi_version_minor_merge_) { if (GCONF._enable_sparse_row) { - if (NULL == (scan_iter_buffer = context.allocator_->alloc(sizeof(ObMemtableMultiVersionScanSparseIterator))) || + if (NULL == (scan_iter_buffer = context.stmt_allocator_->alloc(sizeof(ObMemtableMultiVersionScanSparseIterator))) || NULL == (scan_iter_ptr = new (scan_iter_buffer) ObMemtableMultiVersionScanSparseIterator())) { TRANS_LOG(WARN, "construct ObMemtableMultiVersionScanSparseIterator fail", @@ -1099,7 +1099,7 @@ int ObMemtable::scan(const storage::ObTableIterParam& param, storage::ObTableAcc TRANS_LOG(WARN, "scan iter init fail", "ret", ret, K(real_range), K(param), K(context)); } } else { - if (NULL == (scan_iter_buffer = context.allocator_->alloc(sizeof(ObMemtableMultiVersionScanIterator))) || + if (NULL == (scan_iter_buffer = context.stmt_allocator_->alloc(sizeof(ObMemtableMultiVersionScanIterator))) || NULL == (scan_iter_ptr = new (scan_iter_buffer) ObMemtableMultiVersionScanIterator())) { TRANS_LOG(WARN, "construct 
ObMemtableScanIterator fail", @@ -1117,7 +1117,7 @@ int ObMemtable::scan(const storage::ObTableIterParam& param, storage::ObTableAcc } } } else { - if (NULL == (scan_iter_buffer = context.allocator_->alloc(sizeof(ObMemtableScanIterator))) || + if (NULL == (scan_iter_buffer = context.stmt_allocator_->alloc(sizeof(ObMemtableScanIterator))) || NULL == (scan_iter_ptr = new (scan_iter_buffer) ObMemtableScanIterator())) { TRANS_LOG(WARN, "construct ObMemtableScanIterator fail", @@ -1162,7 +1162,7 @@ int ObMemtable::multi_get(const storage::ObTableIterParam& param, storage::ObTab TRANS_LOG(WARN, "invalid argument, ", K(ret), K(param), K(context), K(rowkeys)); } else if (OB_FAIL(context.store_ctx_->mem_ctx_->get_trans_status())) { TRANS_LOG(WARN, "trans already end", K(ret)); - } else if (NULL == (mget_iter_buffer = context.allocator_->alloc(sizeof(ObMemtableMGetIterator))) || + } else if (NULL == (mget_iter_buffer = context.stmt_allocator_->alloc(sizeof(ObMemtableMGetIterator))) || NULL == (mget_iter_ptr = new (mget_iter_buffer) ObMemtableMGetIterator())) { TRANS_LOG(WARN, "construct ObMemtableMGetIterator fail", @@ -1212,7 +1212,7 @@ int ObMemtable::multi_scan(const storage::ObTableIterParam& param, storage::ObTa TRANS_LOG(WARN, "invalid argument, ", K(ret), K(param), K(context), K(ranges)); } else if (OB_FAIL(context.store_ctx_->mem_ctx_->get_trans_status())) { TRANS_LOG(WARN, "trans already end", K(ret)); - } else if (NULL == (mscan_iter_buffer = context.allocator_->alloc(sizeof(ObMemtableMScanIterator))) || + } else if (NULL == (mscan_iter_buffer = context.stmt_allocator_->alloc(sizeof(ObMemtableMScanIterator))) || NULL == (mscan_iter_ptr = new (mscan_iter_buffer) ObMemtableMScanIterator())) { TRANS_LOG(WARN, "construct ObMemtableMScanIterator fail", diff --git a/src/storage/ob_i_store.h b/src/storage/ob_i_store.h index e13283f7..69971590 100644 --- a/src/storage/ob_i_store.h +++ b/src/storage/ob_i_store.h @@ -833,6 +833,8 @@ public: } virtual void reuse() {} + 
virtual void reset() + {} virtual bool is_base_sstable_iter() const { return false; diff --git a/src/storage/ob_multiple_get_merge.cpp b/src/storage/ob_multiple_get_merge.cpp index ebfd26a7..c5686543 100644 --- a/src/storage/ob_multiple_get_merge.cpp +++ b/src/storage/ob_multiple_get_merge.cpp @@ -82,7 +82,7 @@ void ObMultipleGetMerge::reset_with_fuse_row_cache() handles_ = nullptr; } prefetch_cnt_ = 0; - reuse_iter_array(); + reset_iter_array(); } void ObMultipleGetMerge::reset() diff --git a/src/storage/ob_multiple_merge.cpp b/src/storage/ob_multiple_merge.cpp index be8de75f..f5c92405 100644 --- a/src/storage/ob_multiple_merge.cpp +++ b/src/storage/ob_multiple_merge.cpp @@ -505,6 +505,10 @@ void ObMultipleMerge::reset() if (NULL != (iter = iters_.at(i))) { iter->~ObStoreRowIterator(); } + if (OB_NOT_NULL(access_ctx_->stmt_allocator_)) { + access_ctx_->stmt_allocator_->free(iter); + } + iter = NULL; } padding_allocator_.reset(); iters_.reset(); @@ -541,17 +545,31 @@ void ObMultipleMerge::reuse() read_memtable_only_ = false; } -void ObMultipleMerge::reuse_iter_array() +void ObMultipleMerge::reset_iter_array() { ObStoreRowIterator* iter = NULL; for (int64_t i = 0; i < iters_.count(); ++i) { if (NULL != (iter = iters_.at(i))) { iter->~ObStoreRowIterator(); } + if (OB_NOT_NULL(access_ctx_->stmt_allocator_)) { + access_ctx_->stmt_allocator_->free(iter); + } + iter = NULL; } iters_.reuse(); } +void ObMultipleMerge::reuse_iter_array() +{ + ObStoreRowIterator* iter = NULL; + for (int64_t i = 0; i < iters_.count(); ++i) { + if (NULL != (iter = iters_.at(i))) { + iter->reuse(); + } + } +} + int ObMultipleMerge::open() { int ret = OB_SUCCESS; @@ -946,7 +964,7 @@ int ObMultipleMerge::refresh_table_on_demand() } else if (need_refresh) { if (OB_FAIL(save_curr_rowkey())) { STORAGE_LOG(WARN, "fail to save current rowkey", K(ret)); - } else if (FALSE_IT(reuse_iter_array())) { + } else if (FALSE_IT(reset_iter_array())) { } else if (OB_FAIL(prepare_read_tables())) { 
STORAGE_LOG(WARN, "fail to prepare read tables", K(ret)); } else if (OB_FAIL(reset_tables())) { diff --git a/src/storage/ob_multiple_merge.h b/src/storage/ob_multiple_merge.h index ed227202..a560172d 100644 --- a/src/storage/ob_multiple_merge.h +++ b/src/storage/ob_multiple_merge.h @@ -80,6 +80,7 @@ protected: const ObTableIterParam* get_actual_iter_param(const ObITable* table) const; int project_row(const ObStoreRow& unprojected_row, const common::ObIArray<int32_t>* projector, const int64_t range_idx_delta, ObStoreRow& projected_row); + void reset_iter_array(); void reuse_iter_array(); virtual int skip_to_range(const int64_t range_idx); diff --git a/src/storage/ob_sstable.cpp b/src/storage/ob_sstable.cpp index 13a3f0fa..c0713bbc 100644 --- a/src/storage/ob_sstable.cpp +++ b/src/storage/ob_sstable.cpp @@ -1105,14 +1105,14 @@ int ObSSTable::get(const storage::ObTableIterParam& param, storage::ObTableAcces ObISSTableRowIterator* row_getter = NULL; if (is_multi_version_minor_sstable() && (context.is_multi_version_read(get_upper_trans_version()) || contain_uncommitted_row() || !meta_.has_compact_row_)) { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableMultiVersionRowGetter)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableMultiVersionRowGetter)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { row_getter = new (buf) ObSSTableMultiVersionRowGetter(); } } else { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableRowGetter)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableRowGetter)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { @@ -1163,14 +1163,14 @@ int ObSSTable::multi_get(const ObTableIterParam& param, ObTableAccessContext& co ObISSTableRowIterator* row_getter = NULL; if (is_multi_version_minor_sstable() && (context.is_multi_version_read(get_upper_trans_version()) || 
contain_uncommitted_row() || !meta_.has_compact_row_)) { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableMultiVersionRowMultiGetter)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableMultiVersionRowMultiGetter)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { row_getter = new (buf) ObSSTableMultiVersionRowMultiGetter(); } } else { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableRowMultiGetter)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableRowMultiGetter)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { @@ -1269,21 +1269,21 @@ int ObSSTable::scan(const ObTableIterParam& param, ObTableAccessContext& context void* buf = NULL; ObISSTableRowIterator* row_scanner = NULL; if (context.query_flag_.is_whole_macro_scan()) { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableRowWholeScanner)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableRowWholeScanner)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { row_scanner = new (buf) ObSSTableRowWholeScanner(); } } else if (is_multi_version_minor_sstable()) { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableMultiVersionRowScanner)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableMultiVersionRowScanner)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { row_scanner = new (buf) ObSSTableMultiVersionRowScanner(); } } else { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableRowScanner)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableRowScanner)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { @@ -1435,14 +1435,14 @@ int ObSSTable::multi_scan(const ObTableIterParam& param, 
ObTableAccessContext& c void* buf = NULL; ObISSTableRowIterator* row_scanner = NULL; if (is_multi_version_minor_sstable()) { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableMultiVersionRowMultiScanner)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableMultiVersionRowMultiScanner)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { row_scanner = new (buf) ObSSTableMultiVersionRowMultiScanner(); } } else { - if (NULL == (buf = context.allocator_->alloc(sizeof(ObSSTableRowMultiScanner)))) { + if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ObSSTableRowMultiScanner)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { diff --git a/src/storage/ob_sstable_row_iterator.cpp b/src/storage/ob_sstable_row_iterator.cpp index 27c89147..cc0d2dd5 100644 --- a/src/storage/ob_sstable_row_iterator.cpp +++ b/src/storage/ob_sstable_row_iterator.cpp @@ -1539,7 +1539,7 @@ int ObSSTableRowIterator::alloc_micro_getter() int ret = OB_SUCCESS; void* buf = NULL; if (NULL == micro_getter_) { - if (NULL == (buf = access_ctx_->allocator_->alloc(sizeof(ObMicroBlockRowGetter)))) { + if (NULL == (buf = access_ctx_->stmt_allocator_->alloc(sizeof(ObMicroBlockRowGetter)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret)); } else { @@ -1572,14 +1572,14 @@ int ObSSTableRowIterator::open_cur_micro_block(ObSSTableReadHandle& read_handle, if (NULL == micro_scanner_) { // alloc scanner if (!sstable_->is_multi_version_minor_sstable()) { - if (NULL == (buf = access_ctx_->allocator_->alloc(sizeof(ObMicroBlockRowScanner)))) { + if (NULL == (buf = access_ctx_->stmt_allocator_->alloc(sizeof(ObMicroBlockRowScanner)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory for micro block scanner, ", K(ret)); } else { micro_scanner_ = new (buf) ObMicroBlockRowScanner(); } } else { - if (NULL == (buf = 
access_ctx_->allocator_->alloc(sizeof(ObMultiVersionMicroBlockRowScanner)))) { + if (NULL == (buf = access_ctx_->stmt_allocator_->alloc(sizeof(ObMultiVersionMicroBlockRowScanner)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "Fail to allocate memory for micro block scanner, ", K(ret)); } else { diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index ba7d295d..d1a02dc1 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -1048,6 +1048,7 @@ int ObMemtable::get(const storage::ObTableIterParam& param, storage::ObTableAcce if (OB_FAIL(ret)) { if (NULL != get_iter_ptr) { get_iter_ptr->~ObMemtableGetIterator(); + context.stmt_allocator_->free(get_iter_ptr); get_iter_ptr = NULL; } TRANS_LOG(WARN, "get fail", K(ret), K_(key), K(param.table_id_)); @@ -1139,6 +1140,7 @@ int ObMemtable::scan(const storage::ObTableIterParam& param, storage::ObTableAcc } else { if (NULL != scan_iter_ptr) { scan_iter_ptr->~ObIMemtableScanIterator(); + context.stmt_allocator_->free(scan_iter_ptr); scan_iter_ptr = NULL; } TRANS_LOG( @@ -1182,6 +1184,7 @@ int ObMemtable::multi_get(const storage::ObTableIterParam& param, storage::ObTab if (OB_FAIL(ret)) { if (NULL != mget_iter_ptr) { mget_iter_ptr->~ObMemtableMGetIterator(); + context.stmt_allocator_->free(mget_iter_ptr); mget_iter_ptr = NULL; } TRANS_LOG(WARN, @@ -1233,6 +1236,7 @@ int ObMemtable::multi_scan(const storage::ObTableIterParam& param, storage::ObTa if (OB_FAIL(ret)) { if (NULL != mscan_iter_ptr) { mscan_iter_ptr->~ObMemtableMScanIterator(); + context.stmt_allocator_->free(mscan_iter_ptr); mscan_iter_ptr = NULL; } TRANS_LOG(WARN, diff --git a/src/storage/ob_multiple_merge.cpp b/src/storage/ob_multiple_merge.cpp index f5c92405..6426010c 100644 --- a/src/storage/ob_multiple_merge.cpp +++ b/src/storage/ob_multiple_merge.cpp @@ -993,7 +993,7 @@ int ObMultipleMerge::release_table_ref() STORAGE_LOG(WARN, "fail to check need refresh table", K(ret)); } 
else if (need_refresh) { tables_handle_.reset(); - reuse_iter_array(); + reset_iter_array(); is_tables_reset_ = true; STORAGE_LOG(INFO, "table need to be released", "table_id", access_param_->iter_param_.table_id_, K(*access_param_), K(curr_scan_index_)); diff --git a/src/storage/ob_sstable_multi_version_row_iterator.cpp b/src/storage/ob_sstable_multi_version_row_iterator.cpp index 95b94b11..e295ccee 100644 --- a/src/storage/ob_sstable_multi_version_row_iterator.cpp +++ b/src/storage/ob_sstable_multi_version_row_iterator.cpp @@ -57,10 +57,13 @@ void ObSSTableMultiVersionRowIterator::reset() void ObSSTableMultiVersionRowIterator::reuse() { - ObISSTableRowIterator::reuse(); + ObISSTableRowIterator::reset(); + // ObISSTableRowIterator::reuse(); query_range_ = NULL; if (NULL != iter_) { - iter_->reuse(); + // iter_->reuse(); + iter_->~ObSSTableRowIterator(); + iter_ = NULL; } out_cols_cnt_ = 0; range_idx_ = 0; @@ -123,7 +126,7 @@ int ObSSTableMultiVersionRowGetter::inner_open( if (OB_FAIL(ObVersionStoreRangeConversionHelper::store_rowkey_to_multi_version_range( *rowkey_, access_ctx.trans_version_range_, *access_ctx.allocator_, multi_version_range_))) { LOG_WARN("convert to multi version range failed", K(ret), K(*rowkey_)); - } else if (OB_FAIL(new_iterator<ObSSTableRowScanner>(*access_ctx.allocator_))) { + } else if (OB_FAIL(new_iterator<ObSSTableRowScanner>(*access_ctx.stmt_allocator_))) { LOG_WARN("failed to new iterator", K(ret)); } else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_range_))) { LOG_WARN("failed to open scanner", K(ret)); @@ -213,7 +216,7 @@ int ObSSTableMultiVersionRowScanner::inner_open( if (OB_FAIL(ObVersionStoreRangeConversionHelper::range_to_multi_version_range( *range_, access_ctx.trans_version_range_, *access_ctx.allocator_, multi_version_range_))) { LOG_WARN("convert to multi version range failed", K(ret), K(*range_)); - } else if (OB_FAIL(new_iterator<ObSSTableRowScanner>(*access_ctx.allocator_))) { + } else if 
(OB_FAIL(new_iterator<ObSSTableRowScanner>(*access_ctx.stmt_allocator_))) {
     LOG_WARN("failed to new iterator", K(ret));
   } else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_range_))) {
     LOG_WARN("failed to open scanner", K(ret));
@@ -306,7 +309,7 @@ int ObSSTableMultiVersionRowMultiGetter::inner_open(
     }
   }
   if (OB_FAIL(ret)) {
-  } else if (OB_FAIL(new_iterator<ObSSTableRowMultiScanner>(*access_ctx.allocator_))) {
+  } else if (OB_FAIL(new_iterator<ObSSTableRowMultiScanner>(*access_ctx.stmt_allocator_))) {
     LOG_WARN("failed to new iterator", K(ret));
   } else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_ranges_))) {
     LOG_WARN("failed to open multi scanner", K(ret));
@@ -431,7 +434,7 @@ int ObSSTableMultiVersionRowMultiScanner::inner_open(
   }

   if (OB_FAIL(ret)) {
-  } else if (OB_FAIL(new_iterator<ObSSTableRowMultiScanner>(*access_ctx.allocator_))) {
+  } else if (OB_FAIL(new_iterator<ObSSTableRowMultiScanner>(*access_ctx.stmt_allocator_))) {
     LOG_WARN("failed to new iterator", K(ret));
   } else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_ranges_))) {
     LOG_WARN("failed to open scanner", K(ret));
diff --git a/src/storage/ob_sstable_row_iterator.cpp b/src/storage/ob_sstable_row_iterator.cpp
index cc0d2dd5..acc43774 100644
--- a/src/storage/ob_sstable_row_iterator.cpp
+++ b/src/storage/ob_sstable_row_iterator.cpp
@@ -469,13 +469,13 @@ int ObSSTableRowIterator::inner_open(
     STORAGE_LOG(WARN, "Unexpected error, ", K(ret), K_(read_handle_cnt), K_(micro_handle_cnt));
   } else if (OB_FAIL(init_handle_mgr(iter_param, access_ctx, query_range))) {
     STORAGE_LOG(WARN, "fail to init handle mgr", K(ret), K(iter_param), K(access_ctx));
-  } else if (OB_FAIL(read_handles_.reserve(*access_ctx.allocator_, read_handle_cnt_))) {
+  } else if (OB_FAIL(read_handles_.reserve(*access_ctx.stmt_allocator_, read_handle_cnt_))) {
     STORAGE_LOG(WARN, "failed to reserve read handles", K(ret), K_(read_handle_cnt));
-  } else if (OB_FAIL(micro_handles_.reserve(*access_ctx.allocator_, micro_handle_cnt_))) {
+  } else if (OB_FAIL(micro_handles_.reserve(*access_ctx.stmt_allocator_, micro_handle_cnt_))) {
     STORAGE_LOG(WARN, "failed to reserve micro handles", K(ret), K_(micro_handle_cnt));
-  } else if (OB_FAIL(sstable_micro_infos_.reserve(*access_ctx.allocator_, micro_handle_cnt_))) {
+  } else if (OB_FAIL(sstable_micro_infos_.reserve(*access_ctx.stmt_allocator_, micro_handle_cnt_))) {
     STORAGE_LOG(WARN, "failed to reserve sstable micro infos", K(ret), K_(micro_handle_cnt));
-  } else if (OB_FAIL(sorted_sstable_micro_infos_.reserve(*access_ctx.allocator_, micro_handle_cnt_))) {
+  } else if (OB_FAIL(sorted_sstable_micro_infos_.reserve(*access_ctx.stmt_allocator_, micro_handle_cnt_))) {
     STORAGE_LOG(WARN, "failed to reserve sorted sstable micro infos", K(ret), K_(micro_handle_cnt));
   } else {
     sstable_ = static_cast<ObSSTable*>(table);
diff --git a/src/storage/ob_multiple_multi_scan_merge.cpp b/src/storage/ob_multiple_multi_scan_merge.cpp
index a7b2a571..55bdabd8 100644
--- a/src/storage/ob_multiple_multi_scan_merge.cpp
+++ b/src/storage/ob_multiple_multi_scan_merge.cpp
@@ -228,7 +228,7 @@ int ObMultipleMultiScanMerge::construct_iters()
           iter->~ObStoreRowIterator();
           STORAGE_LOG(WARN, "Fail to push iter to iterator array, ", K(ret), K(i));
         }
-      } else if (OB_ISNULL(iters_.at(tables.count() - 1 - i))) {
+      } else if (OB_ISNULL(iter = iters_.at(tables.count() - 1 - i))) {
         ret = OB_ERR_UNEXPECTED;
         STORAGE_LOG(WARN, "Unexpected null iter", K(ret), "idx", tables.count() - 1 - i, K_(iters));
       } else if (OB_FAIL(iter->init(*iter_param, *access_ctx_, table, ranges_))) {
diff --git a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp
index d6fd2648..cdc0297f 100644
--- a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp
+++ b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp
@@ -445,7 +445,7 @@ int ObMicroBlockRowScanner::init(const ObTableIterParam& param, ObTableAccessCon
     STORAGE_LOG(WARN, "fail to get projector", K(ret));
   } else if (OB_FAIL(param_->get_column_map(false /*is get*/, column_id_map))) {
     STORAGE_LOG(WARN, "fail to get column id map", K(ret));
-  } else if (OB_FAIL(column_map_.init(*context_->allocator_,
+  } else if (OB_FAIL(column_map_.init(*context_->stmt_allocator_,
                  param_->schema_version_,
                  param_->rowkey_cnt_,
                  0, /*store count*/
@@ -573,7 +573,7 @@ int ObMultiVersionMicroBlockRowScanner::init(
     STORAGE_LOG(WARN, "fail to get projector", K(ret));
   } else if (OB_FAIL(param_->get_column_map(context.use_fuse_row_cache_, column_id_map))) {
     STORAGE_LOG(WARN, "fail to get column id map", K(ret));
-  } else if (OB_FAIL(column_map_.init(*context_->allocator_,
+  } else if (OB_FAIL(column_map_.init(*context_->stmt_allocator_,
                  param_->schema_version_,
                  param_->rowkey_cnt_,
                  0, /*store count*/
@@ -1358,7 +1358,7 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::init(
     // minor merge should contain 2
     if (OB_FAIL(build_minor_merge_out_cols(*param_, out_cols, expect_multi_version_col_cnt))) {
       STORAGE_LOG(WARN, "fail to build minor merge out columns", K(ret));
-    } else if (OB_FAIL(column_map_.init(*context_->allocator_,
+    } else if (OB_FAIL(column_map_.init(*context_->stmt_allocator_,
                    param_->schema_version_,
                    param_->rowkey_cnt_,
                    0, /*store count*/
diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp
index d1a02dc1..94050470 100644
--- a/src/storage/memtable/ob_memtable.cpp
+++ b/src/storage/memtable/ob_memtable.cpp
@@ -927,7 +927,7 @@ int ObMemtable::get(const storage::ObTableIterParam& param, storage::ObTableAcce
     const ColumnMap* param_column_map = nullptr;
     if (nullptr == row.row_val_.cells_) {
       if (nullptr ==
-          (row.row_val_.cells_ = static_cast<ObObj*>(context.allocator_->alloc(sizeof(ObObj) * out_cols->count())))) {
+          (row.row_val_.cells_ = static_cast<ObObj*>(context.stmt_allocator_->alloc(sizeof(ObObj) * out_cols->count())))) {
         ret = OB_ALLOCATE_MEMORY_FAILED;
         TRANS_LOG(WARN, "Fail to allocate memory, ", K(ret));
       } else {
@@ -940,11 +940,11 @@ int ObMemtable::get(const storage::ObTableIterParam& param, storage::ObTableAcce
       TRANS_LOG(WARN, "fail to get column map", K(ret));
     } else if (NULL == param_column_map) {
       void* buf = NULL;
-      if (NULL == (buf = context.allocator_->alloc(sizeof(ColumnMap)))) {
+      if (NULL == (buf = context.stmt_allocator_->alloc(sizeof(ColumnMap)))) {
         ret = OB_ALLOCATE_MEMORY_FAILED;
         TRANS_LOG(WARN, "Fail to allocate memory, ", K(ret));
       } else {
-        local_map = new (buf) ColumnMap(*context.allocator_);
+        local_map = new (buf) ColumnMap(*context.stmt_allocator_);
         if (OB_FAIL(local_map->init(*out_cols))) {
           TRANS_LOG(WARN, "Fail to build column map, ", K(ret));
         }
-
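Reusing the handle mgr
ObSSTableRowIterator uses block_handle_mgr_ and block_index_handle_mgr_ to cache the block_handle and block_index_handle objects it has accessed. Once the rescan scenario is detected, both mgrs can be kept valid across rescans instead of being reset on every reuse.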
diff --git a/src/storage/ob_i_store.h b/src/storage/ob_i_store.h
index 69971590..eb5274c9 100644
--- a/src/storage/ob_i_store.h
+++ b/src/storage/ob_i_store.h
@@ -785,7 +785,7 @@ public:

 class ObStoreRowIterator : public ObIStoreRowIterator {
 public:
-  ObStoreRowIterator() : type_(0)
+  ObStoreRowIterator() : type_(0), is_rescan_(false)
   {}
   virtual ~ObStoreRowIterator()
   {}
@@ -855,8 +855,14 @@ public:
   }
   VIRTUAL_TO_STRING_KV(K_(type));

+  virtual void set_rescan_true()
+  {
+    is_rescan_ = true;
+  }
+
 protected:
   int type_;
+  int is_rescan_;

 private:
   DISALLOW_COPY_AND_ASSIGN(ObStoreRowIterator);
diff --git a/src/storage/ob_multiple_scan_merge.cpp b/src/storage/ob_multiple_scan_merge.cpp
index 958c335e..130b53e9 100644
--- a/src/storage/ob_multiple_scan_merge.cpp
+++ b/src/storage/ob_multiple_scan_merge.cpp
@@ -160,6 +160,9 @@ int ObMultipleScanMerge::construct_iters()
         }
         STORAGE_LOG(DEBUG, "[PUSHDOWN]", K_(consumer), K(iter->is_base_sstable_iter()));
         STORAGE_LOG(DEBUG, "add iter for consumer", KPC(table), KPC(access_param_));
+        if (is_rescan()) {
+          iter->set_rescan_true();
+        }
       }
     }

diff --git a/src/storage/ob_sstable_row_iterator.cpp b/src/storage/ob_sstable_row_iterator.cpp
index a09c3e30..0fe498ae 100644
--- a/src/storage/ob_sstable_row_iterator.cpp
+++ b/src/storage/ob_sstable_row_iterator.cpp
@@ -218,6 +218,7 @@ void ObISSTableRowIterator::reset()
   batch_rows_ = NULL;
   batch_row_count_ = 0;
   batch_row_pos_ = 0;
+  is_rescan_ = false;
 }

 void ObISSTableRowIterator::reuse()
@@ -428,7 +429,8 @@ ObSSTableRowIterator::ObSSTableRowIterator()
       io_micro_infos_(),
       micro_info_iter_(),
       prefetch_handle_depth_(DEFAULT_PREFETCH_HANDLE_DEPTH),
-      prefetch_micro_depth_(DEFAULT_PREFETCH_MICRO_DEPTH)
+      prefetch_micro_depth_(DEFAULT_PREFETCH_MICRO_DEPTH),
+      hdr_flag_(0)
 {}

 ObSSTableRowIterator::~ObSSTableRowIterator()
@@ -640,6 +642,7 @@ void ObSSTableRowIterator::reset()
   storage_file_ = nullptr;
   prefetch_handle_depth_ = DEFAULT_PREFETCH_HANDLE_DEPTH;
   prefetch_micro_depth_ = DEFAULT_PREFETCH_MICRO_DEPTH;
+  hdr_flag_ = 0;
 }

 void ObSSTableRowIterator::reuse()
@@ -666,8 +669,6 @@ void ObSSTableRowIterator::reuse()
   cur_range_idx_ = -1;
   io_micro_infos_.reuse();
   micro_info_iter_.reuse();
-  block_index_handle_mgr_.reset();
-  block_handle_mgr_.reset();
   table_store_stat_.reuse();
   skip_ctx_.reset();
   storage_file_ = nullptr;
@@ -1683,6 +1684,29 @@ int ObSSTableRowIterator::init_handle_mgr(
     const ObTableIterParam& iter_param, ObTableAccessContext& access_ctx, const void* query_range)
 {
   int ret = OB_SUCCESS;
+  if (is_rescan_) {
+    if (hdr_flag_ == 0) {
+      block_index_handle_mgr_.reset();
+      block_handle_mgr_.reset();
+      if (OB_FAIL(block_handle_mgr_.init(true, true, *access_ctx.stmt_allocator_))) {
+        STORAGE_LOG(WARN, "failed to init block handle mgr", K(ret), K(true), K(true));
+      } else if (OB_FAIL(block_index_handle_mgr_.init(true, true, *access_ctx.stmt_allocator_))) {
+        STORAGE_LOG(WARN, "failed to init block index handle mgr", K(ret), K(true), K(true));
+      }
+      hdr_flag_ = 1;
+    }
+    return ret;
+  } else {
+    bool is_multi = false;
+    bool is_ordered = false;
+    if (!block_handle_mgr_.is_inited() && OB_FAIL(block_handle_mgr_.init(false, true, *access_ctx.stmt_allocator_))) {
+      STORAGE_LOG(WARN, "failed to init block handle mgr", K(ret), K(is_multi), K(is_ordered));
+    } else if (!block_index_handle_mgr_.is_inited() && OB_FAIL(block_index_handle_mgr_.init(false, is_ordered, *access_ctx.stmt_allocator_))) {
+      STORAGE_LOG(WARN, "failed to init block index handle mgr", K(ret), K(is_multi), K(is_ordered));
+    }
+    return ret;
+  }
+  // never execute
   int64_t range_count = 0;
   bool is_multi = false;
   bool is_ordered = false;
@@ -1703,9 +1727,9 @@ int ObSSTableRowIterator::init_handle_mgr(
                  range_count >= USE_HANDLE_CACHE_RANGE_COUNT_THRESHOLD);
   }
   if (OB_SUCC(ret)) {
-    if (!block_handle_mgr_.is_inited() && OB_FAIL(block_handle_mgr_.init(false, true, *access_ctx.allocator_))) {
-      STORAGE_LOG(WARN, "failed to init block handle mgr", K(ret), K(is_multi), K(is_ordered));
-    } else if (!block_index_handle_mgr_.is_inited() && OB_FAIL(block_index_handle_mgr_.init(false, is_ordered, *access_ctx.allocator_))) {
+    if (!block_handle_mgr_.is_inited() && OB_FAIL(block_handle_mgr_.init(false, true, *access_ctx.stmt_allocator_))) {
+      STORAGE_LOG(WARN, "failed to init block handle mgr", K(ret), K(is_multi), K(is_ordered));
+    } else if (!block_index_handle_mgr_.is_inited() && OB_FAIL(block_index_handle_mgr_.init(false, is_ordered, *access_ctx.stmt_allocator_))) {
       STORAGE_LOG(WARN, "failed to init block index handle mgr", K(ret), K(is_multi), K(is_ordered));
     }
   }
diff --git a/src/storage/ob_sstable_row_iterator.h b/src/storage/ob_sstable_row_iterator.h
index ebbbfc17..c6223425 100644
--- a/src/storage/ob_sstable_row_iterator.h
+++ b/src/storage/ob_sstable_row_iterator.h
@@ -426,6 +426,7 @@ private:
   ObSSTableMicroBlockInfoIterator micro_info_iter_;
   int64_t prefetch_handle_depth_;
   int64_t prefetch_micro_depth_;
+  int hdr_flag_;
 };

 }  // namespace storage
diff --git a/src/storage/ob_micro_block_handle_mgr.cpp b/src/storage/ob_micro_block_handle_mgr.cpp
index 028a2018..bb7f5e00 100644
--- a/src/storage/ob_micro_block_handle_mgr.cpp
+++ b/src/storage/ob_micro_block_handle_mgr.cpp
@@ -45,6 +45,13 @@ void ObMicroBlockDataHandle::reset()
   io_handle_.reset();
 }

+void ObMicroBlockDataHandle::reuse()
+{
+  block_index_ = -1;
+  cache_handle_.reset();
+  io_handle_.reset();
+}
+
 int ObMicroBlockDataHandle::get_block_data(
     ObMacroBlockReader& block_reader, ObStorageFile* storage_file, ObMicroBlockData& block_data)
 {
@@ -104,7 +111,6 @@ int ObMicroBlockHandleMgr::get_micro_block_handle(const uint64_t table_id,
 {
   int ret = OB_SUCCESS;
   bool found = false;
-  micro_block_handle.reset();
   if (IS_NOT_INIT) {
     ret = OB_NOT_INIT;
     STORAGE_LOG(WARN, "block handle mgr is not inited", K(ret));
@@ -128,6 +134,7 @@ int ObMicroBlockHandleMgr::get_micro_block_handle(const uint64_t table_id,
       }
     }
     if (!found) {
+      micro_block_handle.reuse();
       if (OB_FAIL(ObStorageCacheSuite::get_instance().get_block_cache().get_cache_block(
               table_id, block_ctx.get_macro_block_id(), file_id, offset, size, micro_block_handle.cache_handle_))) {
         if (OB_ENTRY_NOT_EXIST != ret) {
diff --git a/src/storage/ob_micro_block_handle_mgr.h b/src/storage/ob_micro_block_handle_mgr.h
index 37f6d005..1ff90688 100644
--- a/src/storage/ob_micro_block_handle_mgr.h
+++ b/src/storage/ob_micro_block_handle_mgr.h
@@ -30,6 +30,7 @@ struct ObMicroBlockDataHandle {
   ObMicroBlockDataHandle();
   virtual ~ObMicroBlockDataHandle();
   void reset();
+  void reuse();
   int get_block_data(blocksstable::ObMacroBlockReader& block_reader, blocksstable::ObStorageFile* storage_file,
       blocksstable::ObMicroBlockData& block_data);
   TO_STRING_KV(
diff --git a/src/storage/ob_micro_block_index_handle_mgr.cpp b/src/storage/ob_micro_block_index_handle_mgr.cpp
index 83beb4e0..4e938a81 100644
--- a/src/storage/ob_micro_block_index_handle_mgr.cpp
+++ b/src/storage/ob_micro_block_index_handle_mgr.cpp
@@ -37,6 +37,13 @@ void ObMicroBlockIndexHandle::reset()
   io_handle_.reset();
 }

+void ObMicroBlockIndexHandle::reuse()
+{
+  block_index_mgr_ = NULL;
+  cache_handle_.reset();
+  io_handle_.reuse();
+}
+
 int ObMicroBlockIndexHandle::search_blocks(const ObStoreRange& range, const bool is_left_border,
     const bool is_right_border, ObIArray<ObMicroBlockInfo>& infos, const ObIArray<ObRowkeyObjComparer*>* cmp_funcs)
 {
@@ -107,7 +114,6 @@ int ObMicroBlockIndexHandleMgr::get_block_index_handle(const uint64_t table_id,
 {
   int ret = OB_SUCCESS;
   bool found = false;
-  block_idx_handle.reset();
   if (IS_NOT_INIT) {
     ret = OB_NOT_INIT;
     STORAGE_LOG(WARN, "index handle mgr is not inited", K(ret));
@@ -127,6 +133,7 @@ int ObMicroBlockIndexHandleMgr::get_block_index_handle(const uint64_t table_id,
       }
     }
     if (!found) {
+      block_idx_handle.reuse();
       if (OB_FAIL(ObStorageCacheSuite::get_instance().get_micro_index_cache().get_cache_block_index(
               table_id, block_ctx.get_macro_block_id(), file_id, block_idx_handle.cache_handle_))) {
         if (OB_ENTRY_NOT_EXIST != ret) {
diff --git a/src/storage/ob_micro_block_index_handle_mgr.h b/src/storage/ob_micro_block_index_handle_mgr.h
index 2aea9dcf..89a19ac0 100644
--- a/src/storage/ob_micro_block_index_handle_mgr.h
+++ b/src/storage/ob_micro_block_index_handle_mgr.h
@@ -23,6 +23,7 @@ struct ObMicroBlockIndexHandle {
   ObMicroBlockIndexHandle();
   virtual ~ObMicroBlockIndexHandle();
   void reset();
+  void reuse();
   int search_blocks(const common::ObStoreRange& range, const bool is_left_border, const bool is_right_border,
       common::ObIArray<blocksstable::ObMicroBlockInfo>& infos,
       const common::ObIArray<ObRowkeyObjComparer*>* cmp_funcs = nullptr);
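The effect of the diff above can be illustrated with a much smaller model. The sketch below is only a toy under stated assumptions: ToyHandleMgr, ToyRowIterator and count_loads are hypothetical names that do not exist in OceanBase, and a "handle" is reduced to an integer. It shows the shape of the change: on rescan the manager is initialized once, reuse() no longer resets it, so repeated scans over the same blocks hit the cache.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Toy stand-in for a block handle manager (hypothetical, not the OceanBase class).
class ToyHandleMgr {
public:
  void reset() { cache_.clear(); inited_ = false; }
  void init() { inited_ = true; }
  bool is_inited() const { return inited_; }

  // Returns true on a cache hit; on a miss the load is counted and cached.
  bool get(int64_t block_id) {
    assert(inited_);
    if (cache_.count(block_id) > 0) {
      return true;
    }
    ++loads_;
    cache_[block_id] = block_id;  // pretend the handle is the id itself
    return false;
  }
  int64_t loads() const { return loads_; }

private:
  std::unordered_map<int64_t, int64_t> cache_;
  bool inited_ = false;
  int64_t loads_ = 0;
};

// Toy iterator that follows the shape of init_handle_mgr() in the diff.
class ToyRowIterator {
public:
  void set_rescan_true() { is_rescan_ = true; }

  void init_handle_mgr() {
    if (is_rescan_) {
      if (!rescan_inited_) {  // plays the role of hdr_flag_ == 0
        mgr_.reset();
        mgr_.init();
        rescan_inited_ = true;
      }
    } else if (!mgr_.is_inited()) {
      mgr_.init();
    }
  }

  // After the change, reuse() leaves the manager alone.
  void reuse() {}

  void scan(int64_t first_block, int64_t last_block) {
    for (int64_t id = first_block; id <= last_block; ++id) {
      mgr_.get(id);
    }
  }
  int64_t loads() const { return mgr_.loads(); }

private:
  ToyHandleMgr mgr_;
  bool is_rescan_ = false;
  bool rescan_inited_ = false;
};

// Scans the same three blocks `rescans` times and returns the number of loads.
inline int64_t count_loads(int rescans) {
  ToyRowIterator iter;
  iter.set_rescan_true();
  for (int i = 0; i < rescans; ++i) {
    iter.init_handle_mgr();
    iter.scan(0, 2);
    iter.reuse();
  }
  return iter.loads();
}
```

In this model, count_loads(1) and count_loads(10) both report three loads, because every scan after the first is served from the cached handles. Whether the real gain is comparable depends on how often a Nested-Loop-Join rescan touches the same blocks.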
-
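Removing redundant code & logic optimizations (partial)
The data prefetch logic is redundant: prefetch() is called in ObSSTableRowIterator::inner_open and again in the loop in ObMultipleGetMerge::construct_sstable_iter.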
int ObSSTableRowIterator::inner_open(
    const ObTableIterParam& iter_param, ObTableAccessContext& access_ctx, ObITable* table, const void* query_range)
{
  ...
  else if (OB_FAIL(prefetch())) {
    STORAGE_LOG(WARN, "Fail to prefetch data, ", K(ret));
  }
  ...
}

int ObMultipleGetMerge::construct_sstable_iter()
{
  for (int64_t i = 0; OB_SUCC(ret) && i < prefetch_cnt; ++i) {
    if (OB_FAIL(prefetch())) {
      STORAGE_LOG(WARN, "fail to prefetch", K(ret));
    }
  }
  ...
}
Remove the redundant reuse when looking rows up in the main table through an index.
void ObTableScanStoreRowIterator::reuse_row_iters()
{
  ...
  if (NULL != index_merge_) {
    index_merge_->reuse();  // reuse is called on every rescan
  }
  ...
}

void ObIndexMerge::reuse()
{
  // table_iter_.reuse();
  index_range_array_cursor_ = 0;
}

int ObIndexMerge::get_next_row(ObStoreRow*& row)
{
  ......
  table_iter_.reuse();  // the reuse already happens here
  if (OB_FAIL(table_iter_.open(rowkeys_))) {
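Why the commented-out call is redundant can be seen in a toy model. This is a sketch under simplifying assumptions, not the OceanBase code: ToyTableIter, ToyIndexMerge and reuse_calls_after are hypothetical names, and each rescan is assumed to fetch exactly one batch of rowkeys.

```cpp
#include <cassert>

// Hypothetical stand-in for table_iter_: only counts how often reuse() runs.
struct ToyTableIter {
  int reuse_calls = 0;
  void reuse() { ++reuse_calls; }
  void open() {}
};

// Hypothetical stand-in for ObIndexMerge.
struct ToyIndexMerge {
  ToyTableIter table_iter;
  int cursor = 0;
  bool reuse_table_iter_in_reuse;  // true models the code before the change

  explicit ToyIndexMerge(bool redundant) : reuse_table_iter_in_reuse(redundant) {}

  void reuse() {
    if (reuse_table_iter_in_reuse) {
      table_iter.reuse();  // the call that was commented out
    }
    cursor = 0;
  }

  void get_next_row() {
    table_iter.reuse();  // always reused right before open()
    table_iter.open();
  }
};

// Returns how many times the table iterator was reused over `rescans` rescans.
inline int reuse_calls_after(int rescans, bool redundant) {
  assert(rescans >= 0);
  ToyIndexMerge merge(redundant);
  for (int i = 0; i < rescans; ++i) {
    merge.reuse();
    merge.get_next_row();
  }
  return merge.table_iter.reuse_calls;
}
```

Under these assumptions, 100 rescans cost 200 reuse() calls with the extra call and 100 without it, while the iterator is still reused before every open().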
Optimizing the refresh-table-on-demand logic
diff --git a/src/storage/ob_multiple_merge.cpp b/src/storage/ob_multiple_merge.cpp
index 9aa5cb01..be8de75f 100644
--- a/src/storage/ob_multiple_merge.cpp
+++ b/src/storage/ob_multiple_merge.cpp
@@ -922,7 +922,7 @@ int ObMultipleMerge::prepare_read_tables()
   }

   if (OB_SUCC(ret)) {
-    relocate_cnt_ = access_ctx_->store_ctx_->mem_ctx_->get_relocate_cnt();
+//    relocate_cnt_ = access_ctx_->store_ctx_->mem_ctx_->get_relocate_cnt();
     if (OB_UNLIKELY(nullptr != row_filter_)) {
       const ObPartitionKey& pkey = partition_store.get_partition_key();
       row_filter_ = tables_handle_.has_split_source_table(pkey) ? row_filter_ : NULL;
@@ -987,24 +987,20 @@ int ObMultipleMerge::release_table_ref()
 int ObMultipleMerge::check_need_refresh_table(bool &need_refresh)
 {
   int ret = OB_SUCCESS;
-  if (OB_UNLIKELY(!inited_)) {
-    ret = OB_NOT_INIT;
-    STORAGE_LOG(WARN, "ObMultipleMerge has not been inited", K(ret));
+  if (NULL != access_ctx_->store_ctx_->mem_ctx_) {
+    temp = relocate_cnt_;
+    relocate_cnt_ = access_ctx_->store_ctx_->mem_ctx_->get_relocate_cnt();
+    need_refresh = relocate_cnt_ > temp;
   } else {
-    const bool relocated = NULL == access_ctx_->store_ctx_->mem_ctx_
-                               ? false
-                               : access_ctx_->store_ctx_->mem_ctx_->get_relocate_cnt() > relocate_cnt_;
-    const bool memtable_retired = tables_handle_.check_store_expire();
-    const int64_t relocate_cnt = access_ctx_->store_ctx_->mem_ctx_->get_relocate_cnt();
-    need_refresh = relocated || memtable_retired;
+    need_refresh = tables_handle_.check_store_expire();
+  }
 #ifdef ERRSIM
-    ret = E(EventTable::EN_FORCE_REFRESH_TABLE) ret;
-    if (OB_FAIL(ret)) {
-      ret = OB_SUCCESS;
-      need_refresh = true;
-    }
-#endif
+  ret = E(EventTable::EN_FORCE_REFRESH_TABLE) ret;
+  if (OB_FAIL(ret)) {
+    ret = OB_SUCCESS;
+    need_refresh = true;
   }
+#endif
   return ret;
 }

diff --git a/src/storage/ob_multiple_merge.h b/src/storage/ob_multiple_merge.h
index 12f8cdc2..ed227202 100644
--- a/src/storage/ob_multiple_merge.h
+++ b/src/storage/ob_multiple_merge.h
@@ -164,6 +164,7 @@ class ObMultipleMerge : public ObQueryRowIterator {
   int64_t range_idx_delta_;
   ObGetTableParam get_table_param_;
   int64_t relocate_cnt_;
+  int64_t temp;
   ObTableStoreStat table_stat_;
   bool skip_refresh_table_;
   bool read_memtable_only_;
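The decision logic of the rewritten check_need_refresh_table can be condensed into a small model. This is a sketch with hypothetical types (ToyMemCtx, ToyMerge) that are not part of OceanBase; a local variable takes the place of the temp member added in the diff, and the ERRSIM branch is left out.

```cpp
#include <cassert>  // for the assert-based usage described after the block
#include <cstdint>

// Hypothetical stand-in for the memtable context that owns the relocate counter.
struct ToyMemCtx {
  int64_t relocate_cnt = 0;
  int64_t get_relocate_cnt() const { return relocate_cnt; }
};

// Hypothetical stand-in for ObMultipleMerge, reduced to the refresh decision.
struct ToyMerge {
  const ToyMemCtx* mem_ctx = nullptr;
  int64_t relocate_cnt = 0;    // last counter value this merge has seen
  bool store_expired = false;  // models tables_handle_.check_store_expire()

  bool check_need_refresh_table() {
    bool need_refresh = false;
    if (mem_ctx != nullptr) {
      // Remember the previous value, pick up the current one, and refresh
      // only if the counter advanced since the last check.
      const int64_t previous = relocate_cnt;
      relocate_cnt = mem_ctx->get_relocate_cnt();
      need_refresh = relocate_cnt > previous;
    } else {
      // As in the diff, the expiry check only runs when there is no mem_ctx.
      need_refresh = store_expired;
    }
    return need_refresh;
  }
};
```

With a ToyMemCtx attached, the first check returns false, a check after the counter advances returns true, and the next check returns false again because the new value has already been recorded. Note that this mirrors the diff as written: when mem_ctx is present, the store expiry is no longer consulted.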
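Correctness verification
The correctness of a code change is judged by running the test cases with mysqltest. OceanBase has a huge code base with complex logic, so your own changes will inevitably cause problems such as segmentation faults. When that happens, attach the vscode debugger: if a test case crashes, the debugger stops at the location of the segmentation fault, which makes it much easier to find the root cause.
For example, the bug where iter was not initialized was found this way: after the code change, execution reaches ObMultipleMultiScanMerge, and a mysqltest test case happened to expose the bug.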
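The class of bug described here matches the ob_multiple_multi_scan_merge.cpp hunk shown earlier, where OB_ISNULL(iters_.at(...)) became OB_ISNULL(iter = iters_.at(...)). A minimal sketch of the fixed shape follows; ToyIter and init_existing_iter are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a store row iterator.
struct ToyIter {
  bool inited = false;
  void init() { inited = true; }
};

// Fixed shape: the slot is assigned to `iter` inside the null check, so the
// later iter->init() works on the element that was actually checked.
// In the buggy shape the check read the slot without assigning it, leaving
// `iter` unset on this path and crashing on the dereference.
inline bool init_existing_iter(std::vector<ToyIter*>& iters, std::size_t idx) {
  assert(idx < iters.size());
  ToyIter* iter = nullptr;
  if (nullptr == (iter = iters[idx])) {
    return false;  // unexpected null slot
  }
  iter->init();
  return true;
}
```

A segmentation fault from the buggy variant is exactly the kind of failure an attached debugger stops on, with the faulting frame pointing at the iter->init() call.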