Skip to content

Commit

Permalink
Add scoped RDB loading context and immediate abort flag (#1173)
Browse files Browse the repository at this point in the history
This PR introduces a new mechanism for temporarily changing the
server's loading_rio context during RDB loading operations. The new
`RDB_SCOPED_LOADING_RIO` macro allows for a scoped change of the
`server.loading_rio` value, ensuring that it's automatically restored
to its original value when the scope ends.

Introduces a dedicated flag to `rio` to signal immediate abort,
preventing
potential use-after-free scenarios during replication disconnection in 
dual-channel load. This ensures proper termination of
`rdbLoadRioWithLoadingCtx`
when replication is cancelled due to connection loss on main connection.

Fixes #1152

---------

Signed-off-by: naglera <[email protected]>
Signed-off-by: Madelyn Olson <[email protected]>
Signed-off-by: Amit Nagler <[email protected]>
Co-authored-by: Madelyn Olson <[email protected]>
Co-authored-by: ranshid <[email protected]>
  • Loading branch information
3 people authored Dec 24, 2024
1 parent f1b7f30 commit 9f4503c
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 17 deletions.
15 changes: 14 additions & 1 deletion src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ char *rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);

#ifdef __GNUC__
void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__((format(printf, 3, 4)));
Expand Down Expand Up @@ -2991,7 +2992,19 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
functionsLibCtx *functions_lib_ctx = functionsLibCtxGetCurrent();
rdbLoadingCtx loading_ctx = {.dbarray = server.db, .functions_lib_ctx = functions_lib_ctx};
int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, &loading_ctx);
int retval = rdbLoadRioWithLoadingCtxScopedRdb(rdb, rdbflags, rsi, &loading_ctx);
return retval;
}

/* Wrapper for rdbLoadRioWithLoadingCtx that manages a scoped RDB context.
* This method wraps the rdbLoadRioWithLoadingCtx function, providing temporary
* RDB context management. It sets a new current loading RDB, calls the wrapped
* function, and then restores the previous loading RDB context. */
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) {
rio *prev_rio = server.loading_rio;
server.loading_rio = rdb;
int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, rdb_loading_ctx);
server.loading_rio = prev_rio;
return retval;
}

Expand Down
2 changes: 1 addition & 1 deletion src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err);
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb);
Expand Down
15 changes: 6 additions & 9 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2254,7 +2254,7 @@ void readSyncBulkPayload(connection *conn) {

int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
/* RDB loading failed. */
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
"from socket, check server logs.");
Expand Down Expand Up @@ -2831,18 +2831,15 @@ typedef struct replDataBufBlock {
* Reads replication data from primary into specified repl buffer block */
int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t read) {
int nread = connRead(conn, data_block->buf + data_block->used, read);
if (nread == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
dualChannelServerLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn));
if (nread <= 0) {
if (nread == 0 || connGetState(conn) != CONN_STATE_CONNECTED) {
dualChannelServerLog(LL_WARNING, "Provisional primary closed connection");
/* Signal ongoing RDB load to terminate gracefully */
if (server.loading_rio) rioCloseASAP(server.loading_rio);
cancelReplicationHandshake(1);
}
return C_ERR;
}
if (nread == 0) {
dualChannelServerLog(LL_VERBOSE, "Provisional primary closed connection");
cancelReplicationHandshake(1);
return C_ERR;
}
data_block->used += nread;
server.stat_total_reads_processed++;
return read - nread;
Expand Down
16 changes: 13 additions & 3 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down Expand Up @@ -115,7 +116,7 @@ typedef struct _rio rio;
* if needed. */

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
if (r->flags & RIO_FLAG_WRITE_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_write =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand All @@ -132,7 +133,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
}

