From cff89de276110278307c2608e87b33c5613124e6 Mon Sep 17 00:00:00 2001 From: Alexander Shabanov Date: Thu, 19 Dec 2024 12:16:34 +0000 Subject: [PATCH] addressed PR comments Signed-off-by: Alexander Shabanov --- src/cluster_slot_stats.c | 4 +- src/memory_prefetch.c | 2 +- src/networking.c | 133 ++++++++++++++++--------------------- src/server.h | 8 +-- src/unit/test_networking.c | 66 +++++++++--------- 5 files changed, 100 insertions(+), 113 deletions(-) diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 997f3c646e..68628c1dae 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -339,6 +339,6 @@ void clusterSlotStatsCommand(client *c) { } int clusterSlotStatsEnabled(void) { - return server.cluster_slot_stats_enabled && /* Config should be enabled. */ - server.cluster_enabled; /* Cluster mode should be enabled. */ + return server.cluster_slot_stats_enabled && /* Config should be enabled. */ + server.cluster_enabled; /* Cluster mode should be enabled. */ } diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index 7726749ad0..358e7706b4 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -121,7 +121,7 @@ static void prefetchEntry(KeyPrefetchInfo *info) { moveToNextKey(); /* If reply offload enabled no need to prefetch value because main thread will not access it */ } else if (server.reply_offload_enabled) { - markKeyAsdone(info); + markKeyAsdone(info); } else { info->state = PREFETCH_VALUE; } diff --git a/src/networking.c b/src/networking.c index 0cc61ff9d2..cc84a8c3e9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -40,20 +40,19 @@ #include #include #include -#include #include typedef enum { - CLIENT_REPLY_PAYLOAD_DATA = 0, - CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD, -} clientReplyPayloadType; + CLIENT_REPLY_PLAIN = 0, + CLIENT_REPLY_BULK_OFFLOAD, +} clientReplyType; /* Reply payload header */ -typedef struct __attribute__((__packed__)) payloadHeader { - size_t len; /* payload length in a reply buffer */ - size_t actual_len; /* actual reply length after offload expanding */ - uint8_t type; /* one of clientReplyPayloadType */ - int16_t slot; /* to report network-bytes-out for offloads */ +typedef struct payloadHeader { + size_t len; /* payload length in a reply buffer */ + size_t actual_len; /* actual reply length after offload expanding */ + uint8_t type; /* one of clientReplyType */ + int16_t slot; /* to report network-bytes-out for offloads */ } payloadHeader; @@ -141,17 +140,17 @@ static inline int isReplicaReadyForReplData(client *replica) { * Reply offload can be allowed only for regular Valkey clients * that use _writeToClient handler to write replies to client connection */ -static bool isReplyOffloadAllowable(client *c) { +static int isReplyOffloadAllowable(client *c) { if (c->flag.fake) { - return false; + return 0; } switch (getClientType(c)) { - case CLIENT_TYPE_NORMAL: - case CLIENT_TYPE_PUBSUB: - return true; - default: - return false; + case CLIENT_TYPE_NORMAL: + case CLIENT_TYPE_PUBSUB: + return 1; + default: + return 0; } } @@ -448,7 +447,7 @@ static inline int updatePayloadHeader(payloadHeader *last_header, uint8_t type, static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) { /* Enforce min len for offloads as whole pointers must be written to the buffer */ - size_t min_len = (type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD ? len : 1); + size_t min_len = (type == CLIENT_REPLY_BULK_OFFLOAD ? len : 1); if (min_len > available) return 0; size_t reply_len = min(available, len); @@ -460,13 +459,13 @@ static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **las if (updatePayloadHeader(*last_header, type, reply_len, slot) == C_OK) return reply_len; } - /* Recheck min len condition and recalcuate allowed len with a new header to be added */ + /* Recheck min len condition and recalculate allowed len with a new header to be added */ if (sizeof(payloadHeader) + min_len > available) return 0; available -= sizeof(payloadHeader); if (len > available) reply_len = available; /* Start a new payload chunk */ - insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header); + insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header); return reply_len; } @@ -497,11 +496,11 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le } size_t _addReplyToBuffer(client *c, const char *s, size_t len) { - return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PAYLOAD_DATA); + return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PLAIN); } size_t _addBulkOffloadToBuffer(client *c, robj *obj) { - return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); + return _addReplyPayloadToBuffer(c, &obj, sizeof(void *), CLIENT_REPLY_BULK_OFFLOAD); } /* Adds the reply to the reply linked list. @@ -554,11 +553,11 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl } void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) { - _addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PAYLOAD_DATA); + _addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PLAIN); } void _addBulkOffloadToList(client *c, robj *obj) { - _addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); + _addReplyPayloadToList(c, c->reply, (char *)&obj, sizeof(void *), CLIENT_REPLY_BULK_OFFLOAD); } /* The subscribe / unsubscribe command family has a push as a reply, @@ -970,7 +969,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { * - It has enough room already allocated * - And not too large (avoid large memmove) * - And the client is not in a pending I/O state */ - if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size - prev->used > 0 && + if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size > prev->used && c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) { size_t len_to_copy = prev->size - prev->used; if (len_to_copy > length) len_to_copy = length; @@ -1000,7 +999,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { buf->used = 0; buf->last_header = 0; if (c->flag.reply_offload) { - upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PAYLOAD_DATA, length, c->slot, buf->size); + upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PLAIN, length, c->slot, buf->size); } memcpy(buf->buf + buf->used, s, length); buf->used += length; @@ -2210,8 +2209,8 @@ typedef struct replyIOV { int limit_reached; /* Non zero if either max iov count or NET_MAX_WRITES_PER_EVENT limit * reached during iov prepearation */ int offload_active; - int prfxcnt; /* prfxcnt, prefixes and clrf are auxiliary fields - * for expanding reply offloads */ + int prfxcnt; /* prfxcnt, prefixes and clrf are auxiliary fields + * for expanding reply offloads */ char (*prefixes)[LONG_STR_SIZE + 3]; char *crlf; } replyIOV; @@ -2251,10 +2250,11 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, return; } - /* Aggregate data len from the beginning of the buffer even though - * part of the data should be skipped in this round due to last_written_len */ + /* Aggregate data length from the beginning of the buffer even though + * part of the data can be skipped in this writevToClient invocation due to last_written_len */ metadata->data_len += buf_len; + /* Skip data written in the previous writevToClient invocation(s) */ if (reply->last_written_len >= buf_len) { reply->last_written_len -= buf_len; return; @@ -2268,12 +2268,12 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, } static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) { + robj **objv = (robj **)buf; while (buf_len > 0 && !reply->limit_reached) { - robj **obj = (robj **)buf; - char *str = (*obj)->ptr; - size_t str_len = stringObjectLen(*obj); + char *str = (*objv)->ptr; + size_t str_len = stringObjectLen(*objv); - char* prefix = reply->prefixes[reply->prfxcnt]; + char *prefix = reply->prefixes[reply->prfxcnt]; prefix[0] = '$'; size_t num_len = ll2string(prefix + 1, sizeof(reply->prefixes[0]) - 3, str_len); prefix[num_len + 1] = '\r'; @@ -2281,25 +2281,25 @@ static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *repl int cnt = reply->cnt; addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata); - /* Increment prfxcnt only if prefix was added to reply in this round */ + /* Increment prfxcnt only if prefix was added to reply in this writevToClient invocation */ if (reply->cnt > cnt) reply->prfxcnt++; addPlainBufferToReplyIOV(str, str_len, reply, metadata); addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata); - buf += sizeof(void*); - buf_len -= sizeof(void*); + objv++; + buf_len -= sizeof(void *); } } static void addCompoundBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { char *ptr = buf; while (ptr < buf + bufpos && !reply->limit_reached) { - payloadHeader *header = (payloadHeader*)ptr; + payloadHeader *header = (payloadHeader *)ptr; ptr += sizeof(payloadHeader); - if (header->type == CLIENT_REPLY_PAYLOAD_DATA) { + if (header->type == CLIENT_REPLY_PLAIN) { addPlainBufferToReplyIOV(ptr, header->len, reply, metadata); } else { - serverAssert(header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); + serverAssert(header->type == CLIENT_REPLY_BULK_OFFLOAD); uint64_t data_len = metadata->data_len; addOffloadedBulkToReplyIOV(ptr, header->len, reply, metadata); /* Store actual reply len for cluster slot stats */ @@ -2325,6 +2325,19 @@ static void addBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWr metadata->bufpos = bufpos; } +/* + * This function calculates and stores on the client next: + * io_last_written_buf - Last buffer that has been written to the client connection + * io_last_written_bufpos - The buffer has been written until this position + * io_last_written_data_len - The actual length of the data written from this buffer + * This length differs from written bufpos in case of reply offload + * + * The io_last_written_buf and io_last_written_bufpos are used by _postWriteToClient + * to detect last client reply buffer that can be released + * + * The io_last_written_data_len is used by writevToClient for resuming write from the point + * where previous writevToClient invocation stopped + **/ static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) { int last = bufcnt - 1; if (totwritten == totlen) { @@ -2363,35 +2376,7 @@ void proceedToUnwritten(replyIOV *reply, int nwritten) { * it gathers the scattered buffers from reply list and sends them away with connWritev. * If we write successfully, it returns C_OK, otherwise, C_ERR is returned. * Sets the c->nwritten to the number of bytes the server wrote to the client. - * Can be called from the main thread or an I/O thread - * - * INTERNALS - * The writevToClient strives to write all client reply buffers to the client connection. - * However, it may encounter NET_MAX_WRITES_PER_EVENT or IOV_MAX or socket limit. In such case, - * some client reply buffers will be written completely and some partially. - * In next invocation writevToClient should resume from the exact position where it stopped. - * Also writevToClient should communicate to _postWriteToClient which buffers written completely - * and can be released. It is intricate in case of reply offloading as length of reply buffer does not match - * to network bytes out. - * - * For this purpose, writevToClient uses 3 data members on the client struct as input/output paramaters: - * io_last_written_buf - Last buffer that has been written to the client connection - * io_last_written_bufpos - The buffer has been written until this position - * io_last_written_data_len - The actual length of the data written from this buffer - * This length differs from written bufpos in case of reply offload - * - * The writevToClient uses addBufferToReplyIOV, addCompoundBufferToReplyIOV, addOffloadedBulkToReplyIOV, addPlainBufferToReplyIOV - * to build reply iovec array. These functions know to skip io_last_written_data_len, specifically addPlainBufferToReplyIOV - * - * In the end of execution writevToClient calls saveLastWrittenBuf for calculating "last written" buf/pos/data_len - * and storing on the client. While building reply iov, writevToClient gathers auxiliary bufWriteMetadata that - * helps in this calculation. In some cases, It may take several (> 2) invocations for writevToClient to write reply - * from a single buffer but saveLastWrittenBuf knows to calculate "last written" buf/pos/data_len properly - * - * The _postWriteToClient uses io_last_written_buf and io_last_written_bufpos in order to detect completely written buffers - * and release them - * - * */ + * Can be called from the main thread or an I/O thread */ static int writevToClient(client *c) { int iovmax = min(IOV_MAX, c->conn->iovcnt); struct iovec iov_arr[iovmax]; @@ -2437,7 +2422,7 @@ static int writevToClient(client *c) { continue; } - addBufferToReplyIOV(o->buf, used, &reply, &metadata[bufcnt]); + addBufferToReplyIOV(o->buf, used, &reply, &metadata[bufcnt]); if (!metadata[bufcnt].data_len) break; bufcnt++; @@ -2539,15 +2524,15 @@ static void releaseBufOffloads(char *buf, size_t bufpos) { payloadHeader *header = (payloadHeader *)ptr; ptr += sizeof(payloadHeader); - if (header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD) { + if (header->type == CLIENT_REPLY_BULK_OFFLOAD) { clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->actual_len); - robj** obj_ptr = (robj**)ptr; + robj **objv = (robj **)ptr; size_t len = header->len; while (len > 0) { - decrRefCount(*obj_ptr); - obj_ptr++; - len -= sizeof(obj_ptr); + decrRefCount(*objv); + objv++; + len -= sizeof(void *); } } diff --git a/src/server.h b/src/server.h index 22edc395f4..7c409e0d91 100644 --- a/src/server.h +++ b/src/server.h @@ -924,7 +924,7 @@ typedef struct payloadHeader payloadHeader; /* Defined in networking.c */ * which is actually a linked list of blocks like that, that is: client->reply. */ typedef struct clientReplyBlock { size_t size, used; - payloadHeader* last_header; + payloadHeader *last_header; char buf[]; } clientReplyBlock; @@ -1291,7 +1291,7 @@ typedef struct client { list *reply; /* List of reply objects to send to the client. */ listNode *io_last_reply_block; /* Last client reply block when sent to IO thread */ size_t io_last_bufpos; /* The client's bufpos at the time it was sent to the IO thread */ - char* io_last_written_buf; /* Last buffer that has been written to the client connection */ + char *io_last_written_buf; /* Last buffer that has been written to the client connection */ size_t io_last_written_bufpos; /* The buffer has been written until this position */ size_t io_last_written_data_len; /* The actual length of the data written from this buffer This length differs from written bufpos in case of reply offload */ @@ -1384,8 +1384,8 @@ typedef struct client { size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */ mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */ size_t bufpos; - payloadHeader* last_header; /* Pointer to the last header in a buffer in reply offload mode */ - size_t buf_usable_size; /* Usable size of buffer. */ + payloadHeader *last_header; /* Pointer to the last header in a buffer in reply offload mode */ + size_t buf_usable_size; /* Usable size of buffer. */ char *buf; #ifdef LOG_REQ_RES clientReqResInfo reqres; diff --git a/src/unit/test_networking.c b/src/unit/test_networking.c index 6eeb20302a..300cb46540 100644 --- a/src/unit/test_networking.c +++ b/src/unit/test_networking.c @@ -130,7 +130,7 @@ int test_rewriteClientCommandArgument(int argc, char **argv, int flags) { return 0; } -static client* createTestClient(void) { +static client *createTestClient(void) { client *c = zcalloc(sizeof(client)); c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size); @@ -154,18 +154,18 @@ int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) { UNUSED(argv); UNUSED(flags); - client * c = createTestClient(); + client *c = createTestClient(); /* Test 1: Add bulk offloads to the buffer */ robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test")); _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 2); - TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + sizeof(void*)); + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + sizeof(void *)); payloadHeader *header1 = c->last_header; - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == sizeof(void*)); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == sizeof(void *)); robj **ptr = (robj **)(c->buf + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); @@ -173,37 +173,37 @@ int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) { robj *obj2 = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test2")); _addBulkOffloadToBufferOrList(c, obj2); - TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * sizeof(void*)); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * sizeof(void *)); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == 2 * sizeof(void *)); - ptr = (robj **)(c->buf + sizeof(payloadHeader) + sizeof(void*)); + ptr = (robj **)(c->buf + sizeof(payloadHeader) + sizeof(void *)); TEST_ASSERT(obj2 == *ptr); /* Test 2: Add plain reply to the buffer */ - const char* plain = "+OK\r\n"; + const char *plain = "+OK\r\n"; size_t plain_len = strlen(plain); _addReplyToBufferOrList(c, plain, plain_len); - TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + plain_len); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void *) + plain_len); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == 2 * sizeof(void *)); payloadHeader *header2 = c->last_header; - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN); TEST_ASSERT(header2->len == plain_len); for (int i = 0; i < 9; ++i) _addReplyToBufferOrList(c, plain, plain_len); - TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + 10 * plain_len); - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void *) + 10 * plain_len); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN); TEST_ASSERT(header2->len == plain_len * 10); /* Test 3: Add one more bulk offload to the buffer */ _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 3); - TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * sizeof(void*) + 10 * plain_len); + TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * sizeof(void *) + 10 * plain_len); payloadHeader *header3 = c->last_header; - TEST_ASSERT(header3->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - ptr = (robj **)((char*)c->last_header + sizeof(payloadHeader)); + TEST_ASSERT(header3->type == CLIENT_REPLY_BULK_OFFLOAD); + ptr = (robj **)((char *)c->last_header + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); decrRefCount(obj); @@ -248,10 +248,10 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { listNode *next = listNext(&iter); clientReplyBlock *blk = listNodeValue(next); - TEST_ASSERT(blk->used == sizeof(payloadHeader) + sizeof(void*)); + TEST_ASSERT(blk->used == sizeof(payloadHeader) + sizeof(void *)); payloadHeader *header1 = blk->last_header; - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == sizeof(void*)); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == sizeof(void *)); robj **ptr = (robj **)(blk->buf + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); @@ -260,9 +260,9 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 3); TEST_ASSERT(listLength(c->reply) == 1); - TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * sizeof(void*)); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * sizeof(void *)); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD); + TEST_ASSERT(header1->len == 2 * sizeof(void *)); /* Test 3: Add plain replies to cause reply list grow */ while (reply_len < blk->size - blk->used) _addReplyToBufferOrList(c, reply, reply_len); @@ -277,13 +277,15 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { clientReplyBlock *blk2 = listNodeValue(next); /* last header in 2nd block */ payloadHeader *header3 = blk2->last_header; - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA && header3->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN && header3->type == CLIENT_REPLY_PLAIN); TEST_ASSERT((header2->len + header3->len) % reply_len == 0); decrRefCount(obj); decrRefCount(obj); decrRefCount(obj); + zfree(reply); + freeReplyOffloadClient(c); return 0; @@ -294,7 +296,7 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { UNUSED(argv); UNUSED(flags); - const char* expected_reply = "$5\r\nhello\r\n"; + const char *expected_reply = "$5\r\nhello\r\n"; ssize_t total_len = strlen(expected_reply); const int iovmax = 16; char crlf[2] = {'\r', '\n'}; @@ -314,7 +316,7 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { TEST_ASSERT(reply.iov_len_total == total_len); TEST_ASSERT(reply.cnt == 3); - const char* ptr = expected_reply; + const char *ptr = expected_reply; for (int i = 0; i < reply.cnt; ++i) { TEST_ASSERT(memcmp(ptr, reply.iov[i].iov_base, reply.iov[i].iov_len) == 0); ptr += reply.iov[i].iov_len; @@ -335,12 +337,12 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { initReplyIOV(c, iovmax, iov_arr2, prefixes2, crlf, &reply2); addBufferToReplyIOV(c->buf, c->bufpos, &reply2, &metadata2[0]); TEST_ASSERT(reply2.iov_len_total == total_len - 1); - TEST_ASSERT((*(char*)reply2.iov[0].iov_base) == '5'); + TEST_ASSERT((*(char *)reply2.iov[0].iov_base) == '5'); /* Test 4: Last written buf/pos/data_len after 2nd invocation */ saveLastWrittenBuf(c, metadata2, 1, reply2.iov_len_total, 4); /* 4 more bytes has been written */ TEST_ASSERT(c->io_last_written_buf == c->buf); - TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */ + TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */ TEST_ASSERT(c->io_last_written_data_len == 5); /* 1 + 4 */ /* Test 5: 3rd writevToclient invocation */ @@ -352,7 +354,7 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { initReplyIOV(c, iovmax, iov_arr3, prefixes3, crlf, &reply3); addBufferToReplyIOV(c->buf, c->bufpos, &reply3, &metadata3[0]); TEST_ASSERT(reply3.iov_len_total == total_len - 5); - TEST_ASSERT((*(char*)reply3.iov[0].iov_base) == 'e'); + TEST_ASSERT((*(char *)reply3.iov[0].iov_base) == 'e'); /* Test 6: Last written buf/pos/data_len after 3rd invocation */ saveLastWrittenBuf(c, metadata3, 1, reply3.iov_len_total, reply3.iov_len_total); /* everything has been written */ @@ -366,4 +368,4 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { freeReplyOffloadClient(c); return 0; -} \ No newline at end of file +}