Skip to content

Commit

Permalink
addressed PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Shabanov <[email protected]>
  • Loading branch information
alexander-shabanov committed Dec 24, 2024
1 parent ae57ba2 commit cff89de
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 113 deletions.
4 changes: 2 additions & 2 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,6 @@ void clusterSlotStatsCommand(client *c) {
}

int clusterSlotStatsEnabled(void) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled; /* Cluster mode should be enabled. */
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled; /* Cluster mode should be enabled. */
}
2 changes: 1 addition & 1 deletion src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
moveToNextKey();
/* If reply offload enabled no need to prefetch value because main thread will not access it */
} else if (server.reply_offload_enabled) {
markKeyAsdone(info);
markKeyAsdone(info);
} else {
info->state = PREFETCH_VALUE;
}
Expand Down
133 changes: 59 additions & 74 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,19 @@
#include <sys/uio.h>
#include <math.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdatomic.h>

typedef enum {
CLIENT_REPLY_PAYLOAD_DATA = 0,
CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD,
} clientReplyPayloadType;
CLIENT_REPLY_PLAIN = 0,
CLIENT_REPLY_BULK_OFFLOAD,
} clientReplyType;

/* Reply payload header */
typedef struct __attribute__((__packed__)) payloadHeader {
size_t len; /* payload length in a reply buffer */
size_t actual_len; /* actual reply length after offload expanding */
uint8_t type; /* one of clientReplyPayloadType */
int16_t slot; /* to report network-bytes-out for offloads */
typedef struct payloadHeader {
size_t len; /* payload length in a reply buffer */
size_t actual_len; /* actual reply length after offload expanding */
uint8_t type; /* one of clientReplyType */
int16_t slot; /* to report network-bytes-out for offloads */

} payloadHeader;

Expand Down Expand Up @@ -141,17 +140,17 @@ static inline int isReplicaReadyForReplData(client *replica) {
* Reply offload can be allowed only for regular Valkey clients
* that use _writeToClient handler to write replies to client connection
*/
static bool isReplyOffloadAllowable(client *c) {
static int isReplyOffloadAllowable(client *c) {
if (c->flag.fake) {
return false;
return 0;
}

switch (getClientType(c)) {
case CLIENT_TYPE_NORMAL:
case CLIENT_TYPE_PUBSUB:
return true;
default:
return false;
case CLIENT_TYPE_NORMAL:
case CLIENT_TYPE_PUBSUB:
return 1;
default:
return 0;
}
}

Expand Down Expand Up @@ -448,7 +447,7 @@ static inline int updatePayloadHeader(payloadHeader *last_header, uint8_t type,

static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) {
/* Enforce min len for offloads as whole pointers must be written to the buffer */
size_t min_len = (type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD ? len : 1);
size_t min_len = (type == CLIENT_REPLY_BULK_OFFLOAD ? len : 1);
if (min_len > available) return 0;
size_t reply_len = min(available, len);

Expand All @@ -460,13 +459,13 @@ static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **las
if (updatePayloadHeader(*last_header, type, reply_len, slot) == C_OK) return reply_len;
}

/* Recheck min len condition and recalcuate allowed len with a new header to be added */
/* Recheck min len condition and recalculate allowed len with a new header to be added */
if (sizeof(payloadHeader) + min_len > available) return 0;
available -= sizeof(payloadHeader);
if (len > available) reply_len = available;

/* Start a new payload chunk */
insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header);
insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header);
return reply_len;
}

Expand Down Expand Up @@ -497,11 +496,11 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le
}

size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PAYLOAD_DATA);
return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PLAIN);
}

size_t _addBulkOffloadToBuffer(client *c, robj *obj) {
return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
return _addReplyPayloadToBuffer(c, &obj, sizeof(void *), CLIENT_REPLY_BULK_OFFLOAD);
}