static inline size_t rioRead(rio *r, void *buf, size_t len) {
if (r->flags & RIO_FLAG_READ_ERROR) return 0;
if (r->flags & RIO_FLAG_READ_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand All @@ -156,6 +157,10 @@ static inline int rioFlush(rio *r) {
return r->flush(r);
}

static inline void rioCloseASAP(rio *r) {
r->flags |= RIO_FLAG_CLOSE_ASAP;
}

/* This function allows to know if there was a read error in any past
* operation, since the rio stream was created or since the last call
* to rioClearError(). */
Expand All @@ -168,8 +173,13 @@ static inline int rioGetWriteError(rio *r) {
return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
}

/* Like rioGetReadError() but for async close errors. */
static inline int rioGetAsyncCloseError(rio *r) {
return (r->flags & RIO_FLAG_CLOSE_ASAP) != 0;
}

static inline void rioClearErrors(rio *r) {
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR);
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR | RIO_FLAG_CLOSE_ASAP);
}

void rioInitWithFile(rio *r, FILE *fp);
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,7 @@ void initServerConfig(void) {
server.fsynced_reploff_pending = 0;
server.rdb_client_id = -1;
server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT;
server.loading_rio = NULL;

/* Replication partial resync backlog */
server.repl_backlog = NULL;
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,7 @@ struct valkeyServer {
int dbid;
} repl_provisional_primary;
client *cached_primary; /* Cached primary to be reused for PSYNC. */
rio *loading_rio; /* Pointer to the rio object currently used for loading data. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a replica */
int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */
Expand Down
62 changes: 59 additions & 3 deletions tests/integration/dual-channel-replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,8 @@ start_server {tags {"dual-channel-replication external:skip"}} {
$primary config set repl-diskless-sync-delay 0

# Generating RDB will cost 100 sec to generate
$primary debug populate 10000 primary 1
$primary config set rdb-key-save-delay 10000
$primary debug populate 100000 primary 1
$primary config set rdb-key-save-delay 1000

start_server {} {
set replica [srv 0 client]
Expand Down Expand Up @@ -1222,7 +1222,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
fail "replica didn't start sync session in time"
}
$primary debug log "killing replica main connection"
set replica_main_conn_id [get_client_id_by_last_cmd $primary "sync"]
set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
assert {$replica_main_conn_id != ""}
set loglines [count_log_lines -1]
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry
Expand All @@ -1247,3 +1247,59 @@ start_server {tags {"dual-channel-replication external:skip"}} {
stop_write_load $load_handle
}
}


start_server {tags {"dual-channel-replication external:skip"}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]

$primary config set repl-diskless-sync yes
$primary config set dual-channel-replication-enabled yes
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry

# Generating RDB will take 100 sec to generate
$primary debug populate 1000000 primary 1
$primary config set rdb-key-save-delay -10

start_server {} {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
set replica_log [srv 0 stdout]

$replica config set dual-channel-replication-enabled yes
$replica config set loglevel debug
$replica config set repl-timeout 10
$replica config set repl-diskless-load flush-before-load

test "Replica notice main-connection killed during rdb load callback" {; # https://github.com/valkey-io/valkey/issues/1152
set loglines [count_log_lines 0]
$replica replicaof $primary_host $primary_port
# Wait for sync session to start
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "replica didn't start sync session in time"
}
wait_for_log_messages 0 {"*Loading RDB produced by Valkey version*"} $loglines 1000 10
$primary set key val
set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
$primary debug log "killing replica main connection $replica_main_conn_id"
assert {$replica_main_conn_id != ""}
set loglines [count_log_lines 0]
$primary config set rdb-key-save-delay 0; # disable delay to allow next sync to succeed
$primary client kill id $replica_main_conn_id
# Wait for primary to abort the sync
wait_for_condition 50 1000 {
[string match {*replicas_waiting_psync:0*} [$primary info replication]]
} else {
fail "Primary did not free repl buf block after sync failure"
}
wait_for_log_messages 0 {"*Failed trying to load the PRIMARY synchronization DB from socket*"} $loglines 1000 10
verify_replica_online $primary 0 500
}
}
}

0 comments on commit 9f4503c

Please sign in to comment.