[bug] stream_engine may terminate prematurely with data unsent (under insufficient tcp rcvbuf) #4763

Open
lu-lie opened this issue Dec 23, 2024 · 0 comments


lu-lie commented Dec 23, 2024


Issue description

If the TCP receive buffer (rcvbuf) is too small, stream_engine may terminate with some messages still unsent.

There is an out-of-sync problem when stream_engine terminates, which I hit while testing the PUSH/PULL pattern in v4.1.3. Specifically, stream_engine closes the socket fd as soon as its outpipe reads the delimiter, i.e. once all data have been dequeued from the pipe. At that moment, however, some data may still remain in the buffer that stream_engine filled in its out_event. As a result, the peer never receives that data, even though the sender exits normally.

I found that stream_engine's _outsize is still > 0 when it closes the socket fd in its destructor. I also found that calling out_event once more in the destructor fixes this, but I don't know how to fix it properly.
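The termination path described above can be illustrated with a toy model. This is a hypothetical sketch, not libzmq code: one counter stands in for the pipe, another for the encoded-but-unwritten bytes that stream_engine tracks in _outsize, and the kernel send buffer is modelled as a fixed number of bytes accepted per out_event. The names toy_engine, toy_out_event and bytes_lost_at_close are invented for illustration. The model compares closing the fd as soon as the pipe is empty with draining the out buffer first, which is the idea behind calling out_event again before closing.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model only: these counters are hypothetical stand-ins, not libzmq types. */
typedef struct {
    size_t pipe_bytes;  /* bytes still queued in the pipe before the delimiter */
    size_t out_size;    /* encoded bytes not yet written (stands in for _outsize) */
    size_t sock_space;  /* bytes the kernel send buffer accepts per out_event */
} toy_engine;

/* One out_event: refill the out buffer from the pipe only when it is empty,
   then write as much as the socket accepts this round. */
static void toy_out_event(toy_engine *e, size_t batch)
{
    if (e->out_size == 0 && e->pipe_bytes > 0) {
        size_t n = e->pipe_bytes < batch ? e->pipe_bytes : batch;
        e->pipe_bytes -= n;
        e->out_size = n;
    }
    size_t written = e->out_size < e->sock_space ? e->out_size : e->sock_space;
    e->out_size -= written;
}

/* Runs out_event until the pipe is empty (the delimiter is reached), then
   "closes the fd". If drain_before_close is non-zero, out_event keeps running
   while out_size > 0 first. Returns the bytes still unsent at close time. */
static size_t bytes_lost_at_close(size_t total, size_t batch,
                                  size_t sock_space, int drain_before_close)
{
    assert(batch > 0 && sock_space > 0); /* otherwise the loops never finish */
    toy_engine e = { total, 0, sock_space };
    while (e.pipe_bytes > 0)
        toy_out_event(&e, batch);
    if (drain_before_close)
        while (e.out_size > 0)
            toy_out_event(&e, batch);
    return e.out_size;
}
```

For example, with 43500 bytes queued (3 × 14500, the payload total used in the reproduction steps), a refill batch of 8192 bytes and 2048 bytes accepted per event (arbitrary numbers chosen only for illustration), bytes_lost_at_close(43500, 8192, 2048, 0) returns 492, while passing drain_before_close = 1 returns 0. In real code the leftover bytes would presumably have to wait for the fd to become writable again rather than be flushed in a tight loop.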

For more details, please see the following reproducing process.

Environment

  • libzmq version (commit hash if unreleased): I mainly tested v4.1.3, but it can also be reproduced in v4.3.5.
  • OS: CentOS Linux release 7.9.2009 (also reproduced in a VM running Linux 5.10)

Minimal test code / Steps to reproduce the issue

Two conditions make the issue easier to reproduce: (1) insufficient TCP buffer space; (2) particular message sizes and counts.

  1. Decrease the TCP send/receive buffers:
sysctl "net.ipv4.tcp_wmem=4096 8192 8192"
sysctl "net.ipv4.tcp_rmem=4096 8192 8192"
  2. Use the attached code:
#include <stdio.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
#include <unistd.h>
int main (int argc, char **argv)
{
    int server = 0;
    int rc;

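    if (argc <= 1) {        /* without arg -> server */
        server = 1;
    } else if (argc < 3) {  /* client needs <msg_count> <msg_len> */
        fprintf(stderr, "usage: %s <msg_count> <msg_len>\n", argv[0]);
        return 1;
    }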

    void *ctx = zmq_ctx_new();
    assert(ctx);

    // server:bind pull
    // client: connect push
    void *sock = zmq_socket(ctx, server ? ZMQ_PULL : ZMQ_PUSH);
    assert(sock);

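    // set the receive high-water mark to 1 message
    // (calls are kept outside assert() so they still run under NDEBUG)
    int opt = 1;
    rc = zmq_setsockopt(sock, ZMQ_RCVHWM, &opt, sizeof(int));
    assert(rc == 0);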

    zmq_msg_t msg;
    if (server) {
        printf("server\n");
        rc = zmq_bind(sock, "tcp://127.0.0.1:12345");
        assert(rc == 0);

        // recv slowly to mimic insufficient rcvbuf
        sleep(5);
        int recv_time = 0;

        // initialize msg once so the first zmq_msg_close() in the loop is valid
        rc = zmq_msg_init(&msg);
        assert(rc == 0);

        // keep receiving until the ending msg with size == 0
        do {
            zmq_msg_close(&msg);
            zmq_msg_init(&msg);

            rc = zmq_msg_recv(&msg, sock, 0);
            assert(rc >= 0);
            recv_time++;
            printf("Got a msg %d: size = %zu\n", recv_time, zmq_msg_size(&msg));

        } while (zmq_msg_size(&msg) != 0);
        rc = zmq_msg_close(&msg);
        assert(rc == 0);

    } else {
        printf("client\n");
        rc = zmq_connect(sock, "tcp://127.0.0.1:12345");
        assert(rc == 0);

        int max_sendtime = atoi(argv[1]);
        printf("max_sendtime = %d\n", max_sendtime);
        assert(max_sendtime > 0);

        int msglen = atoi(argv[2]);
        printf("msglen = %d\n", msglen);
        assert(msglen > 0);

        zmq_msg_t msg_data;
        size_t len = msglen;
        for (int i = 0; i < max_sendtime; ++i) {
            rc = zmq_msg_init_size(&msg_data, len);
            assert(rc == 0);
            memset(zmq_msg_data(&msg_data), 'a', len);

            rc = zmq_msg_send(&msg_data, sock, 0);
            assert(0 < rc);
            printf("Sent a msg %d: size = %d\n", i + 1, rc);
        }

        /* finished: build the ending msg with size == 0 and send it */
        rc = zmq_msg_init_size(&msg, 0);
        assert(rc == 0);
        rc = zmq_msg_send(&msg, sock, 0);
        assert(rc == 0);
        printf("Sent final msg: size = 0\n");
    }

    zmq_close(sock);
    zmq_ctx_term(ctx);
    printf("Closed\n");
    return 0;
}

The client sends some data messages to the server and finishes with an ending message (payload size == 0), while the server keeps receiving until it gets the ending message. To mimic an insufficient TCP buffer, the server sleeps for 5 s before it starts receiving.

cc zmq_test.c -o zt -lzmq

# server: keep receiving until the ending msg arrives.
./zt

# client: send 3 data msgs with payload size 14500, then 1 ending msg.
./zt 3 14500
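Before running the commands above, it may be worth confirming that the sysctl settings from step 1 are actually in effect. A minimal sketch, assuming a Linux /proc filesystem: parse_tcp_mem and read_tcp_mem are hypothetical helpers that read the min/default/max triplet from /proc/sys/net/ipv4/tcp_rmem or /proc/sys/net/ipv4/tcp_wmem.

```c
#include <assert.h>
#include <stdio.h>

/* Parse a "min default max" triplet such as the contents of
   /proc/sys/net/ipv4/tcp_rmem (fields may be separated by tabs or spaces).
   Returns 1 on success, 0 otherwise. */
static int parse_tcp_mem(const char *text, long *min, long *def, long *max)
{
    assert(text && min && def && max);
    return sscanf(text, "%ld %ld %ld", min, def, max) == 3;
}

/* Read the first line of path and parse it; returns 1 on success, 0 if the
   file cannot be opened or does not contain three integers. */
static int read_tcp_mem(const char *path, long *min, long *def, long *max)
{
    char buf[128];
    FILE *f = fopen(path, "r");
    if (f == NULL)
        return 0;
    int ok = fgets(buf, sizeof buf, f) != NULL && parse_tcp_mem(buf, min, def, max);
    fclose(f);
    return ok;
}
```

After step 1, read_tcp_mem("/proc/sys/net/ipv4/tcp_rmem", &lo, &def, &hi) would be expected to yield 4096, 8192 and 8192. Running sysctl net.ipv4.tcp_rmem from a shell shows the same values.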

What's the actual result? (include assertion message & call stack if applicable)

The server only receives the first 2 messages, and thus doesn't exit.

What's the expected result?

About 5 seconds after launch, the server receives all 4 messages (3 data msgs and 1 ending msg) and exits.
