Hiredis: async.c:451: redisProcessCallbacks: Assertion `(c->flags & 0x20 || c->flags & 0x40)' failed.

Created on 26 Nov 2016  路  8Comments  路  Source: redis/hiredis

redis-adapter.h

#ifndef __DRAGON_REDIS_ADAPTER_H__
#define __DRAGON_REDIS_ADAPTER_H__

#include <stdlib.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>

typedef struct redisEvents {
    redisAsyncContext *context;
    int epfd;
} redisEvents;

static void redisReadEvent(redisEvents *e) {
    redisAsyncHandleRead(e->context);
}

static void redisWriteEvent(redisEvents *e) {
    redisAsyncHandleWrite(e->context);
}

static void redisAddRead(void *privdata) {
    redisEvents *e = (redisEvents*)privdata;

    epoll_event event;
    event.data.fd = e->context->c.fd;
    event.events = EPOLLIN;
    int ret = epoll_ctl(e->epfd, EPOLL_CTL_ADD, e->context->c.fd, &event);
    if (ret == -1) {
        if (errno == EEXIST) {
            ret = epoll_ctl(e->epfd, EPOLL_CTL_MOD, e->context->c.fd, &event);
        }
        if (ret  == -1) {
            perror("redisAddRead epoll_ctl");
        }
        return ;
    }
}

static void redisDelRead(void *privdata) {
    redisEvents *e = (redisEvents*)privdata;

    int ret = epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL);
    if (ret == -1) {
        perror("redisDelRead epoll_ctl");
        return ;
    }

}

static void redisAddWrite(void *privdata) {
    redisEvents *e = (redisEvents*)privdata;

    epoll_event event;
    event.data.fd = e->context->c.fd;
    event.events = EPOLLOUT;
    int ret = epoll_ctl(e->epfd, EPOLL_CTL_ADD, e->context->c.fd, &event);
    if (ret == -1) {
        if (errno == EEXIST) {
            ret = epoll_ctl(e->epfd, EPOLL_CTL_MOD, e->context->c.fd, &event);
        }
        if (ret  == -1) {
            perror("redisAddWrite epoll_ctl");
        }
        return ;
    }
}

static void redisDelWrite(void *privdata) {
    redisEvents *e = (redisEvents*)privdata;

    int ret = epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL);
    if (ret == -1) {
        perror("redisDelWrite epoll_ctl");
        return ;
    }
}

static void redisCleanup(void *privdata) {
    redisEvents *e = (redisEvents*)privdata;
    redisDelRead(privdata);
    redisDelWrite(privdata);
    free(e);
}

static int redisAttach(redisAsyncContext *ac, int epfd) {
    redisContext *c = &(ac->c);
    redisEvents *e;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redisEvents*)malloc(sizeof(*e));
    e->context = ac;
    e->epfd = epfd;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisAddRead;
    ac->ev.delRead = redisDelRead;
    ac->ev.addWrite = redisAddWrite;
    ac->ev.delWrite = redisDelWrite;
    ac->ev.cleanup = redisCleanup;
    ac->ev.data = e;

    /* Initialize read/write events */
    epoll_event event;
    event.data.fd = c->fd;
    event.events = EPOLLIN|EPOLLOUT;
    int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, c->fd, &event);
    if (ret == -1) {
        perror("redisAttach epoll_ctl");
        return REDIS_ERR;
    }
    return REDIS_OK;
}

#endif

server.cc

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>

#include "redis-adapter.h"

int sockfd;
int epfd;
epoll_event event;
redisAsyncContext *c_sub;

static void subCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = (redisReply *)r;
    if (reply == NULL) return;
}

int CreateSocketAndBind(int port) {
    struct sockaddr_in servaddr;
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket:");
        return -1;
    }
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);
    int ret = bind(sockfd, (sockaddr *)&servaddr, sizeof(servaddr));
    if (ret == -1) {
        perror("bind:");
        return -1;
    }
    return 0;
}
int MakeSocketNonbloking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl");
        return -1;
    }

    flags|= O_NONBLOCK;
    int s = fcntl(fd, F_SETFL, flags);
    if (s == -1) {
        perror("fcntl");
        return-1;
    }
    return 0;
}

int InitEpoll() {
    int ret = listen(sockfd, SOMAXCONN);
    if (ret != 0) {
        perror("listen:");
        exit(1);
    }
    ret = MakeSocketNonbloking(sockfd);
    if (ret != 0) {
        perror("MakeSocketNonbloking:");
        exit(1);
    }
    epfd = epoll_create(65535);
    if (epfd == -1) {
        perror("epoll_create:");
        exit(1);
    }

    event.data.fd = sockfd;
    event.events = EPOLLIN;
    ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
    if (ret == -1) {
        perror("epoll_ctl");
        exit(1);
    }

    c_sub = redisAsyncConnect("127.0.0.1", 6379);
    if (c_sub->err) {
        printf("Error: %s\n", c_sub->errstr);
        exit(1);
    }
    redisAttach(c_sub, epfd);
}

int Run() {
    epoll_event events[64];
    for (int z=0; z<10; ++z) {
        redisAsyncCommand(c_sub, subCallback, NULL, "SUBSCRIBE %d", z);
        redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %d", z);
    }
    while (1) {
        int count = epoll_wait(epfd, events, 64, 0);
        if (count == -1) {
            perror("epoll_wait:");
            exit(1);
        } 

        for (int i=0; i<count; i++) {
             if (events[i].data.fd == c_sub->c.fd) {
                printf("Redis sub event %d\n", getpid());
                if (events[i].events & EPOLLIN) {
                    redisAsyncHandleRead(c_sub);
                } else if (events[i].events & EPOLLOUT) {
                    redisAsyncHandleWrite(c_sub);
                }
            }
        }
    }
    close(epfd);
}

int main() {
    CreateSocketAndBind(9090);
    InitEpoll();
    Run();
    return 0;
}

I am coding a IM Server using redis sub/pub pattern, server.cc crash on assert occasionally while client forking 100 client processes and use ctrl + c to close client processes. if u comment server.cc redisAsyncCommand(c_sub, subCallback, NULL, "SUBSCRIBE %s", name); and redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %s", c->id.c_str()); problem will go away.

Most helpful comment

I'm experiencing the same failed assertion. My code is nearly identical to the pub / sub sample in the wiki, with only an additional call to unsubscribe.

Once I call:

auto status = redisAsyncCommand(context_, nullptr, nullptr, "UNSUBSCRIBE %s", key.c_str());

After the onMessage function executes for the unsubscribe response, I get the failed assertion:

async.c:463: redisProcessCallbacks: Assertion(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.`