/* Adds the reply to the reply linked list.
Expand Down Expand Up @@ -554,11 +553,11 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl
}

void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) {
_addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PAYLOAD_DATA);
_addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PLAIN);
}

void _addBulkOffloadToList(client *c, robj *obj) {
_addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
_addReplyPayloadToList(c, c->reply, (char *)&obj, sizeof(void *), CLIENT_REPLY_BULK_OFFLOAD);
}

/* The subscribe / unsubscribe command family has a push as a reply,
Expand Down Expand Up @@ -970,7 +969,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
* - It has enough room already allocated
* - And not too large (avoid large memmove)
* - And the client is not in a pending I/O state */
if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size - prev->used > 0 &&
if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size > prev->used &&
c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) {
size_t len_to_copy = prev->size - prev->used;
if (len_to_copy > length) len_to_copy = length;
Expand Down Expand Up @@ -1000,7 +999,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
buf->used = 0;
buf->last_header = 0;
if (c->flag.reply_offload) {
upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PAYLOAD_DATA, length, c->slot, buf->size);
upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PLAIN, length, c->slot, buf->size);
}
memcpy(buf->buf + buf->used, s, length);
buf->used += length;
Expand Down Expand Up @@ -2210,8 +2209,8 @@ typedef struct replyIOV {
int limit_reached; /* Non zero if either max iov count or NET_MAX_WRITES_PER_EVENT limit
* reached during iov prepearation */
int offload_active;
int prfxcnt; /* prfxcnt, prefixes and clrf are auxiliary fields
* for expanding reply offloads */
int prfxcnt; /* prfxcnt, prefixes and clrf are auxiliary fields
* for expanding reply offloads */
char (*prefixes)[LONG_STR_SIZE + 3];
char *crlf;
} replyIOV;
Expand Down Expand Up @@ -2251,10 +2250,11 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply,
return;
}

/* Aggregate data len from the beginning of the buffer even though
* part of the data should be skipped in this round due to last_written_len */
/* Aggregate data length from the beginning of the buffer even though
* part of the data can be skipped in this writevToClient invocation due to last_written_len */
metadata->data_len += buf_len;

/* Skip data written in the previous writevToClient invocation(s) */
if (reply->last_written_len >= buf_len) {
reply->last_written_len -= buf_len;
return;
Expand All @@ -2268,38 +2268,38 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply,
}

static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) {
robj **objv = (robj **)buf;
while (buf_len > 0 && !reply->limit_reached) {
robj **obj = (robj **)buf;
char *str = (*obj)->ptr;
size_t str_len = stringObjectLen(*obj);
char *str = (*objv)->ptr;
size_t str_len = stringObjectLen(*objv);

char* prefix = reply->prefixes[reply->prfxcnt];
char *prefix = reply->prefixes[reply->prfxcnt];
prefix[0] = '$';
size_t num_len = ll2string(prefix + 1, sizeof(reply->prefixes[0]) - 3, str_len);
prefix[num_len + 1] = '\r';
prefix[num_len + 2] = '\n';

int cnt = reply->cnt;
addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata);
/* Increment prfxcnt only if prefix was added to reply in this round */
/* Increment prfxcnt only if prefix was added to reply in this writevToClient invocation */
if (reply->cnt > cnt) reply->prfxcnt++;
addPlainBufferToReplyIOV(str, str_len, reply, metadata);
addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata);

buf += sizeof(void*);
buf_len -= sizeof(void*);
objv++;
buf_len -= sizeof(void *);
}
}

