diff --git a/src/networking.c b/src/networking.c index 0cc61ff9d2..bcea051bc2 100644 --- a/src/networking.c +++ b/src/networking.c @@ -40,19 +40,18 @@ #include #include #include -#include #include typedef enum { - CLIENT_REPLY_PAYLOAD_DATA = 0, - CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD, -} clientReplyPayloadType; + CLIENT_REPLY_PLAIN = 0, + CLIENT_REPLY_BULK_OFFLOAD, +} clientReplyType; /* Reply payload header */ -typedef struct __attribute__((__packed__)) payloadHeader { +typedef struct payloadHeader { size_t len; /* payload length in a reply buffer */ size_t actual_len; /* actual reply length after offload expanding */ - uint8_t type; /* one of clientReplyPayloadType */ + uint8_t type; /* one of clientReplyType */ int16_t slot; /* to report network-bytes-out for offloads */ } payloadHeader; @@ -141,17 +140,17 @@ static inline int isReplicaReadyForReplData(client *replica) { * Reply offload can be allowed only for regular Valkey clients * that use _writeToClient handler to write replies to client connection */ -static bool isReplyOffloadAllowable(client *c) { +static int isReplyOffloadAllowable(client *c) { if (c->flag.fake) { - return false; + return 0; } switch (getClientType(c)) { case CLIENT_TYPE_NORMAL: case CLIENT_TYPE_PUBSUB: - return true; + return 1; default: - return false; + return 0; } } @@ -448,7 +447,7 @@ static inline int updatePayloadHeader(payloadHeader *last_header, uint8_t type, static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) { /* Enforce min len for offloads as whole pointers must be written to the buffer */ - size_t min_len = (type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD ? len : 1); + size_t min_len = (type == CLIENT_REPLY_BULK_OFFLOAD ? len : 1); if (min_len > available) return 0; size_t reply_len = min(available, len); @@ -460,7 +459,7 @@ static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **las if (updatePayloadHeader(*last_header, type, reply_len, slot) == C_OK) return reply_len; } - /* Recheck min len condition and recalcuate allowed len with a new header to be added */ + /* Recheck min len condition and recalculate allowed len with a new header to be added */ if (sizeof(payloadHeader) + min_len > available) return 0; available -= sizeof(payloadHeader); if (len > available) reply_len = available; @@ -497,11 +496,11 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le } size_t _addReplyToBuffer(client *c, const char *s, size_t len) { - return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PAYLOAD_DATA); + return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PLAIN); } size_t _addBulkOffloadToBuffer(client *c, robj *obj) { - return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); + return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_BULK_OFFLOAD); } /* Adds the reply to the reply linked list. @@ -554,11 +553,11 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl } void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) { - _addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PAYLOAD_DATA); + _addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PLAIN); } void _addBulkOffloadToList(client *c, robj *obj) { - _addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); + _addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_BULK_OFFLOAD); } /* The subscribe / unsubscribe command family has a push as a reply, @@ -1000,7 +999,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { buf->used = 0; buf->last_header = 0; if (c->flag.reply_offload) { - upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PAYLOAD_DATA, length, c->slot, buf->size); + upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PLAIN, length, c->slot, buf->size); } memcpy(buf->buf + buf->used, s, length); buf->used += length; @@ -2251,10 +2250,11 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, return; } - /* Aggregate data len from the beginning of the buffer even though - * part of the data should be skipped in this round due to last_written_len */ + /* Aggregate data length from the beginning of the buffer even though + * part of the data can be skipped in this writevToClient invocation due to last_written_len */ metadata->data_len += buf_len; + /* Skip data written in the previous writevToClient invocation(s) */ if (reply->last_written_len >= buf_len) { reply->last_written_len -= buf_len; return; @@ -2268,10 +2268,10 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, } static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) { + robj **objv = (robj **)buf; while (buf_len > 0 && !reply->limit_reached) { - robj **obj = (robj **)buf; - char *str = (*obj)->ptr; - size_t str_len = stringObjectLen(*obj); + char *str = (*objv)->ptr; + size_t str_len = stringObjectLen(*objv); char* prefix = reply->prefixes[reply->prfxcnt]; prefix[0] = '$'; @@ -2281,12 +2281,12 @@ static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *repl int cnt = reply->cnt; addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata); - /* Increment prfxcnt only if prefix was added to reply in this round */ + /* Increment prfxcnt only if prefix was added to reply in this writevToClient invocation */ if (reply->cnt > cnt) reply->prfxcnt++; addPlainBufferToReplyIOV(str, str_len, reply, metadata); addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata); - buf += sizeof(void*); + objv++; buf_len -= sizeof(void*); } } @@ -2296,10 +2296,10 @@ static void addCompoundBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *repl while (ptr < buf + bufpos && !reply->limit_reached) { payloadHeader *header = (payloadHeader*)ptr; ptr += sizeof(payloadHeader); - if (header->type == CLIENT_REPLY_PAYLOAD_DATA) { + if (header->type == CLIENT_REPLY_PLAIN) { addPlainBufferToReplyIOV(ptr, header->len, reply, metadata); } else { - serverAssert(header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); + serverAssert(header->type == CLIENT_REPLY_BULK_OFFLOAD); uint64_t data_len = metadata->data_len; addOffloadedBulkToReplyIOV(ptr, header->len, reply, metadata); /* Store actual reply len for cluster slot stats */ @@ -2325,6 +2325,19 @@ static void addBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWr metadata->bufpos = bufpos; } +/* + * This function calculates and stores on the client next: + * io_last_written_buf - Last buffer that has been written to the client connection + * io_last_written_bufpos - The buffer has been written until this position + * io_last_written_data_len - The actual length of the data written from this buffer + * This length differs from written bufpos in case of reply offload + * + * The io_last_written_buf and io_last_written_bufpos are used by _postWriteToClient + * to detect last client reply buffer that can be released + * + * The io_last_written_data_len is used by writevToClient for resuming write from the point + * where previous writevToClient invocation stopped + **/ static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) { int last = bufcnt - 1; if (totwritten == totlen) { @@ -2363,35 +2376,7 @@ void proceedToUnwritten(replyIOV *reply, int nwritten) { * it gathers the scattered buffers from reply list and sends them away with connWritev. * If we write successfully, it returns C_OK, otherwise, C_ERR is returned. * Sets the c->nwritten to the number of bytes the server wrote to the client. - * Can be called from the main thread or an I/O thread - * - * INTERNALS - * The writevToClient strives to write all client reply buffers to the client connection. - * However, it may encounter NET_MAX_WRITES_PER_EVENT or IOV_MAX or socket limit. In such case, - * some client reply buffers will be written completely and some partially. - * In next invocation writevToClient should resume from the exact position where it stopped. - * Also writevToClient should communicate to _postWriteToClient which buffers written completely - * and can be released. It is intricate in case of reply offloading as length of reply buffer does not match - * to network bytes out. - * - * For this purpose, writevToClient uses 3 data members on the client struct as input/output paramaters: - * io_last_written_buf - Last buffer that has been written to the client connection - * io_last_written_bufpos - The buffer has been written until this position - * io_last_written_data_len - The actual length of the data written from this buffer - * This length differs from written bufpos in case of reply offload - * - * The writevToClient uses addBufferToReplyIOV, addCompoundBufferToReplyIOV, addOffloadedBulkToReplyIOV, addPlainBufferToReplyIOV - * to build reply iovec array. These functions know to skip io_last_written_data_len, specifically addPlainBufferToReplyIOV - * - * In the end of execution writevToClient calls saveLastWrittenBuf for calculating "last written" buf/pos/data_len - * and storing on the client. While building reply iov, writevToClient gathers auxiliary bufWriteMetadata that - * helps in this calculation. In some cases, It may take several (> 2) invocations for writevToClient to write reply - * from a single buffer but saveLastWrittenBuf knows to calculate "last written" buf/pos/data_len properly - * - * The _postWriteToClient uses io_last_written_buf and io_last_written_bufpos in order to detect completely written buffers - * and release them - * - * */ + * Can be called from the main thread or an I/O thread */ static int writevToClient(client *c) { int iovmax = min(IOV_MAX, c->conn->iovcnt); struct iovec iov_arr[iovmax]; @@ -2539,15 +2524,15 @@ static void releaseBufOffloads(char *buf, size_t bufpos) { payloadHeader *header = (payloadHeader *)ptr; ptr += sizeof(payloadHeader); - if (header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD) { + if (header->type == CLIENT_REPLY_BULK_OFFLOAD) { clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->actual_len); - robj** obj_ptr = (robj**)ptr; + robj **objv = (robj **)ptr; size_t len = header->len; while (len > 0) { - decrRefCount(*obj_ptr); - obj_ptr++; - len -= sizeof(obj_ptr); + decrRefCount(*objv); + objv++; + len -= sizeof(void*); } } diff --git a/src/unit/test_networking.c b/src/unit/test_networking.c index 6eeb20302a..6511bc6308 100644 --- a/src/unit/test_networking.c +++ b/src/unit/test_networking.c @@ -284,6 +284,8 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { decrRefCount(obj); decrRefCount(obj); + zfree(reply); + freeReplyOffloadClient(c); return 0; @@ -366,4 +368,5 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { freeReplyOffloadClient(c); return 0; -} \ No newline at end of file +} +