Is there anything I need to do before unsubscribing to prevent this?

All 8 comments

This is a huge example of code I absolutely don't know. Plus on first sight it's completely unclear what the client.cc is for, it doesn't touch Redis/Hiredis at all.

Please provide a reduced example triggering the error.

code reduced.
I use redis as a message broker.

I find problem that is UNSUBSCRIBE command call. The problem gone away when comment redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %s", c->id.c_str());.

There are still hundreds of lines of code. What exactly did you reduce?

Reduced again

I'm experiencing the same failed assertion. My code is nearly identical to the pub / sub sample in the wiki, with only an additional call to unsubscribe.

Once I call:

auto status = redisAsyncCommand(context_, nullptr, nullptr, "UNSUBSCRIBE %s", key.c_str());

After the onMessage function executes for the unsubscribe response, I get the failed assertion:

async.c:463: redisProcessCallbacks: Assertion(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.`

Is there anything I need to do before unsubscribing to prevent this?

After some digging I found a potential cause of the issue. I found the issue occurs when my program does the following:

  • Subscribe to a channel
  • Poll libevent for messages, process the subscribe and message replies
  • Unsubscribe from the channel
  • Subscribe to a different channel
  • Poll libevent, assert fails after processing the unsubscribe reply

It seems the only way I can prevent the failed assertion is by not subscribing to a new channel until I've processed a unsubscribe reply from the previous.

Is this intended? I was under the impression that you could have multiple subscriptions on a single context.

I also faced the same problem. After I have unsubscribed all the channel and then call redisAsyncHandleRead, hiredis assert failed at async.c: 478.
I use hiredis 0.14.1 version.
And I didn't use multithread.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

glennWang picture glennWang  路  3Comments

fenglonz picture fenglonz  路  15Comments

yudataguy picture yudataguy  路  10Comments

gameyin picture gameyin  路  6Comments

ShePastAway0 picture ShePastAway0  路  7Comments