static void addCompoundBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) {
char *ptr = buf;
while (ptr < buf + bufpos && !reply->limit_reached) {
payloadHeader *header = (payloadHeader*)ptr;
payloadHeader *header = (payloadHeader *)ptr;
ptr += sizeof(payloadHeader);
if (header->type == CLIENT_REPLY_PAYLOAD_DATA) {
if (header->type == CLIENT_REPLY_PLAIN) {
addPlainBufferToReplyIOV(ptr, header->len, reply, metadata);
} else {
serverAssert(header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
serverAssert(header->type == CLIENT_REPLY_BULK_OFFLOAD);
uint64_t data_len = metadata->data_len;
addOffloadedBulkToReplyIOV(ptr, header->len, reply, metadata);
/* Store actual reply len for cluster slot stats */
Expand All @@ -2325,6 +2325,19 @@ static void addBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWr
metadata->bufpos = bufpos;
}

/*
* This function calculates and stores on the client next:
* io_last_written_buf - Last buffer that has been written to the client connection
* io_last_written_bufpos - The buffer has been written until this position
* io_last_written_data_len - The actual length of the data written from this buffer
* This length differs from written bufpos in case of reply offload
*
* The io_last_written_buf and io_last_written_bufpos are used by _postWriteToClient
* to detect last client reply buffer that can be released
*
* The io_last_written_data_len is used by writevToClient for resuming write from the point
* where previous writevToClient invocation stopped
**/
static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) {
int last = bufcnt - 1;
if (totwritten == totlen) {
Expand Down Expand Up @@ -2363,35 +2376,7 @@ void proceedToUnwritten(replyIOV *reply, int nwritten) {
* it gathers the scattered buffers from reply list and sends them away with connWritev.
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned.
* Sets the c->nwritten to the number of bytes the server wrote to the client.
* Can be called from the main thread or an I/O thread
*
* INTERNALS
* The writevToClient strives to write all client reply buffers to the client connection.
* However, it may encounter NET_MAX_WRITES_PER_EVENT or IOV_MAX or socket limit. In such case,
* some client reply buffers will be written completely and some partially.
* In next invocation writevToClient should resume from the exact position where it stopped.
* Also writevToClient should communicate to _postWriteToClient which buffers written completely
* and can be released. It is intricate in case of reply offloading as length of reply buffer does not match
* to network bytes out.
*
* For this purpose, writevToClient uses 3 data members on the client struct as input/output paramaters:
* io_last_written_buf - Last buffer that has been written to the client connection
* io_last_written_bufpos - The buffer has been written until this position
* io_last_written_data_len - The actual length of the data written from this buffer
* This length differs from written bufpos in case of reply offload
*
* The writevToClient uses addBufferToReplyIOV, addCompoundBufferToReplyIOV, addOffloadedBulkToReplyIOV, addPlainBufferToReplyIOV
* to build reply iovec array. These functions know to skip io_last_written_data_len, specifically addPlainBufferToReplyIOV
*
* In the end of execution writevToClient calls saveLastWrittenBuf for calculating "last written" buf/pos/data_len
* and storing on the client. While building reply iov, writevToClient gathers auxiliary bufWriteMetadata that
* helps in this calculation. In some cases, It may take several (> 2) invocations for writevToClient to write reply
* from a single buffer but saveLastWrittenBuf knows to calculate "last written" buf/pos/data_len properly
*
* The _postWriteToClient uses io_last_written_buf and io_last_written_bufpos in order to detect completely written buffers
* and release them
*
* */
* Can be called from the main thread or an I/O thread */
static int writevToClient(client *c) {
int iovmax = min(IOV_MAX, c->conn->iovcnt);
struct iovec iov_arr[iovmax];
Expand Down Expand Up @@ -2437,7 +2422,7 @@ static int writevToClient(client *c) {
continue;
}

addBufferToReplyIOV(o->buf, used, &reply, &metadata[bufcnt]);
addBufferToReplyIOV(o->buf, used, &reply, &metadata[bufcnt]);
if (!metadata[bufcnt].data_len) break;
bufcnt++;

Expand Down Expand Up @@ -2539,15 +2524,15 @@ static void releaseBufOffloads(char *buf, size_t bufpos) {
payloadHeader *header = (payloadHeader *)ptr;
ptr += sizeof(payloadHeader);

if (header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD) {
if (header->type == CLIENT_REPLY_BULK_OFFLOAD) {
clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->actual_len);

robj** obj_ptr = (robj**)ptr;
robj **objv = (robj **)ptr;
size_t len = header->len;
while (len > 0) {
decrRefCount(*obj_ptr);
obj_ptr++;
len -= sizeof(obj_ptr);
decrRefCount(*objv);
objv++;
len -= sizeof(void *);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ typedef struct payloadHeader payloadHeader; /* Defined in networking.c */
* which is actually a linked list of blocks like that, that is: client->reply. */
typedef struct clientReplyBlock {
size_t size, used;
payloadHeader* last_header;
payloadHeader *last_header;
char buf[];
} clientReplyBlock;

Expand Down Expand Up @@ -1291,7 +1291,7 @@ typedef struct client {
list *reply; /* List of reply objects to send to the client. */
listNode *io_last_reply_block; /* Last client reply block when sent to IO thread */
size_t io_last_bufpos; /* The client's bufpos at the time it was sent to the IO thread */
char* io_last_written_buf; /* Last buffer that has been written to the client connection */
char *io_last_written_buf; /* Last buffer that has been written to the client connection */
size_t io_last_written_bufpos; /* The buffer has been written until this position */
size_t io_last_written_data_len; /* The actual length of the data written from this buffer
This length differs from written bufpos in case of reply offload */
Expand Down Expand Up @@ -1384,8 +1384,8 @@ typedef struct client {
size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */
size_t bufpos;
payloadHeader* last_header; /* Pointer to the last header in a buffer in reply offload mode */
size_t buf_usable_size; /* Usable size of buffer. */
payloadHeader *last_header; /* Pointer to the last header in a buffer in reply offload mode */
size_t buf_usable_size; /* Usable size of buffer. */
char *buf;
#ifdef LOG_REQ_RES
clientReqResInfo reqres;
Expand Down
Loading

0 comments on commit cff89de

Please sign in to comment.