home *** CD-ROM | disk | FTP | other *** search
/ Tools / WinSN5.0Ver.iso / NETSCAP.50 / WIN1998.ZIP / ns / nsprpub / pr / tests / multiwait.c < prev    next >
Encoding:
C/C++ Source or Header  |  1998-04-08  |  21.1 KB  |  662 lines

  1. /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /*
  3.  * The contents of this file are subject to the Netscape Public License
  4.  * Version 1.0 (the "NPL"); you may not use this file except in
  5.  * compliance with the NPL.  You may obtain a copy of the NPL at
  6.  * http://www.mozilla.org/NPL/
  7.  * 
  8.  * Software distributed under the NPL is distributed on an "AS IS" basis,
  9.  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the NPL
  10.  * for the specific language governing rights and limitations under the
  11.  * NPL.
  12.  * 
  13.  * The Initial Developer of this code under the NPL is Netscape
  14.  * Communications Corporation.  Portions created by Netscape are
  15.  * Copyright (C) 1998 Netscape Communications Corporation.  All Rights
  16.  * Reserved.
  17.  */
  18.  
  19. #include "prio.h"
  20. #include "prprf.h"
  21. #include "prlog.h"
  22. #include "prmem.h"
  23. #include "pratom.h"
  24. #include "prlock.h"
  25. #include "prmwait.h"
  26. #include "prclist.h"
  27. #include "prerror.h"
  28. #include "prinrval.h"
  29. #include "prnetdb.h"
  30. #include "prthread.h"
  31.  
  32. #include "plstr.h"
  33. #include "plerror.h"
  34. #include "plgetopt.h"
  35.  
  36. typedef struct Shared
  37. {
  38.     const char *title;
  39.     PRLock *list_lock;
  40.     PRWaitGroup *group;
  41.     PRIntervalTime timeout;
  42. } Shared;
  43.  
  44. typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;
  45.  
  46. static PRUint32 identity = 0;
  47. static PRFileDesc *debug = NULL;
  48. static PRInt32 desc_allocated = 0;
  49. static PRUint16 default_port = 12273;
  50. static enum Verbosity verbosity = quiet;
  51. static PRInt32 ops_required = 1000, ops_done = 0;
  52. static PRThreadScope thread_scope = PR_LOCAL_THREAD;
  53. static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
  54.  
  55. #if defined(DEBUG)
  56. #define MW_ASSERT(_expr) \
  57.     ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__))
  58. static void _MW_Assert(const char *s, const char *file, PRIntn ln)
  59. {
  60.     if (NULL != debug) PL_FPrintError(debug, NULL);
  61.     PR_Assert(s, file, ln);
  62. }  /* _MW_Assert */
  63. #else
  64. #define MW_ASSERT(_expr)
  65. #endif
  66.  
  67. static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
  68. {
  69.     const char *tag[] = {
  70.         "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
  71.         "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"};
  72.     PR_fprintf(
  73.         debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
  74.         msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
  75. }  /* PrintRecvDesc */
  76.  
  77. static Shared *MakeShared(const char *title)
  78. {
  79.     Shared *shared = PR_NEWZAP(Shared);
  80.     shared->group = PR_CreateWaitGroup(1);
  81.     shared->timeout = PR_SecondsToInterval(1);
  82.     shared->list_lock = PR_NewLock();
  83.     shared->title = title;
  84.     return shared;
  85. }  /* MakeShared */
  86.  
  87. static void DestroyShared(Shared *shared)
  88. {
  89.     PRStatus rv;
  90.     if (verbosity > quiet)
  91.         PR_fprintf(debug, "%s: destroying group\n", shared->title);
  92.     rv = PR_DestroyWaitGroup(shared->group);
  93.     MW_ASSERT(PR_SUCCESS == rv);
  94.     PR_DestroyLock(shared->list_lock);
  95.     PR_DELETE(shared);
  96. }  /* DestroyShared */
  97.  
  98. static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
  99. {
  100.     PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
  101.     MW_ASSERT(NULL != desc_out);
  102.  
  103.     MW_ASSERT(NULL != fd);
  104.     desc_out->fd = fd;
  105.     desc_out->timeout = timeout;
  106.     desc_out->buffer.length = 120;
  107.     desc_out->buffer.start = PR_CALLOC(120);
  108.  
  109.     PR_AtomicIncrement(&desc_allocated);
  110.  
  111.     if (verbosity > chatty)
  112.         PrintRecvDesc(desc_out, "Allocated");
  113.     return desc_out;
  114. }  /* CreateRecvWait */
  115.  
  116. static void DestroyRecvWait(Shared *shared, PRRecvWait *desc_out)
  117. {
  118.     if (verbosity > chatty)
  119.         PrintRecvDesc(desc_out, "Destroying");
  120.     PR_Close(desc_out->fd);
  121.     if (NULL != desc_out->buffer.start)
  122.         PR_DELETE(desc_out->buffer.start);
  123.     PR_Free(desc_out);
  124.     (void)PR_AtomicDecrement(&desc_allocated);
  125. }  /* DestroyRecvWait */
  126.  
  127. static void CancelGroup(Shared *shared)
  128. {
  129.     PRRecvWait *desc_out;
  130.  
  131.     if (verbosity > quiet)
  132.         PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
  133.  
  134.     do
  135.     {
  136.         desc_out = PR_CancelWaitGroup(shared->group);
  137.         if (NULL != desc_out) DestroyRecvWait(shared, desc_out);
  138.     } while (NULL != desc_out);
  139.  
  140.     MW_ASSERT(0 == desc_allocated);
  141.     MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
  142. }  /* CancelGroup */
  143.  
  144. static void PR_CALLBACK ClientThread(void* arg)
  145. {
  146.     PRStatus rv;
  147.     PRInt32 bytes;
  148.     PRIntn empty_flags = 0;
  149.     PRNetAddr server_address;
  150.     unsigned char buffer[100];
  151.     Shared *shared = (Shared*)arg;
  152.     PRFileDesc *server = PR_NewTCPSocket();
  153.     if ((NULL == server)
  154.     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
  155.     MW_ASSERT(NULL != server);
  156.  
  157.     if (verbosity > chatty)
  158.         PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
  159.  
  160.     rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
  161.     MW_ASSERT(PR_SUCCESS == rv);
  162.  
  163.     if (verbosity > quiet)
  164.         PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
  165.     rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
  166.  
  167.     if (PR_FAILURE == rv)
  168.     {
  169.         if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
  170.         return;
  171.     }
  172.  
  173.     while (ops_done < ops_required)
  174.     {
  175.         bytes = PR_Send(
  176.             server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
  177.         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
  178.         MW_ASSERT(sizeof(buffer) == bytes);
  179.         if (verbosity > chatty)
  180.             PR_fprintf(
  181.                 debug, "%s: Client sent %d bytes\n",
  182.                 shared->title, sizeof(buffer));
  183.         bytes = PR_Recv(
  184.             server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
  185.         if (verbosity > chatty)
  186.             PR_fprintf(
  187.                 debug, "%s: Client received %d bytes\n",
  188.                 shared->title, sizeof(buffer));
  189.         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
  190.         MW_ASSERT(sizeof(buffer) == bytes);
  191.         PR_Sleep(shared->timeout);
  192.     }
  193.     rv = PR_Close(server);
  194.     MW_ASSERT(PR_SUCCESS == rv);
  195.  
  196. }  /* ClientThread */
  197.  
  198. static void OneInThenCancelled(Shared *shared)
  199. {
  200.     PRStatus rv;
  201.     PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
  202.  
  203.     shared->timeout = PR_INTERVAL_NO_TIMEOUT;
  204.  
  205.     desc_in->fd = PR_NewTCPSocket();
  206.     desc_in->timeout = shared->timeout;
  207.  
  208.     if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
  209.  
  210.     rv = PR_AddWaitFileDesc(shared->group, desc_in);
  211.     MW_ASSERT(PR_SUCCESS == rv);
  212.  
  213.     if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling");
  214.     rv = PR_CancelWaitFileDesc(shared->group, desc_in);
  215.     MW_ASSERT(PR_SUCCESS == rv);
  216.  
  217.     desc_out = PR_WaitRecvReady(shared->group);
  218.     MW_ASSERT(desc_out == desc_in);
  219.     MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
  220.     MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  221.     if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
  222.  
  223.     rv = PR_Close(desc_in->fd);
  224.     MW_ASSERT(PR_SUCCESS == rv);
  225.  
  226.     if (verbosity > quiet)
  227.         PR_fprintf(debug, "%s: destroying group\n", shared->title);
  228.  
  229.     PR_DELETE(desc_in);
  230. }  /* OneInThenCancelled */
  231.  
  232. static void OneOpOneThread(Shared *shared)
  233. {
  234.     PRStatus rv;
  235.     PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
  236.  
  237.     desc_in->fd = PR_NewTCPSocket();
  238.     desc_in->timeout = shared->timeout;
  239.  
  240.     if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
  241.  
  242.     rv = PR_AddWaitFileDesc(shared->group, desc_in);
  243.     MW_ASSERT(PR_SUCCESS == rv);
  244.     desc_out = PR_WaitRecvReady(shared->group);
  245.     MW_ASSERT(desc_out == desc_in);
  246.     MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
  247.     MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
  248.     if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
  249.  
  250.     rv = PR_Close(desc_in->fd);
  251.     MW_ASSERT(PR_SUCCESS == rv);
  252.  
  253.     PR_DELETE(desc_in);
  254. }  /* OneOpOneThread */
  255.  
  256. static void ManyOpOneThread(Shared *shared)
  257. {
  258.     PRStatus rv;
  259.     PRIntn index;
  260.     PRRecvWait *desc_in;
  261.     PRRecvWait *desc_out;
  262.  
  263.     desc_in = (PRRecvWait*)PR_CALLOC(sizeof(PRRecvWait*) * wait_objects);
  264.  
  265.     if (verbosity > quiet)
  266.         PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
  267.  
  268.     for (index = 0; index < wait_objects; ++index)
  269.     {
  270.         desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
  271.  
  272.         rv = PR_AddWaitFileDesc(shared->group, desc_in);
  273.         MW_ASSERT(PR_SUCCESS == rv);
  274.     }
  275.  
  276.     while (ops_done < ops_required)
  277.     {
  278.         desc_out = PR_WaitRecvReady(shared->group);
  279.         MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
  280.         MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
  281.         if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding");
  282.         rv = PR_AddWaitFileDesc(shared->group, desc_out);
  283.         MW_ASSERT(PR_SUCCESS == rv);
  284.         (void)PR_AtomicIncrement(&ops_done);
  285.     }
  286.  
  287.     CancelGroup(shared);
  288. }  /* ManyOpOneThread */
  289.  
  290. static void PR_CALLBACK SomeOpsThread(void *arg)
  291. {
  292.     PRRecvWait *desc_out;
  293.     PRStatus rv = PR_SUCCESS;
  294.     Shared *shared = (Shared*)arg;
  295.     do  /* until interrupted */
  296.     {
  297.         desc_out = PR_WaitRecvReady(shared->group);
  298.         if (NULL == desc_out)
  299.         {
  300.             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  301.             if (verbosity > quiet) PrintRecvDesc(desc_out, "Aborted");
  302.             break;
  303.         }
  304.         MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
  305.         MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
  306.         if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
  307.  
  308.         if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
  309.         desc_out->timeout = shared->timeout;
  310.         rv = PR_AddWaitFileDesc(shared->group, desc_out);
  311.         PR_AtomicIncrement(&ops_done);
  312.         if (ops_done > ops_required) break;
  313.     } while (PR_SUCCESS == rv);
  314.     MW_ASSERT(PR_SUCCESS == rv);
  315. }  /* SomeOpsThread */
  316.  
  317. static void SomeOpsSomeThreads(Shared *shared)
  318. {
  319.     PRStatus rv;
  320.     PRThread **thread;
  321.     PRIntn index;
  322.     PRRecvWait *desc_in;
  323.  
  324.     thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
  325.  
  326.     /* Create some threads */
  327.  
  328.     if (verbosity > quiet)
  329.         PR_fprintf(debug, "%s: creating threads\n", shared->title);
  330.     for (index = 0; index < worker_threads; ++index)
  331.     {
  332.         thread[index] = PR_CreateThread(
  333.             PR_USER_THREAD, SomeOpsThread, shared,
  334.             PR_PRIORITY_HIGH, thread_scope,
  335.             PR_JOINABLE_THREAD, 16 * 1024);
  336.     }
  337.  
  338.     /* then create some operations */
  339.     if (verbosity > quiet)
  340.         PR_fprintf(debug, "%s: creating desc\n", shared->title);
  341.     for (index = 0; index < wait_objects; ++index)
  342.     {
  343.         desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
  344.         rv = PR_AddWaitFileDesc(shared->group, desc_in);
  345.         MW_ASSERT(PR_SUCCESS == rv);
  346.     }
  347.  
  348.     if (verbosity > quiet)
  349.         PR_fprintf(debug, "%s: sleeping\n", shared->title);
  350.     while (ops_done < ops_required) PR_Sleep(shared->timeout);
  351.  
  352.     if (verbosity > quiet)
  353.         PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
  354.     for (index = 0; index < worker_threads; ++index)
  355.     {
  356.         rv = PR_Interrupt(thread[index]);
  357.         MW_ASSERT(PR_SUCCESS == rv);
  358.         rv = PR_JoinThread(thread[index]);
  359.         MW_ASSERT(PR_SUCCESS == rv);
  360.     }
  361.     PR_DELETE(thread);
  362.  
  363.     CancelGroup(shared);
  364. }  /* SomeOpsSomeThreads */
  365.  
  366. static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
  367. {
  368.     PRInt32 bytes_out;
  369.  
  370.     if (verbosity > chatty)
  371.         PR_fprintf(
  372.             debug, "%s: Service received %d bytes\n",
  373.             shared->title, desc->bytesRecv);
  374.  
  375.     if (0 == desc->bytesRecv) goto quitting;
  376.     if ((-1 == desc->bytesRecv)
  377.     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
  378.  
  379.     bytes_out = PR_Send(
  380.         desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
  381.     if (verbosity > chatty)
  382.         PR_fprintf(
  383.             debug, "%s: Service sent %d bytes\n",
  384.             shared->title, bytes_out);
  385.  
  386.     if ((-1 == bytes_out)
  387.     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
  388.     MW_ASSERT(bytes_out == desc->bytesRecv);
  389.  
  390.     return PR_SUCCESS;
  391.  
  392. aborted:
  393. quitting:
  394.     return PR_FAILURE;
  395. }  /* ServiceRequest */
  396.  
  397. static void PR_CALLBACK ServiceThread(void *arg)
  398. {
  399.     PRStatus rv = PR_SUCCESS;
  400.     PRRecvWait *desc_out = NULL;
  401.     Shared *shared = (Shared*)arg;
  402.     do  /* until interrupted */
  403.     {
  404.         if (NULL != desc_out)
  405.         {
  406.             desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
  407.             if (verbosity > chatty)
  408.                 PrintRecvDesc(desc_out, "Service re-adding");
  409.             rv = PR_AddWaitFileDesc(shared->group, desc_out);
  410.             MW_ASSERT(PR_SUCCESS == rv);
  411.         }
  412.  
  413.         desc_out = PR_WaitRecvReady(shared->group);
  414.         if (NULL == desc_out)
  415.         {
  416.             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  417.             break;
  418.         }
  419.  
  420.         switch (desc_out->outcome)
  421.         {
  422.             case PR_MW_SUCCESS:
  423.             {
  424.                 PR_AtomicIncrement(&ops_done);
  425.                 if (verbosity > quiet)
  426.                     PR_fprintf(
  427.                         debug, "%s: Servicing %u\n", shared->title, ops_done);
  428.                 if (verbosity > chatty)
  429.                     PrintRecvDesc(desc_out, "Service ready");
  430.                 rv = ServiceRequest(shared, desc_out);
  431.                 break;
  432.             }
  433.             case PR_MW_INTERRUPT:
  434.                 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
  435.                 rv = PR_FAILURE;  /* if interrupted, then exit */
  436.                 break;
  437.             case PR_MW_TIMEOUT:
  438.                 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
  439.             case PR_MW_FAILURE:
  440.                 if (verbosity > silent)
  441.                     PL_FPrintError(debug, "RecvReady failure");
  442.                 break;
  443.             default:
  444.                 break;
  445.         }
  446.     } while (PR_SUCCESS == rv);
  447.  
  448.     if (NULL != desc_out) DestroyRecvWait(shared, desc_out);
  449.  
  450. }  /* ServiceThread */
  451.  
  452.  
  453. static void PR_CALLBACK ServerThread(void *arg)
  454. {
  455.     PRStatus rv;
  456.     PRIntn index;
  457.     PRRecvWait *desc_in;
  458.     PRThread **worker_thread;
  459.     Shared *shared = (Shared*)arg;
  460.     PRFileDesc *listener, *service;
  461.     PRNetAddr server_address, client_address;
  462.  
  463.     worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
  464.     if (verbosity > quiet)
  465.         PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
  466.     for (index = 0; index < worker_threads; ++index)
  467.     {
  468.         worker_thread[index] = PR_CreateThread(
  469.             PR_USER_THREAD, ServiceThread, shared,
  470.             PR_PRIORITY_HIGH, thread_scope,
  471.             PR_JOINABLE_THREAD, 16 * 1024);
  472.     }
  473.  
  474.     rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
  475.     MW_ASSERT(PR_SUCCESS == rv);
  476.  
  477.     listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
  478.     if (verbosity > chatty)
  479.         PR_fprintf(
  480.             debug, "%s: Server listener socket @0x%x\n",
  481.             shared->title, listener);
  482.     rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
  483.     rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
  484.     while (ops_done < ops_required)
  485.     {
  486.         if (verbosity > quiet)
  487.             PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
  488.         service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
  489.         if (NULL == service)
  490.         {
  491.             if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;
  492.             PL_PrintError("Accept failed");
  493.             MW_ASSERT(!"Accept failed");
  494.         }
  495.         else
  496.         {
  497.             desc_in = CreateRecvWait(service, shared->timeout);
  498.             desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
  499.             if (verbosity > chatty)
  500.                 PrintRecvDesc(desc_in, "Service adding");
  501.             rv = PR_AddWaitFileDesc(shared->group, desc_in);
  502.             MW_ASSERT(PR_SUCCESS == rv);
  503.         }
  504.     }
  505.  
  506.     if (verbosity > quiet)
  507.         PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
  508.     for (index = 0; index < worker_threads; ++index)
  509.     {
  510.         rv = PR_Interrupt(worker_thread[index]);
  511.         MW_ASSERT(PR_SUCCESS == rv);
  512.         rv = PR_JoinThread(worker_thread[index]);
  513.         MW_ASSERT(PR_SUCCESS == rv);
  514.     }
  515.     PR_DELETE(worker_thread);
  516.  
  517.     PR_Close(listener);
  518.  
  519.     CancelGroup(shared);
  520.  
  521. }  /* ServerThread */
  522.  
  523. static void RealOneGroupIO(Shared *shared)
  524. {
  525.     /*
  526.     ** Create a server that listens for connections and then services
  527.     ** requests that come in over those connections. The server never
  528.     ** deletes a connection and assumes a basic RPC model of operation.
  529.     **
  530.     ** Use worker_threads threads to service how every many open ports
  531.     ** there might be.
  532.     **
  533.     ** Oh, ya. Almost forget. Create (some) clients as well.
  534.     */
  535.     PRStatus rv;
  536.     PRIntn index;
  537.     PRThread *server_thread, **client_thread;
  538.  
  539.     if (verbosity > quiet)
  540.         PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
  541.  
  542.     server_thread = PR_CreateThread(
  543.         PR_USER_THREAD, ServerThread, shared,
  544.         PR_PRIORITY_HIGH, thread_scope,
  545.         PR_JOINABLE_THREAD, 16 * 1024);
  546.  
  547.     if (verbosity > quiet)
  548.         PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
  549.     PR_Sleep(5 * shared->timeout);
  550.  
  551.     if (verbosity > quiet)
  552.         PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
  553.     client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
  554.     for (index = 0; index < client_threads; ++index)
  555.     {
  556.         client_thread[index] = PR_CreateThread(
  557.             PR_USER_THREAD, ClientThread, shared,
  558.             PR_PRIORITY_NORMAL, thread_scope,
  559.             PR_JOINABLE_THREAD, 16 * 1024);
  560.     }
  561.  
  562.     while (ops_done < ops_required) PR_Sleep(shared->timeout);
  563.  
  564.     if (verbosity > quiet)
  565.         PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
  566.     for (index = 0; index < client_threads; ++index)
  567.     {
  568.         rv = PR_Interrupt(client_thread[index]);
  569.         MW_ASSERT(PR_SUCCESS == rv);
  570.         rv = PR_JoinThread(client_thread[index]);
  571.         MW_ASSERT(PR_SUCCESS == rv);
  572.     }
  573.  
  574.     if (verbosity > quiet)
  575.         PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
  576.     rv = PR_Interrupt(server_thread);
  577.     MW_ASSERT(PR_SUCCESS == rv);
  578.     rv = PR_JoinThread(server_thread);
  579.     MW_ASSERT(PR_SUCCESS == rv);
  580. }  /* RealOneGroupIO */
  581.  
  582. static void RunThisOne(
  583.     void (*func)(Shared*), const char *name, const char *test_name)
  584. {
  585.     Shared *shared;
  586.     if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))
  587.     {
  588.         if (verbosity > silent)
  589.             PR_fprintf(debug, "%s()\n", name);
  590.         shared = MakeShared(name);
  591.         ops_done = 0;
  592.         func(shared);  /* run the test */
  593.         MW_ASSERT(0 == desc_allocated);
  594.         DestroyShared(shared);
  595.     }
  596. }  /* RunThisOne */
  597.  
  598. static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
  599. {
  600.     PRIntn verbage = (PRIntn)verbosity;
  601.     return (Verbosity)(verbage += 1);
  602. }  /* ChangeVerbosity */
  603.  
  604. PRIntn main(PRIntn argc, char **argv)
  605. {
  606.     PLOptStatus os;
  607.     const char *test_name = NULL;
  608.     PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
  609.  
  610.     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
  611.     {
  612.         if (PL_OPT_BAD == os) continue;
  613.         switch (opt->option)
  614.         {
  615.         case 0:
  616.             test_name = opt->value;
  617.             break;
  618.         case 'd':  /* debug mode */
  619.             if (verbosity < noisy)
  620.                 verbosity = ChangeVerbosity(verbosity, 1);
  621.             break;
  622.         case 'q':  /* debug mode */
  623.             if (verbosity > silent)
  624.                 verbosity = ChangeVerbosity(verbosity, -1);
  625.             break;
  626.         case 'G':  /* use global threads */
  627.             thread_scope = PR_GLOBAL_THREAD;
  628.             break;
  629.         case 'c':  /* number of client threads */
  630.             client_threads = atoi(opt->value);
  631.             break;
  632.         case 'o':  /* operations to compelete */
  633.             ops_required = atoi(opt->value);
  634.             break;
  635.         case 'p':  /* default port */
  636.             default_port = atoi(opt->value);
  637.             break;
  638.         case 't':  /* number of threads waiting */
  639.             worker_threads = atoi(opt->value);
  640.             break;
  641.         case 'w':  /* number of wait objects */
  642.             wait_objects = atoi(opt->value);
  643.             break;
  644.         default:
  645.             break;
  646.         }
  647.     }
  648.     PL_DestroyOptState(opt);
  649.  
  650.     if (verbosity > 0)
  651.         debug = PR_GetSpecialFD(PR_StandardError);
  652.  
  653.     RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
  654.     RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
  655.     RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
  656.     RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
  657.     RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
  658.     return 0;
  659. }  /* main */
  660.  
  661. /* multwait.c */
  662.