Skip to content

Commit

Permalink
[fix] add ttl compact metrics && modify sst-age-limit initial value
Browse files Browse the repository at this point in the history
  • Loading branch information
VCgege committed Dec 10, 2024
1 parent 1b50ab3 commit 5ce5c10
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 11 deletions.
2 changes: 2 additions & 0 deletions src/ctrip_swap.h
Original file line number Diff line number Diff line change
Expand Up @@ -1972,6 +1972,8 @@ typedef struct swapTtlCompactCtx {
swapExpireStatus *expire_stats;
redisAtomic unsigned long long stat_request_compact_times;
redisAtomic unsigned long long stat_request_sst_count;
redisAtomic unsigned long long stat_expired_sst_count;
redisAtomic unsigned long long stat_compacted_data_size;
} swapTtlCompactCtx;

swapTtlCompactCtx *swapTtlCompactCtxNew();
Expand Down
25 changes: 16 additions & 9 deletions src/ctrip_swap_compact.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,14 @@ static int getExpiredSstInfo(rocksdb_level_metadata_t* level_meta, long long sst
/*
* sort the sst to get oldest sst range with the continous index(ascending or descending order).
*/
static uint sortExpiredSstInfo(uint *sst_index_arr, uint64_t *sst_age_arr, uint sst_recorded_num, bool *is_ascending_order) {
static uint sortExpiredSstInfo(uint *sst_index_arr, uint64_t *sst_age_arr, uint expired_sst_num, bool *is_ascending_order) {

uint arranged_cursor = 0;

/* pruning bubble algorithm */
/* sort in place */
for (uint i = 0; i < sst_recorded_num - 1; i++) {
for (uint j = sst_recorded_num - 1; j > i; j--) {
for (uint i = 0; i < expired_sst_num - 1; i++) {
for (uint j = expired_sst_num - 1; j > i; j--) {

if (sst_age_arr[j] > sst_age_arr[j - 1]) {
uint64_t tmp_exist_time;
Expand Down Expand Up @@ -521,13 +521,15 @@ void genServerTtlCompactTask(void *result, void *pd, int errcode) {
uint64_t *sst_age_arr = zmalloc(sizeof(uint64_t) * highest_level_sst_num);
memset(sst_age_arr, 0, sizeof(uint64_t) * highest_level_sst_num);

uint sst_recorded_num = getExpiredSstInfo(level_meta, sst_age_limit, sst_index_arr, sst_age_arr);
if (sst_recorded_num == 0) {
uint expired_sst_num = getExpiredSstInfo(level_meta, sst_age_limit, sst_index_arr, sst_age_arr);
if (expired_sst_num == 0) {
goto end;
}

atomicSet(server.swap_ttl_compact_ctx->stat_expired_sst_count, expired_sst_num);

bool is_ascending_order = true; /* record the order of sst index of compact range. */
uint arranged_cursor = sortExpiredSstInfo(sst_index_arr, sst_age_arr, sst_recorded_num, &is_ascending_order);
uint arranged_cursor = sortExpiredSstInfo(sst_index_arr, sst_age_arr, expired_sst_num, &is_ascending_order);

if (server.swap_ttl_compact_ctx->task != NULL) {
compactTaskFree(server.swap_ttl_compact_ctx->task);
Expand All @@ -552,7 +554,7 @@ swapExpireStatus *swapExpireStatusNew() {
stats->expire_wt = wtdigestCreate(WTD_DEFAULT_NUM_BUCKETS, getServerMstime);
wtdigestSetWindow(stats->expire_wt, SWAP_TTL_COMPACT_DEFAULT_EXPIRE_WT_WINDOW);

stats->sst_age_limit = 0;
stats->sst_age_limit = SWAP_TTL_COMPACT_INVALID_EXPIRE;
return stats;
}

Expand All @@ -565,7 +567,7 @@ void swapExpireStatusFree(swapExpireStatus *stats) {

void swapExpireStatusReset(swapExpireStatus *stats) {
wtdigestReset(stats->expire_wt);
stats->sst_age_limit = 0;
stats->sst_age_limit = SWAP_TTL_COMPACT_INVALID_EXPIRE;
}

swapTtlCompactCtx *swapTtlCompactCtxNew() {
Expand All @@ -575,6 +577,8 @@ swapTtlCompactCtx *swapTtlCompactCtxNew() {
ctx->expire_stats = swapExpireStatusNew();
ctx->stat_request_compact_times = 0;
ctx->stat_request_sst_count = 0;
ctx->stat_expired_sst_count = 0;
ctx->stat_compacted_data_size = 0;
return ctx;
}

Expand Down Expand Up @@ -626,9 +630,12 @@ void cfMetasFree(cfMetas *metas) {
sds genSwapTtlCompactInfoString(sds info) {
info = sdscatprintf(info,
"swap_ttl_compact:times=%llu,request_sst_count=%llu,"
"expired_sst_count=%llu,compacted_data_size=%llu,"
"sst_age_limit=%lld\r\n",
server.swap_ttl_compact_ctx->stat_request_compact_times,
server.swap_ttl_compact_ctx->stat_request_sst_count,
server.swap_ttl_compact_ctx->stat_expired_sst_count,
server.swap_ttl_compact_ctx->stat_compacted_data_size,
server.swap_ttl_compact_ctx->expire_stats->sst_age_limit);
return info;
}
Expand Down Expand Up @@ -1096,7 +1103,7 @@ int swapFilterTest(int argc, char **argv, int accurate) {
swapTtlCompactCtxReset(server.swap_ttl_compact_ctx);

test_assert(server.swap_ttl_compact_ctx->task == NULL);
test_assert(server.swap_ttl_compact_ctx->expire_stats->sst_age_limit == 0);
test_assert(server.swap_ttl_compact_ctx->expire_stats->sst_age_limit == SWAP_TTL_COMPACT_INVALID_EXPIRE);
test_assert(wtdigestSize(server.swap_ttl_compact_ctx->expire_stats->expire_wt) == 0);

swapTtlCompactCtxFree(server.swap_ttl_compact_ctx);
Expand Down
12 changes: 10 additions & 2 deletions src/ctrip_swap_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ void swapRequestExecuteUtil_CompactRange(swapRequest *req) {
char dir[ROCKS_DIR_MAX_LEN];
rocks *rocks = serverRocksGetReadLock();
snprintf(dir, ROCKS_DIR_MAX_LEN, "%s/%d", ROCKS_DATA, rocks->rocksdb_epoch);
serverLog(LL_WARNING, "[rocksdb compact range before] dir(%s) size(%ld)", dir, get_dir_size(dir));

long size_before = get_dir_size(dir);
serverLog(LL_WARNING, "[rocksdb compact range before] dir(%s) size(%ld)", dir, size_before);

rocksdbUtilTaskCtx *utilctx = req->finish_pd;
compactTask *task = (compactTask*)utilctx->argument;
Expand All @@ -85,7 +87,13 @@ void swapRequestExecuteUtil_CompactRange(swapRequest *req) {
task->key_range[i]->start_key, task->key_range[i]->start_key_size, task->key_range[i]->end_key,
task->key_range[i]->end_key_size);
}
serverLog(LL_WARNING, "[rocksdb compact range after] dir(%s) size(%ld)", dir, get_dir_size(dir));

long size_after = get_dir_size(dir);
serverLog(LL_WARNING, "[rocksdb compact range after] dir(%s) size(%ld)", dir, size_after);

if (server.swap_ttl_compact_ctx && task->compact_type == TYPE_TTL_COMPACT) {
atomicIncr(server.swap_ttl_compact_ctx->stat_compacted_data_size, size_after - size_before);
}
serverRocksUnlock(rocks);
}

Expand Down
2 changes: 2 additions & 0 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {

if (server.swap_ttl_compact_ctx) {
swapTtlCompactCtxReset(server.swap_ttl_compact_ctx);
/* after flushdb, all sst in data CF is supposed to be deleted. */
server.swap_ttl_compact_ctx->expire_stats->sst_age_limit = 0;
}

/* Flush slots to keys map if enable cluster, we can flush entire
Expand Down
36 changes: 36 additions & 0 deletions tests/swap/unit/ttl_compact.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ start_server {tags {"ttl compact 0"}

set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_equal $sst_age_limit 0

set expired_sst_count [get_info_property r Swap swap_ttl_compact expired_sst_count]
assert_equal $expired_sst_count 0

set compacted_data_size [get_info_property r Swap swap_ttl_compact compacted_data_size]
assert_equal $compacted_data_size 0
}
}

Expand Down Expand Up @@ -52,6 +58,12 @@ start_server {tags {"ttl compact 1"}

set compact_times [get_info_property r Swap swap_ttl_compact times]
assert_equal $compact_times 0

set expired_sst_count [get_info_property r Swap swap_ttl_compact expired_sst_count]
assert_equal $expired_sst_count 0

set compacted_data_size [get_info_property r Swap swap_ttl_compact compacted_data_size]
assert_equal $compacted_data_size 0
}
}

Expand Down Expand Up @@ -86,6 +98,12 @@ start_server {tags {"ttl compact 2"}

set sst_age_limit [get_info_property r Swap swap_ttl_compact sst_age_limit]
assert_range $sst_age_limit 900000 1000000

set expired_sst_count [get_info_property r Swap swap_ttl_compact expired_sst_count]
assert_equal $expired_sst_count 0

set compacted_data_size [get_info_property r Swap swap_ttl_compact compacted_data_size]
assert_equal $compacted_data_size 0
}
}

Expand Down Expand Up @@ -122,6 +140,12 @@ start_server {tags {"ttl compact 3"}
set request_sst_count [get_info_property r Swap swap_ttl_compact request_sst_count]
assert_range $request_sst_count 0 2

set expired_sst_count [get_info_property r Swap swap_ttl_compact expired_sst_count]
assert_range $expired_sst_count 0 2

set compacted_data_size [get_info_property r Swap swap_ttl_compact compacted_data_size]
assert_morethan_equal $compacted_data_size 0

# set new keys again, check info
for {set j 100} { $j < 200} {incr j} {
set mybitmap "mybitmap-$j"
Expand All @@ -147,6 +171,12 @@ start_server {tags {"ttl compact 3"}

set compact_times [get_info_property r Swap swap_ttl_compact times]
assert_range $compact_times 0 3

set expired_sst_count [get_info_property r Swap swap_ttl_compact expired_sst_count]
assert_range $expired_sst_count 1 3

set compacted_data_size [get_info_property r Swap swap_ttl_compact compacted_data_size]
assert_morethan_equal $compacted_data_size 0
}
}

Expand Down Expand Up @@ -223,6 +253,12 @@ start_server {tags {"ttl compact 5"}

set compact_times [get_info_property r Swap swap_ttl_compact times]
assert_equal $compact_times 0

set expired_sst_count [get_info_property r Swap swap_ttl_compact expired_sst_count]
assert_equal $expired_sst_count 0

set compacted_data_size [get_info_property r Swap swap_ttl_compact compacted_data_size]
assert_equal $compacted_data_size 0
}
}

Expand Down

0 comments on commit 5ce5c10

Please sign in to comment.