home *** CD-ROM | disk | FTP | other *** search
/ Tools / WinSN5.0Ver.iso / NETSCAP.50 / WIN1998.ZIP / ns / nsprpub / pr / src / io / prmwait.c < prev    next >
Encoding:
C/C++ Source or Header  |  1998-04-08  |  22.7 KB  |  703 lines

  1. /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /*
  3.  * The contents of this file are subject to the Netscape Public License
  4.  * Version 1.0 (the "NPL"); you may not use this file except in
  5.  * compliance with the NPL.  You may obtain a copy of the NPL at
  6.  * http://www.mozilla.org/NPL/
  7.  * 
  8.  * Software distributed under the NPL is distributed on an "AS IS" basis,
  9.  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the NPL
  10.  * for the specific language governing rights and limitations under the
  11.  * NPL.
  12.  * 
  13.  * The Initial Developer of this code under the NPL is Netscape
  14.  * Communications Corporation.  Portions created by Netscape are
  15.  * Copyright (C) 1998 Netscape Communications Corporation.  All Rights
  16.  * Reserved.
  17.  */
  18.  
  19. #include "prlog.h"
  20. #include "prmem.h"
  21. #include "primpl.h"
  22. #include "prmwait.h"
  23. #include "prerror.h"
  24. #include "pprmwait.h"
  25.  
  26. static PRLock *mw_lock = NULL;
  27. static _PRGlobalState *mw_state = NULL;
  28.  
  29. static PRIntervalTime max_polling_interval;
  30.  
  31. /******************************************************************/
  32. /******************************************************************/
  33. /************************ The private portion *********************/
  34. /******************************************************************/
  35. /******************************************************************/
  36. static PRStatus MW_Init(void)
  37. {
  38.     if (NULL != mw_lock) return PR_SUCCESS;
  39.     if (NULL != (mw_lock = PR_NewLock()))
  40.     {
  41.         _PRGlobalState *state = PR_NEWZAP(_PRGlobalState);
  42.         if (state == NULL) goto failed;
  43.  
  44.         PR_INIT_CLIST(&state->group_list);
  45.  
  46.         PR_Lock(mw_lock);
  47.         if (NULL == mw_state)  /* is it still NULL? */
  48.         {
  49.             mw_state = state;  /* if yes, set our value */
  50.             state = NULL;  /* and indicate we've done the job */
  51.             max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
  52.         }
  53.         PR_Unlock(mw_lock);
  54.         if (NULL != state) PR_DELETE(state);
  55.         return PR_SUCCESS;
  56.     }
  57.  
  58. failed:
  59.     return PR_FAILURE;
  60. }  /* MW_Init */
  61.  
  62. static PRWaitGroup *MW_Init2(void)
  63. {
  64.     PRWaitGroup *group = mw_state->group;  /* it's the null group */
  65.     if (NULL == group)  /* there is this special case */
  66.     {
  67.         group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
  68.         if (NULL == group) goto failed_alloc;
  69.         PR_Lock(mw_lock);
  70.         if (NULL == mw_state->group)
  71.         {
  72.             mw_state->group = group;
  73.             group = NULL;
  74.         }
  75.         PR_Unlock(mw_lock);
  76.         if (group != NULL) (void)PR_DestroyWaitGroup(group);
  77.         group = mw_state->group;  /* somebody beat us to it */
  78.     }
  79. failed_alloc:
  80.     return group;  /* whatever */
  81. }  /* MW_Init2 */
  82.  
  83. static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
  84. {
  85.     /*
  86.     ** The entries are put in the table using the fd (PRFileDesc*) of
  87.     ** the receive descriptor as the key. This allows us to locate
  88.     ** the appropriate entry aqain when the poll operation finishes.
  89.     **
  90.     ** The pointer to the file descriptor object is first divided by
  91.     ** the natural alignment of a pointer in the belief that object
  92.     ** will have at least that many zeros in the low order bits.
  93.     ** This may not be a good assuption.
  94.     **
  95.     ** We try to put the entry in by rehashing three times. After
  96.     ** that we declare defeat and force the table to be reconstructed.
  97.     ** Since some fds might be added more than once, won't that cause
  98.     ** collisions even in an empty table?
  99.     */
  100.     PRIntn rehash = 11;
  101.     PRRecvWait **waiter;
  102.     PRUintn hidx = _MW_HASH(desc->fd, hash->length);
  103.     do
  104.     {
  105.         waiter = &hash->recv_wait;
  106.         if (NULL == waiter[hidx])
  107.         {
  108.             waiter[hidx] = desc;
  109.             hash->count += 1;
  110. #if 0
  111.             printf("Adding 0x%x->0x%x ", desc, desc->fd);
  112.             printf(
  113.                 "table[%u:%u:*%u]: 0x%x->0x%x\n",
  114.                 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
  115. #endif
  116.             return _prmw_success;
  117.         }
  118.         if (desc == waiter[hidx])
  119.         {
  120.             PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);  /* desc already in table */
  121.             return _prmw_error;
  122.         }
  123. #if 0
  124.         printf("Failing 0x%x->0x%x ", desc, desc->fd);
  125.         printf(
  126.             "table[*%u:%u:%u]: 0x%x->0x%x\n",
  127.             hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
  128. #endif
  129.         hidx = _MW_REHASH(desc->fd, hidx, hash->length);
  130.     } while (--rehash > 0);
  131.     return _prmw_rehash;    
  132. }  /* MW_AddHashInternal */
  133.  
  134. static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
  135. {
  136.     PRRecvWait **desc;
  137.     PRUint32 pidx, length = 0;
  138.     _PRWaiterHash *newHash, *oldHash = group->waiter;
  139.     
  140.  
  141.     static const PRInt32 prime_number[] = {
  142.         _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
  143.         2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
  144.     PRUintn primes = (sizeof(prime_number) / sizeof(PRIntn));
  145.  
  146.     /* look up the next size we'd like to use for the hash table */
  147.     for (pidx = 0; pidx < primes; ++pidx)
  148.     {
  149.         if (prime_number[pidx] == oldHash->length)
  150.         {
  151.             length = prime_number[pidx + 1];
  152.             break;
  153.         }
  154.     }
  155.     if (0 == length)
  156.     {
  157.         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
  158.         return _prmw_error;  /* we're hosed */
  159.     }
  160.  
  161.     /* allocate the new hash table and fill it in with the old */
  162.     newHash = (_PRWaiterHash*)PR_CALLOC(
  163.         sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
  164.  
  165.     newHash->length = length;
  166.     for (desc = &oldHash->recv_wait; newHash->count < oldHash->count; ++desc)
  167.     {
  168.         if (NULL != *desc)
  169.         {
  170.             if (_prmw_success != MW_AddHashInternal(*desc, newHash))
  171.             {
  172.                 PR_ASSERT(!"But, but, but ...");
  173.                 PR_DELETE(newHash);
  174.                 return _prmw_error;
  175.             }
  176.         }
  177.     }
  178.     PR_DELETE(group->waiter);
  179.     group->waiter = newHash;
  180.     return _prmw_success;
  181. }  /* MW_ExpandHashInternal */
  182.  
  183. static void _MW_DoneInternal(
  184.     PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
  185. {
  186.     /*
  187.     ** Add this receive wait object to the list of finished I/O
  188.     ** operations for this particular group. If there are other
  189.     ** threads waiting on the group, notify one. If not, arrange
  190.     ** for this thread to return.
  191.     */
  192.  
  193. #if 0
  194.     printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
  195. #endif
  196.     (*waiter)->outcome = outcome;
  197.     PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
  198.     PR_NotifyCondVar(group->io_complete);
  199.     PR_ASSERT(0 != group->waiter->count);
  200.     group->waiter->count -= 1;
  201.     *waiter = NULL;
  202. }  /* _MW_DoneInternal */
  203.  
  204. static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
  205. {
  206.     /*
  207.     ** Find the receive wait object corresponding to the file descriptor.
  208.     ** Only search the wait group specified.
  209.     */
  210.     PRRecvWait **desc;
  211.     PRIntn rehash = 11;
  212.     _PRWaiterHash *hash = group->waiter;
  213.     PRUintn hidx = _MW_HASH(fd, hash->length);
  214.     
  215.     while (rehash-- > 0)
  216.     {
  217.         desc = (&hash->recv_wait) + hidx;
  218.         if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
  219.         hidx = _MW_REHASH(fd, hidx, hash->length);
  220.     }
  221.     return NULL;
  222. }  /* _MW_LookupInternal */
  223.  
  224. static PRStatus _MW_PollInternal(PRWaitGroup *group)
  225. {
  226.     PRRecvWait **waiter;
  227.     PRStatus rv = PR_FAILURE;
  228.     PRUintn count, count_ready;
  229.     PRIntervalTime polling_interval;
  230.  
  231.     group->poller = PR_GetCurrentThread();
  232.  
  233.     PR_Unlock(group->ml);
  234.  
  235.     while (PR_TRUE)
  236.     {
  237.         PRIntervalTime now, since_last_poll;
  238.         PRPollDesc *poll_list = group->polling_list;
  239.         /*
  240.         ** There's something to do. See if our existing polling list
  241.         ** is large enough for what we have to do?
  242.         */
  243.  
  244.         while (group->polling_count < group->waiter->count)
  245.         {
  246.             PRUint32 old_count = group->waiter->count;
  247.             PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
  248.             PRSize new_size = sizeof(PRPollDesc) * new_count;
  249.             poll_list = (PRPollDesc*)PR_CALLOC(new_size);
  250.             if (NULL == poll_list) goto failed_alloc;
  251.             if (NULL != group->polling_list)
  252.                 PR_DELETE(group->polling_list);
  253.             group->polling_list = poll_list;
  254.             group->polling_count = new_count;
  255.         }
  256.  
  257.         now = PR_IntervalNow();
  258.         polling_interval = max_polling_interval;
  259.         since_last_poll = now - group->last_poll;
  260.         PR_Lock(group->ml);
  261.         waiter = &group->waiter->recv_wait;
  262.  
  263.         for (count = 0; count < group->waiter->count; ++waiter)
  264.         {
  265.             if (NULL != *waiter)  /* a live one! */
  266.             {
  267.                 if (since_last_poll >= (*waiter)->timeout)
  268.                     _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
  269.                 else
  270.                 {
  271.                     if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
  272.                     {
  273.                         (*waiter)->timeout -= since_last_poll;
  274.                         if ((*waiter)->timeout < polling_interval)
  275.                             polling_interval = (*waiter)->timeout;
  276.                     }
  277.                     poll_list->fd = (*waiter)->fd;
  278.                     poll_list->in_flags = PR_POLL_READ;
  279.                     poll_list->out_flags = 0;
  280. #if 0
  281.                     printf(
  282.                         "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
  283.                         poll_list, count, poll_list->fd, (*waiter)->timeout);
  284. #endif
  285.                     poll_list += 1;
  286.                     count += 1;
  287.                 }
  288.             }
  289.         } 
  290.  
  291.         PR_ASSERT(count == group->waiter->count);
  292.         if (0 == count) break;
  293.  
  294.         group->last_poll = now;
  295.  
  296.         PR_Unlock(group->ml);
  297.  
  298.         count_ready = PR_Poll(group->polling_list, count, polling_interval);
  299.  
  300.         PR_Lock(group->ml);
  301.  
  302.         if (-1 == count_ready) goto failed_poll;  /* that's a shame */
  303.         for (poll_list = group->polling_list; count > 0; poll_list++, count--)
  304.         {
  305.             if (poll_list->out_flags != 0)
  306.             {
  307.                 waiter = _MW_LookupInternal(group, poll_list->fd);
  308.                 if (NULL != waiter)
  309.                     _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
  310.             }
  311.         }
  312.         /*
  313.         ** If there are no more threads waiting for completion,
  314.         ** we need to return.
  315.         ** This thread was "borrowed" to do the polling, but it really
  316.         ** belongs to the client.
  317.         */
  318.         if ((_prmw_running != group->state)
  319.         || (0 == group->waiting_threads)) break;
  320.         PR_Unlock(group->ml);
  321.     }
  322.  
  323.     rv = PR_SUCCESS;
  324.  
  325. failed_poll:
  326. failed_alloc:
  327.     group->poller = NULL;  /* we were that, not we ain't */
  328.     return rv;  /* we return with the lock held */
  329. }  /* _MW_PollInternal */
  330.  
  331. static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
  332. {
  333.     PRMWGroupState rv = group->state;
  334.     /*
  335.     ** Looking at the group's fields is safe because
  336.     ** once the group's state is no longer running, it
  337.     ** cannot revert and there is a safe check on entry
  338.     ** to make sure no more threads are made to wait.
  339.     */
  340.     if ((_prmw_stopping == rv)
  341.     && (0 == group->waiting_threads)
  342.     && PR_CLIST_IS_EMPTY(&group->io_ready)
  343.     && (0 == group->waiter->count))
  344.     {
  345.         rv = group->state = _prmw_stopped;
  346.         PR_NotifyCondVar(group->mw_manage);
  347.     }
  348.     return rv;
  349. }  /* MW_TestForShutdownInternal */
  350.  
  351. static void _MW_InitialRecv(PRCList *io_ready)
  352. {
  353.     PRRecvWait *desc = (PRRecvWait*)io_ready;
  354.     if ((NULL == desc->buffer.start)
  355.     || (0 == desc->buffer.length))
  356.         desc->bytesRecv = 0;
  357.     else
  358.     {
  359.         desc->bytesRecv = desc->fd->methods->recv(
  360.             desc->fd, desc->buffer.start,
  361.             desc->buffer.length, 0, desc->timeout);
  362.         if (desc->bytesRecv < 0)  /* SetError should already be there */
  363.             desc->outcome = PR_MW_FAILURE;
  364.     }
  365. }  /* _MW_InitialRecv */
  366.  
  367. /******************************************************************/
  368. /******************************************************************/
  369. /********************** The public API portion ********************/
  370. /******************************************************************/
  371. /******************************************************************/
  372. PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
  373.     PRWaitGroup *group, PRRecvWait *desc)
  374. {
  375.     _PR_HashStory hrv;
  376.     PRStatus rv = PR_FAILURE;
  377.     if (PR_FAILURE == MW_Init()) goto failed_init;
  378.     if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
  379.  
  380.     PR_ASSERT(NULL != desc->fd);
  381.  
  382.     desc->outcome = PR_MW_PENDING;  /* nice, well known value */
  383.     desc->bytesRecv = 0;  /* likewise, though this value is ambiguious */
  384.  
  385.     PR_Lock(group->ml);
  386.  
  387.     if (_prmw_running != group->state)
  388.     {
  389.         /* Not allowed to add after cancelling the group */
  390.         desc->outcome = PR_MW_INTERRUPT;
  391.         PR_SetError(PR_INVALID_STATE_ERROR, 0);
  392.         goto invalid_state;
  393.     }
  394.  
  395.     /*
  396.     ** If the waiter count is zero at this point, there's no telling
  397.     ** how long we've been idle. Therefore, initialize the beginning
  398.     ** of the timing interval. As long as the list doesn't go empty,
  399.     ** it will maintain itself.
  400.     */
  401.     if (0 == group->waiter->count)
  402.         group->last_poll = PR_IntervalNow();
  403.     do
  404.     {
  405.         hrv = MW_AddHashInternal(desc, group->waiter);
  406.         if (_prmw_rehash != hrv) break;
  407.         hrv = MW_ExpandHashInternal(group);  /* gruesome */
  408.         if (_prmw_success != hrv) break;
  409.     } while (PR_TRUE);
  410.  
  411.     PR_NotifyCondVar(group->new_business);  /* tell the world */
  412.     rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
  413.     
  414. failed_init:
  415. invalid_state:
  416.     PR_Unlock(group->ml);
  417.     return rv;
  418. }  /* PR_AddWaitFileDesc */
  419.  
  420. PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
  421. {
  422.     PRStatus rv = PR_SUCCESS;
  423.     PRCList *io_ready = NULL;
  424.     if (PR_FAILURE == MW_Init()) goto failed_init;
  425.     if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
  426.  
  427.     PR_Lock(group->ml);
  428.  
  429.     if (_prmw_running != group->state)
  430.     {
  431.         PR_SetError(PR_INVALID_STATE_ERROR, 0);
  432.         goto invalid_state;
  433.     }
  434.  
  435.     group->waiting_threads += 1;  /* the polling thread is counted */
  436.  
  437.     do
  438.     {
  439.         /*
  440.         ** If the I/O ready list isn't empty, have this thread
  441.         ** return with the first receive wait object that's available.
  442.         */
  443.         if (PR_CLIST_IS_EMPTY(&group->io_ready))
  444.         {
  445.             while ((NULL == group->waiter) || (0 == group->waiter->count))
  446.             {
  447.                 if (_prmw_running != group->state) goto aborted;
  448.                 rv = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
  449.                 if (_MW_ABORTED(rv)) goto aborted;
  450.             }
  451.  
  452.             /*
  453.             ** Is there a polling thread yet? If not, grab this thread
  454.             ** and use it.
  455.             */
  456.             if (NULL == group->poller)
  457.             {
  458.                 /*
  459.                 ** This thread will stay do polling until it becomes the only one
  460.                 ** left to service a completion. Then it will return and there will
  461.                 ** be none left to actually poll or to run completions.
  462.                 **
  463.                 ** The polling function should only return w/ failure or
  464.                 ** with some I/O ready.
  465.                 */
  466.                 if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
  467.                 if (PR_CLIST_IS_EMPTY(&group->io_ready)) continue;  /* timeout */
  468.             }
  469.             else
  470.             {
  471.                 while (PR_CLIST_IS_EMPTY(&group->io_ready))
  472.                 {
  473.                     rv = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
  474.                     if (_MW_ABORTED(rv)) goto aborted;
  475.                 }
  476.             }
  477.         }
  478.         io_ready = PR_LIST_HEAD(&group->io_ready);
  479.         PR_NotifyCondVar(group->io_taken);
  480.         PR_ASSERT(io_ready != NULL);
  481.         PR_REMOVE_LINK(io_ready);
  482.  
  483.         /* If the operation failed, record the reason why */
  484.         switch (((PRRecvWait*)io_ready)->outcome)
  485.         {
  486.             case PR_MW_PENDING:
  487.                 PR_ASSERT(PR_MW_PENDING != ((PRRecvWait*)io_ready)->outcome);
  488.                 break;
  489.             case PR_MW_SUCCESS:
  490.                 _MW_InitialRecv(io_ready);
  491.                 break;
  492.             case PR_MW_TIMEOUT:
  493.                 PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
  494.                 break;
  495.             case PR_MW_INTERRUPT:
  496.                 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
  497.                 break;
  498.             default: break;
  499.         }
  500.     } while (NULL == io_ready);
  501.  
  502. aborted:
  503. failed_poll:
  504.     group->waiting_threads -= 1;
  505. invalid_state:
  506.     (void)MW_TestForShutdownInternal(group);
  507.     PR_Unlock(group->ml);
  508.  
  509. failed_init:
  510.  
  511.     return (PRRecvWait*)io_ready;
  512. }  /* PR_WaitRecvReady */
  513.  
  514. PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
  515. {
  516.     PRRecvWait **recv_wait;
  517.     PRStatus rv = PR_SUCCESS;
  518.     if (PR_FAILURE == MW_Init()) return rv;
  519.     if (NULL == group) group = mw_state->group;
  520.     PR_ASSERT(NULL != group);
  521.     if (NULL == group)
  522.     {
  523.         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  524.         return PR_FAILURE;
  525.     }
  526.  
  527.     PR_Lock(group->ml);
  528.  
  529.     if (_prmw_running != group->state)
  530.     {
  531.         PR_SetError(PR_INVALID_STATE_ERROR, 0);
  532.         rv = PR_FAILURE;
  533.         goto stopping;
  534.     }
  535.  
  536.     if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
  537.     {
  538.         /* it was in the wait table */
  539.         _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
  540.         goto found;
  541.     }
  542.     if (!PR_CLIST_IS_EMPTY(&group->io_ready))
  543.     {
  544.         /* is it already complete? */
  545.         PRCList *head = PR_LIST_HEAD(&group->io_ready);
  546.         do
  547.         {
  548.             PRRecvWait *done = (PRRecvWait*)head;
  549.             if (done == desc) goto found;
  550.             head = PR_NEXT_LINK(head);
  551.         } while (head != &group->io_ready);
  552.     }
  553.     PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  554.     rv = PR_FAILURE;
  555.  
  556. found:
  557. stopping:
  558.     PR_Unlock(group->ml);
  559.     return rv;
  560. }  /* PR_CancelWaitFileDesc */
  561.  
  562. PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
  563. {
  564.     PRRecvWait **desc;
  565.     PRRecvWait *recv_wait = NULL;
  566.     if (PR_FAILURE == MW_Init())
  567.     {
  568.         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  569.         return NULL;
  570.     }
  571.     if (NULL == group) group = mw_state->group;
  572.     PR_ASSERT(NULL != group);
  573.     if (NULL == group)
  574.     {
  575.         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  576.         return NULL;
  577.     }
  578.  
  579.     PR_Lock(group->ml);
  580.     if (_prmw_stopped != group->state)
  581.     {
  582.         if (_prmw_running == group->state)
  583.             group->state = _prmw_stopping;  /* so nothing new comes in */
  584.         if (0 == group->waiting_threads)  /* is there anybody else? */
  585.             group->state = _prmw_stopped;  /* we can stop right now */
  586.         while (_prmw_stopped != group->state)
  587.             (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
  588.  
  589.         /* make all the existing descriptors look done/interrupted */
  590.         for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
  591.         {
  592.             if (NULL != *desc)
  593.                 _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
  594.         }
  595.  
  596.         PR_NotifyAllCondVar(group->new_business);
  597.     }
  598.  
  599.     /* take first element of finished list and return it or NULL */
  600.     if (PR_CLIST_IS_EMPTY(&group->io_ready))
  601.         PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
  602.     else
  603.     {
  604.         PRCList *head = PR_LIST_HEAD(&group->io_ready);
  605.         PR_REMOVE_AND_INIT_LINK(head);
  606.         recv_wait = (PRRecvWait*)head;
  607.     }
  608.     PR_Unlock(group->ml);
  609.  
  610.     return recv_wait;
  611. }  /* PR_CancelWaitGroup */
  612.  
  613. PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
  614. {
  615.     PRWaitGroup *wg = NULL;
  616.     if (PR_FAILURE == MW_Init()) goto failed;
  617.  
  618.     if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) goto failed;
  619.     /* the wait group itself */
  620.     wg->ml = PR_NewLock();
  621.     if (NULL == wg->ml) goto failed_lock;
  622.     wg->io_taken = PR_NewCondVar(wg->ml);
  623.     if (NULL == wg->io_taken) goto failed_cvar0;
  624.     wg->io_complete = PR_NewCondVar(wg->ml);
  625.     if (NULL == wg->io_complete) goto failed_cvar1;
  626.     wg->new_business = PR_NewCondVar(wg->ml);
  627.     if (NULL == wg->new_business) goto failed_cvar2;
  628.     wg->mw_manage = PR_NewCondVar(wg->ml);
  629.     if (NULL == wg->mw_manage) goto failed_cvar3;
  630.  
  631.     PR_INIT_CLIST(&wg->group_link);
  632.     PR_INIT_CLIST(&wg->io_ready);
  633.  
  634.     /* the waiters sequence */
  635.     wg->waiter = (_PRWaiterHash*)PR_CALLOC(
  636.         sizeof(_PRWaiterHash) +
  637.         (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
  638.     if (NULL == wg->waiter) goto failed_waiter;
  639.     wg->waiter->count = 0;
  640.     wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
  641.  
  642.     PR_Lock(mw_lock);
  643.     PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
  644.     PR_Unlock(mw_lock);
  645.     return wg;
  646.  
  647. failed_waiter:
  648.     PR_DestroyCondVar(wg->mw_manage);
  649. failed_cvar3:
  650.     PR_DestroyCondVar(wg->new_business);
  651. failed_cvar2:
  652.     PR_DestroyCondVar(wg->io_taken);
  653. failed_cvar1:
  654.     PR_DestroyCondVar(wg->io_complete);
  655. failed_cvar0:
  656.     PR_DestroyLock(wg->ml);
  657. failed_lock:
  658.     PR_DELETE(wg);
  659.  
  660. failed:
  661.     return wg;
  662. }  /* MW_CreateWaitGroup */
  663.  
  664. PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
  665. {
  666.     PRStatus rv = PR_SUCCESS;
  667.     if (NULL == group) group = mw_state->group;
  668.     PR_ASSERT(NULL != group);
  669.     if (NULL != group)
  670.     {
  671.         if (_prmw_stopped != group->state)  /* quick, unsafe test */
  672.         {
  673.             PRMWGroupState mws;
  674.             /* One shot to correct the situation */
  675.             PR_Lock(group->ml);
  676.             if (group->state < _prmw_stopped)  /* safer test */
  677.                 group->state = _prmw_stopping;
  678.             mws = MW_TestForShutdownInternal(group);
  679.             PR_Unlock(group->ml);
  680.             if (_prmw_stopped != mws)  /* quick test again */
  681.             {
  682.                 PR_SetError(PR_INVALID_STATE_ERROR, 0);
  683.                 return PR_FAILURE;
  684.             }
  685.         }
  686.  
  687.         PR_Lock(mw_lock);
  688.         PR_REMOVE_LINK(&group->group_link);
  689.         PR_Unlock(mw_lock);
  690.  
  691.         PR_DELETE(group->waiter);
  692.         PR_DestroyCondVar(group->new_business);
  693.         PR_DestroyCondVar(group->io_complete);
  694.         PR_DestroyCondVar(group->io_taken);
  695.         PR_DestroyLock(group->ml);
  696.         if (group == mw_state->group) mw_state->group = NULL;
  697.         PR_DELETE(group);
  698.     }
  699.     return rv;
  700. }  /* PR_DestroyWaitGroup */
  701.  
  702. /* prmwait.c */
  703.