AIX

 View Only
Expand all | Collapse all

POLLEXCL issue - only one process ever is notified when message queue is full and several processes are polling

  • 1.  POLLEXCL issue - only one process ever is notified when message queue is full and several processes are polling

    Posted Wed January 26, 2022 10:25 AM
    Edited by Michael Williams Wed January 26, 2022 12:31 PM
    Hi folks,

    I am testing a new feature of IBM AIX 7.3: the POLLEXCL flag for poll(). We hit an issue during the following test:

    - Several processes poll a single message queue, waiting for the POLLIN event with the new POLLEXCL flag.
    - Several other processes send messages to the same message queue.

    The result is that only one process (PID) ever gets notifications on this message queue, even while that PID is not polling the queue because it is busy. The other PIDs do not get notified of new messages, even though they are waiting in poll() on the same queue.

    We have prepared an example:

    $ gcc test.c -pthread

    test.c:

    #define _MSGQSUPPORT 1
    #define _THREAD_SAFE
    
    #include <sys/types.h>
    #include <sys/mman.h>
    #include <sys/msg.h>
    
    #include <errno.h>
    #include <fcntl.h>
    #include <limits.h>
    #include <poll.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    
    
    /* Tunables and the per-worker status values stored in M_thread_stat. */
    enum {
        MAX_THREADS    = 5000,  /* cap on worker processes per role */
        THREAD_POLL    = 0,     /* worker is waiting in poll() */
        THREAD_RUNNING = 1      /* worker is handling a received message */
    };
    
    /* Message layout exchanged through the System V queue. */
    struct msgbuftest {
       long mType;      /* message type (the test always uses 1) */
       char mText[50];  /* payload */
    };
    
    /* Process-shared state; the pointers are filled in by main() with
     * anonymous MAP_SHARED mappings so every forked worker sees them. */
    int *M_msg_proc = NULL;             /* number of messages consumed so far */
    pthread_mutex_t *M_mutex = NULL;    /* guards M_msg_proc and M_thread_stat */
    int *M_thread_stat = NULL;          /* THREAD_POLL / THREAD_RUNNING per waiter */
    int num_msg = 1000000;              /* messages per sender; replaced from argv */
    int M_qid = -1;                     /* queue id; -1 until msgget() succeeds */
    
    /**
     * Consumer process body: repeatedly poll() the shared message queue M_qid
     * for POLLIN (with the AIX POLLEXCL flag) and consume one message per
     * wakeup, recording THREAD_RUNNING/THREAD_POLL in M_thread_stat[tid].
     *
     * @param arg worker index passed as (void *)(long); indexes M_thread_stat
     *            and appears in the log output.
     * @return NULL once poll() fails with an error other than EINTR.
     *         msgrcv()/msgctl() failures terminate the process with exit(-1).
     */
    static void * waiter_thread_start(void *arg)
    {
        long tid = (long)arg;
        struct msgbuftest buf;
    
        /* AIX poll() takes an array of struct pollmsg for message queues. */
        struct pollist {
            struct pollmsg msgs[1];
        } list;
    
        int nmsgs = 1;
    
        list.msgs[0].msgid = M_qid;
        list.msgs[0].reqevents = POLLIN | POLLEXCL;
    
        printf( "thread: Message queue id = %d\n", M_qid );
    
        /*
         * Expected: with POLLEXCL each new message wakes one idle poller, so
         * the load spreads across processes. Reported on AIX 7.3 before APAR
         * IJ37827: only one process was ever woken while the queue was full.
         */
        printf("Polling....\n");
        for (;;)
        {
            /* The pollmsg count goes in the upper 16 bits of nfds; the lower
             * 16 bits (pollfd entries) are zero because none are passed. */
            int rc = poll( &list, (nmsgs<<16), -1);
    
            if (rc < 0 && errno == EINTR)
            {
                continue;   /* interrupted by a signal: just poll again */
            }
            if (rc <= 0)
            {
                break;
            }
    
            pthread_mutex_lock(M_mutex);
            M_thread_stat[tid]=THREAD_RUNNING;
            pthread_mutex_unlock(M_mutex);
    
            /* IPC_NOWAIT: another waiter may already have taken the message
             * that caused this wakeup. In that case msgrcv() fails with
             * ENOMSG and we go back to poll() instead of blocking here while
             * being reported as busy. */
            if (msgrcv( M_qid, &buf, sizeof buf.mText, 1, IPC_NOWAIT ) >= 0)
            {
                struct msqid_ds qstat;
    
                if (0 != msgctl(M_qid, IPC_STAT, &qstat))
                {
                    perror("Failed to msgctl");
                    exit(-1);
                }
    
                /* msg_qnum is an unsigned type; print it portably. */
                printf("Got msg tid=[%ld] msgs in q=[%lu]....\n", tid,
                       (unsigned long)qstat.msg_qnum);
                pthread_mutex_lock(M_mutex);
                (*M_msg_proc)++;
                pthread_mutex_unlock(M_mutex);
                sleep(1);   /* simulated work, so other pollers must take over */
            }
            else if (errno != ENOMSG)
            {
                perror("Failed to msgrcv");
                exit(-1);
            }
            pthread_mutex_lock(M_mutex);
            M_thread_stat[tid]=THREAD_POLL;
            pthread_mutex_unlock(M_mutex);
        }
    
        printf("Polling.... END\n");
        return NULL;
    }
    
    /**
     * Load the message queue full
     */
    static void * sender_thread_start(void *arg)
    {
        char txtBuf[50];
        int qId;
        key_t key;
        struct msgbuftest msg, buf;
        struct msqid_ds msgCtlBuf;
        int i, ret;
        int nmsgs = 1;
    
        for (i=0; i<num_msg; i++)
        {
            strcpy( msg.mText, "This is a message" );
            msg.mType = 1;
    
            if ( msgsnd( M_qid, &msg, sizeof msg.mText, 0 ) == -1 )
            {
                perror( "server: msgsnd failed:" );
                exit( 3 );
            }
        }        
    }
    
    int main(int argc, char **argv) 
    {
        int i,j, sum;
        int num_threads=1;
        int compl;
        pthread_mutexattr_t  mattr;
        struct msqid_ds msgCtlBuf;
    
    
        M_qid = msgget(IPC_PRIVATE,  0666 | IPC_CREAT |  IPC_EXCL);
        if (-1==M_qid)
        {
                fprintf(stderr, "Failed to crate Q\n"); 
                exit(EXIT_FAILURE);
        }
    
        M_thread_stat = mmap(NULL, sizeof(int)*MAX_THREADS, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);     
        if (M_thread_stat == MAP_FAILED)
        {     
            fprintf(stderr, "mmap() failed 1\n"); 
            exit(EXIT_FAILURE);
        }  
    
        M_msg_proc = mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);     
        if (M_msg_proc == MAP_FAILED)
        {     
            fprintf(stderr, "mmap() failed 2\n"); 
            exit(EXIT_FAILURE);
        }  
    
        M_mutex = mmap(NULL, sizeof(*M_mutex), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);     
        if (M_mutex == MAP_FAILED)
        {     
            fprintf(stderr, "mmap() failed 3\n"); 
            exit(EXIT_FAILURE);
        }
    
        if ( 0!=pthread_mutexattr_init(&mattr))
        {
            fprintf(stderr, "pthread_mutexattr_init() failed\n"); 
            exit(EXIT_FAILURE);
        }
    
        if ((i=pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) < 0)
        {
            fprintf(stderr, "Failed to set attribute PTHREAD_PROCESS_SHARED: %s\n", strerror(i));
            exit(EXIT_FAILURE);
        }
    
        pthread_mutex_init(M_mutex, &mattr);
    
    
        memset(M_thread_stat, 0, sizeof(M_thread_stat));
    
        if (argc<3)
        {
            fprintf(stderr, "Expected: %s <number_of_threads> <msg_num>\n", argv[0]);
            exit(-1);
        }
    
        num_threads=atoi(argv[1]);
        num_msg=atoi(argv[2]);
    
        if (num_threads>MAX_THREADS)
        {
            num_threads=MAX_THREADS;
        }
    
        /* waiters */        
        for (i=0; i<num_threads; i++)
        {
            if (0==fork())
            {
                waiter_thread_start((void *)(long)i);
                exit(-1);
            }
        }
    
        sleep(2);
        /* senders */        
        for (i=0; i<num_threads; i++)
        {
            if (0==fork())
            {
                sender_thread_start((void *)(long)i);
                exit(-1);
            }
        }
    
        while (1)
        {
            sleep(1);
    
            /* print number of threads busy... */
            sum=0;
            for (i=0; i<num_threads; i++)
            {
                pthread_mutex_lock(M_mutex);
                sum+=M_thread_stat[i];
                pthread_mutex_unlock(M_mutex);
            }
    
            pthread_mutex_lock(M_mutex);
            if ( (*M_msg_proc)==num_msg*num_threads)
            {
                pthread_mutex_unlock(M_mutex);
                break;
            }
            compl=*M_msg_proc;
            pthread_mutex_unlock(M_mutex);
            printf("Threads busy: %d msg %d/%d\n", sum, *M_msg_proc, num_msg*num_threads);
            
        }
    
        fprintf( stderr, "server: Messages %d sent successfully\n", *M_msg_proc);
        msgctl( M_qid, IPC_RMID, &msgCtlBuf );
        munmap(M_thread_stat, sizeof(int)*MAX_THREADS);
        munmap(M_msg_proc, sizeof(int));
        munmap(M_mutex, sizeof(*M_mutex));
    
    }
    
    ​

    The output is below. All 30 messages were processed by one process (tid=0). This happened even though the queue still held messages and the other 9 processes were polling it. In other words, only one PID is ever busy at a time (Threads busy: 1).

     time ./a.out 10 3
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    thread: Message queue id = 1881
    Polling....
    Got msg tid=[0] msgs in q=[8]....
    Got msg tid=[0] msgs in q=[28]....
    Threads busy: 1 msg 2/30
    Got msg tid=[0] msgs in q=[27]....
    Threads busy: 1 msg 3/30
    Got msg tid=[0] msgs in q=[26]....
    Threads busy: 1 msg 4/30
    Got msg tid=[0] msgs in q=[25]....
    Threads busy: 1 msg 5/30
    Got msg tid=[0] msgs in q=[24]....
    Threads busy: 1 msg 6/30
    Got msg tid=[0] msgs in q=[23]....
    Threads busy: 1 msg 7/30
    Got msg tid=[0] msgs in q=[22]....
    Threads busy: 1 msg 8/30
    Got msg tid=[0] msgs in q=[21]....
    Threads busy: 1 msg 9/30
    Got msg tid=[0] msgs in q=[20]....
    Threads busy: 1 msg 10/30
    Got msg tid=[0] msgs in q=[19]....
    Threads busy: 1 msg 11/30
    Got msg tid=[0] msgs in q=[18]....
    Threads busy: 1 msg 12/30
    Got msg tid=[0] msgs in q=[17]....
    Threads busy: 1 msg 13/30
    Got msg tid=[0] msgs in q=[16]....
    Threads busy: 1 msg 14/30
    Got msg tid=[0] msgs in q=[15]....
    Threads busy: 1 msg 15/30
    Got msg tid=[0] msgs in q=[14]....
    Threads busy: 1 msg 16/30
    Got msg tid=[0] msgs in q=[13]....
    Threads busy: 1 msg 17/30
    Got msg tid=[0] msgs in q=[12]....
    Threads busy: 1 msg 18/30
    Got msg tid=[0] msgs in q=[11]....
    Threads busy: 1 msg 19/30
    Got msg tid=[0] msgs in q=[10]....
    Threads busy: 1 msg 20/30
    Got msg tid=[0] msgs in q=[9]....
    Threads busy: 1 msg 21/30
    Got msg tid=[0] msgs in q=[8]....
    Threads busy: 1 msg 22/30
    Got msg tid=[0] msgs in q=[7]....
    Threads busy: 1 msg 23/30
    Got msg tid=[0] msgs in q=[6]....
    Threads busy: 1 msg 24/30
    Got msg tid=[0] msgs in q=[5]....
    Threads busy: 1 msg 25/30
    Got msg tid=[0] msgs in q=[4]....
    Threads busy: 1 msg 26/30
    Got msg tid=[0] msgs in q=[3]....
    Threads busy: 1 msg 27/30
    Got msg tid=[0] msgs in q=[2]....
    Threads busy: 1 msg 28/30
    Got msg tid=[0] msgs in q=[1]....
    Threads busy: 1 msg 29/30
    Got msg tid=[0] msgs in q=[0]....
    server: Messages 30 sent successfully
    
    real    0m31.030s
    user    0m0.002s
    sys     0m0.003s
    
    ​

    I then disabled the POLLEXCL flag (commented it out) and the test returned the output below. All 10 polling PIDs were busy. This reintroduces the thundering-herd problem, but the scenario was run to verify the test itself. "Threads busy" is now *10*, so all listeners got notifications and started processing (i.e. sleeping).

    time ./a.out 10 3
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    thread: Message queue id = 3147610
    Polling....
    Got msg tid=[9] msgs in q=[8]....
    Got msg tid=[8] msgs in q=[7]....
    Got msg tid=[7] msgs in q=[6]....
    Got msg tid=[6] msgs in q=[5]....
    Got msg tid=[4] msgs in q=[4]....
    Got msg tid=[5] msgs in q=[3]....
    Got msg tid=[3] msgs in q=[2]....
    Got msg tid=[2] msgs in q=[1]....
    Got msg tid=[1] msgs in q=[0]....
    Got msg tid=[0] msgs in q=[5]....
    Got msg tid=[9] msgs in q=[19]....
    Got msg tid=[8] msgs in q=[18]....
    Got msg tid=[7] msgs in q=[17]....
    Got msg tid=[6] msgs in q=[16]....
    Got msg tid=[4] msgs in q=[15]....
    Got msg tid=[5] msgs in q=[14]....
    Got msg tid=[3] msgs in q=[13]....
    Got msg tid=[2] msgs in q=[12]....
    Got msg tid=[1] msgs in q=[11]....
    Got msg tid=[0] msgs in q=[10]....
    Threads busy: 10 msg 20/30
    Got msg tid=[9] msgs in q=[9]....
    Got msg tid=[8] msgs in q=[8]....
    Got msg tid=[7] msgs in q=[7]....
    Got msg tid=[6] msgs in q=[6]....
    Got msg tid=[4] msgs in q=[5]....
    Got msg tid=[5] msgs in q=[4]....
    Got msg tid=[3] msgs in q=[3]....
    Got msg tid=[2] msgs in q=[2]....
    Got msg tid=[1] msgs in q=[1]....
    Got msg tid=[0] msgs in q=[0]....
    server: Messages 30 sent successfully
    
    real    0m4.029s
    user    0m0.002s
    sys     0m0.003s
    ​

    The expectation is that with the POLLEXCL flag all 10 processes would also be busy. While the queue contains messages, they should be balanced across the other PIDs that are calling poll() on the same queue. Notifications should be delivered to the polling PIDs one at a time: each time the queue level changes, a new waiter should be notified with a POLLIN event.

    During testing, the environment variable POLLEXCL_POLICY=ONE was set.

    Test system:

    $ oslevel
    7.3.0.0


  • 2.  RE: POLLEXCL issue - only one process ever is notified when message queue is full and several processes are polling

    Posted Wed February 02, 2022 09:57 AM
    IBM is looking into this issue with the provided testcase.

    ------------------------------
    Matt Ochs
    ------------------------------



  • 3.  RE: POLLEXCL issue - only one process ever is notified when message queue is full and several processes are polling

    Posted Tue February 15, 2022 03:43 PM
    A fix for this issue will be included in the next AIX v7.3 Service Pack. Please reference APAR IJ37827.

    ------------------------------
    Matt Ochs
    ------------------------------



  • 4.  RE: POLLEXCL issue - only one process ever is notified when message queue is full and several processes are polling

    Posted Thu February 17, 2022 01:49 PM
    Thanks Matt, really appreciate quick response & solution.

    ------------------------------
    Madars Vitolins
    ------------------------------