home *** CD-ROM | disk | FTP | other *** search
/ OS/2 Shareware BBS: 10 Tools / 10-Tools.zip / memlink.zip / PIPES.C < prev    next >
C/C++ Source or Header  |  1995-08-09  |  16KB  |  441 lines

  1. /*mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm
  2. /* PIPES.C
  3. /*
  4. /* This module provides a mechanism to pass data between threads of the same
  5. /* process or between processes.  It was written as a replacement for a 
  6. /* a named pipe implementation of interprocess communications in an 
  7. /* application with 4 communicating processes.  The named pipe implementation
  8. /* was poorly written, caused lock-up problems, and proved to be more
  9. /* complicated than needed.  This type of implementation seemed cleaner and
  10. /* simpler to implement.
  11. /* 
  12. /* The following IPC mechanism is more efficient and does not involve OS/2 
  13. /* named pipes.  It uses shared memory in a DLL, with memory access and flow 
  14. /* control implemented with semaphores. 
  15. /*
  16. /* Although the name "pipe" is used in this file, this term is not referring 
  17. /* to any OS/2 pipes.  This implementation can be thought of as a homemade 
  18. /* piping mechanism using shared memory in a DLL and semaphores to 
  19. /* synchronize access to the memory.  The module is self-initializing with
  20. /* semaphores created and opened automatically with the first calls to
  21. /* the following exported routines.  The following 3 routines are the only
  22. /* routines that need to be called outside of this module. This file is the
  23. /* only C file used to create MEMLINK.DLL.  
  24. /*
  25. /* To use these 3 routines you should include pipes.h in the files that call 
  26. /* the routines.  The structure "message" must also be defined for this module. 
  27. /* "message" is the data structure that is passed in the pipes between the
  28. /* threads or processes using these pipes. 
  29. /*
  30. /* HOW TO USE BETWEEN PROCESSES (OR THREADS) "p1" AND "p2":
  31. /*
  32. /* 1) Pick a free pipe Id (i.e. PIPE_BOB, PIPE_MARY, ... defined in pipes.h).
  33. /*    In our example we'll use the first one - PIPE_BOB.
  34. /*
  35. /* 2) Designate one of the processes as PIPE_END_SERVER and the other PIPE_END_CLIENT.
  36. /*    It doesn't matter which one is which.  For this example we'll use 
  37. /*                      p1 = PIPE_END_SERVER
  38. /*                      p2 = PIPE_END_CLIENT
  39. /*
  40. /* 3) When p1 WRITES a "MyMessage" structure to p2 it makes a 
  41. /*    call like the following:     PipeWrite(PIPE_BOB, PIPE_END_SERVER, &MyMessage); 
  42. /*
  43. /* 4) When p1 READS a "MyMessage" structure from p2 it makes a 
  44. /*    call like the following:     PipeRead(PIPE_BOB, PIPE_END_SERVER, &MyMessage); 
  45. /*
  46. /* 5) When p2 WRITES a "MyMessage" structure to p1 it makes a 
  47. /*    call like the following:     PipeWrite(PIPE_BOB, PIPE_END_CLIENT, &MyMessage); 
  48. /*
  49. /* 6) When p2 READS a "MyMessage" structure from p1 it makes a 
  50. /*    call like the following:     PipeRead(PIPE_BOB, PIPE_END_CLIENT, &MyMessage); 
  51. /*
  52. /*  Note: Each write will block until the data is read from the last write. 
  53. /*        Each read will block until data is available to be read ( unless you
  54. /*        use PipeReadNoWait(), which returns without waiting for avail data).
  55. /*        This blocking provides flow control of the messages between the p's.
  56. /*
  57. /* HOW TO INCORPORATE INTO YOUR APP(S): 
  58. /*       1) Define your "struct message" data structure in pipes.h.
  59. /*       2) Recompile to create MEMLINK.LIB & MEMLINK.DLL:  (i.e. nmake memlink.mak)
  60. /*       3) Link MEMLINK.LIB into your executable(s) and make sure MEMLINK.DLL
  61. /*           is available to them during run-time.
  62. /*
  63. /* EXPORTED ROUTINES:
  64. /*    int EXPENTRY PipeRead(int PipeId, int PipeEnd, struct message *Msg)
  65. /*    int EXPENTRY PipeReadNoWait(int PipeId, int PipeEnd, 
  66. /*        struct message *Msg, int *DataReturnedFlag)
  67. /*    int EXPENTRY PipeWrite(int PipeId, int PipeEnd, struct message *Msg)
  68. /*
  69. /* NOTE All functions return 0 if no error, else a nonzero value.
  70. /*mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm*/
  71.  
  72. #define INCL_DOSSEMAPHORES
  73.  
  74. #include <os2.h>
  75. #include <stdlib.h>
  76. #include "pipes.h"
  77.  
  78. /* Semaphore API errors */
  79. #ifndef ERROR_TIMEOUT
  80.     #define NO_ERROR 0
  81.     #define ERROR_INVALID_HANDLE 6
  82.     #define ERROR_NOT_ENOUGH_MEMORY 8
  83.     #define ERROR_INVALID_PARAMETER 87
  84.     #define ERROR_INTERRUPT 95
  85.     #define ERROR_TOO_MANY_SEM_REQUESTS 103
  86.     #define ERROR_SEM_OWNER_DIED 105
  87.     #define ERROR_INVALID_NAME 123
  88.     #define ERROR_SEM_NOT_FOUND 187
  89.     #define ERROR_NOT_OWNER 288
  90.     #define ERROR_TOO_MANY_OPENS 291
  91.     #define ERROR_TOO_MANY_POSTS 298
  92.     #define ERROR_ALREADY_POSTED 299
  93.     #define ERROR_ALREADY_RESET 300
  94.     #define ERROR_TIMEOUT 640
  95. #endif
  96.  
  97. // semaphore name used locally in this module
  98. #define PINITMUTEXSEMNAME "\\SEM32\\PINITMTX.SEM"
  99.  
  100. // prototypes of locally called routines.
  101. static int CreatePipeSemaphores(int PipeId);
  102. static APIRET PipeQueryEventSem(HEV *SemHandle, ULONG *ulPostCt);
  103. static APIRET PipeWaitAndResetEventSem(HEV *SemHandle);
  104. static APIRET PipePostEventSem(HEV *SemHandle);
  105. static APIRET RequestPipeInitMutexSem(void);
  106. static APIRET ReleasePipeInitMutexSem(void);
  107.  
  108. // The following data is shared among all users of the DLL.
  109. #pragma data_seg(SharedSeg1)    /* start of shared data segment */
  110.   static HEV WriteEventSem[MAX_NUM_PIPES][2], ReadEventSem[MAX_NUM_PIPES][2];
  111.   static struct message PipeData[MAX_NUM_PIPES][2];
  112.   static short PipeInitFlag[MAX_NUM_PIPES];
  113.   static HMTX hPInitMtxSem;
  114. #pragma data_seg()    /* end of shared data segment */
  115.  
  116. /**************************************************************************
  117. /* PipeRead    Performs a read from one of the "pipes".  This routine blocks 
  118. /*        until a message is available to be read (when the other end
  119. /*        of the pipe writes to the pipe).
  120. /* arguments:
  121. /*        PipeId - The id assigned to this pipe (1, 2, ...).
  122. /*        PipeEnd - PIPE_END_SERVER or PIPE_END_CLIENT.
  123. /*        Msg - Pointer to buffer for returned data.
  124. /* returns:
  125. /*         0 if no error, else a nonzero value.
  126. /**************************************************************************/
  127. int EXPENTRY PipeRead(int PipeId, int PipeEnd, struct message *Msg)
  128. {
  129.     int ReturnVal=0, OtherEnd, PipeIndex;
  130.  
  131.     PipeIndex = PipeId - 1;           /* Ids start with 1, indices with 0 */
  132.     OtherEnd = (PipeEnd) ? 0 : 1;  /* Get Id of other side of pipe */
  133.  
  134.     /* Check for valid arguments */
  135.     if ((PipeIndex < 0) || (PipeIndex >= MAX_NUM_PIPES) ||  
  136.         (PipeEnd < 0) || (PipeEnd > 1)) return -1;
  137.  
  138.     /* Is this the first call?  If so then initialize the semaphores. */
  139.     else if (!PipeInitFlag[PipeIndex]) 
  140.     {
  141.         ReturnVal = CreatePipeSemaphores(PipeId);
  142.         if (ReturnVal) return ReturnVal;
  143.     }
  144.  
  145.     /* wait till a message has been written by opposite side. */
  146.     ReturnVal |= PipeWaitAndResetEventSem(&WriteEventSem[PipeIndex][OtherEnd]); 
  147.  
  148.     /* Get data from pipe. */
  149.     *Msg = PipeData[PipeIndex][PipeEnd];
  150.     
  151.     /* Indicate data has been read and pipe can be written to again. */
  152.     ReturnVal |= PipePostEventSem(&ReadEventSem[PipeIndex][PipeEnd]);
  153.  
  154.     return ReturnVal;
  155. }
  156.  
  157. /**************************************************************************
  158. /* PipeReadNoWait Performs a read from one of the "pipes".  This routine does
  159. /*        not block if a message is not available to be read.  If data
  160. /*        is available to be read the data is returned in "Msg" and
  161. /*        "DataReturnedFlag" is set to 1, else "DataReturnedFlag" is
  162. /*        set to 0 and there is no valid data returned in "Msg".
  163. /*
  164. /* arguments:
  165. /*        PipeId - The id assigned to this pipe.(1, 2, ...).
  166. /*        PipeEnd - PIPE_END_SERVER or PIPE_END_CLIENT.
  167. /*        Msg - Pointer to buffer for returned data.
  168. /*        DataReturnedFlag - Flag indicating if data is returned from
  169. /*            the pipe read.
  170. /*
  171. /* returns:
  172. /*         0 if no error, else a nonzero value.
  173. /**************************************************************************/
  174. int EXPENTRY PipeReadNoWait(int PipeId, int PipeEnd, 
  175.         struct message *Msg, int *DataReturnedFlag)
  176. {
  177.     ULONG SemPostCt;
  178.     int ReturnVal=0, OtherEnd, PipeIndex;
  179.  
  180.     PipeIndex = PipeId - 1;           /* Ids start with 1, indices with 0 */
  181.     OtherEnd = (PipeEnd) ? 0 : 1;  /* Get Id of other side of pipe */
  182.  
  183.     /* Check for valid arguments */
  184.     if ((PipeIndex < 0) || (PipeIndex >= MAX_NUM_PIPES) ||  
  185.         (PipeEnd < 0) || (PipeEnd > 1)) return -1;
  186.  
  187.     /* Is this the first call?  If so then initialize the semaphores. */
  188.     else if (!PipeInitFlag[PipeIndex]) 
  189.     {
  190.         ReturnVal = CreatePipeSemaphores(PipeId);
  191.         if (ReturnVal) return ReturnVal;
  192.     }
  193.  
  194.     /* check (without waiting) if Write Event is posted. */
  195.     ReturnVal |= PipeQueryEventSem(&WriteEventSem[PipeIndex][OtherEnd], &SemPostCt);
  196.  
  197.     /* If error or no data in pipe then do quick exit. */
  198.     if (ReturnVal || (SemPostCt == 0))
  199.     {
  200.         *DataReturnedFlag = 0;
  201.         return 0;
  202.     }
  203.  
  204.     /* check if Write Event is posted (data is available in pipe).  */
  205.     ReturnVal |= PipeWaitAndResetEventSem(&WriteEventSem[PipeIndex][OtherEnd]); 
  206.  
  207.     /* Get data from pipe. */
  208.     *Msg = PipeData[PipeIndex][PipeEnd];
  209.     
  210.     /* Indicate data has been read and pipe can be written to again. */
  211.     ReturnVal |= PipePostEventSem(&ReadEventSem[PipeIndex][PipeEnd]);
  212.  
  213.     *DataReturnedFlag = 1;
  214.  
  215.     return ReturnVal;
  216. }
  217.  
  218. /**************************************************************************
  219. /* PipeWrite    Performs a write to one of the "pipes".  This routine blocks 
  220. /*        until room is available in the pipe for the write. 
  221. /*
  222. /* arguments:
  223. /*        PipeId - The id assigned to this pipe.(1, 2, ...).
  224. /*        PipeEnd - PIPE_END_SERVER or PIPE_END_CLIENT.
  225. /*        Msg - Pointer to data being written to the pipe.
  226. /* returns:
  227. /*         0 if no error, else a nonzero value.
  228. /**************************************************************************/
  229. int EXPENTRY PipeWrite(int PipeId, int PipeEnd, struct message *Msg)
  230. {
  231.     ULONG SemPostCt;
  232.     int ReturnVal=0, OtherEnd, PipeIndex;
  233.  
  234.     PipeIndex = PipeId - 1;        /* Ids start with 1, indices with 0 */
  235.     OtherEnd = (PipeEnd) ? 0 : 1;   /* Get Id of other side of pipe */
  236.  
  237.     /* Check for valid arguments */
  238.     if ((PipeIndex < 0) || (PipeIndex >= MAX_NUM_PIPES) ||
  239.         (PipeEnd < 0) || (PipeEnd > 1)) return -1;
  240.  
  241.     /* Is this the first call?  If so then initialize the semaphores. */
  242.     else if (!PipeInitFlag[PipeIndex]) 
  243.     {
  244.         ReturnVal = CreatePipeSemaphores(PipeId);
  245.         if (ReturnVal) return ReturnVal;
  246.     }
  247.  
  248.     /* wait till old message has been read by other side of pipe */
  249.     ReturnVal |= PipeWaitAndResetEventSem(&ReadEventSem[PipeIndex][OtherEnd]); 
  250.  
  251.     /* Write data to pipe. */
  252.     PipeData[PipeIndex][OtherEnd] = *Msg;
  253.     
  254.     /* Let other thread know data has been written and can be read.  */
  255.     ReturnVal |= PipePostEventSem(&WriteEventSem[PipeIndex][PipeEnd]);
  256.  
  257.     return ReturnVal;
  258. }
  259.  
  260.  
  261. /*mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm*/
  262. /*mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm*/
  263. /* The rest of the routines in this module are used locally by routines in   */
  264. /* this module and are not meant to be called from outside this module.      */
  265. /*mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm*/
  266. /*mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm*/
  267.  
  268. /**************************************************************************
  269. /* CreatePipeSemaphores Performs creation of event semaphores used to 
  270. /*            synchronize access to a pipe. 
  271. /**************************************************************************/
  272. static int CreatePipeSemaphores(int PipeId)
  273. {
  274.     int ReturnVal=0, PipeIndex;
  275.     ULONG flattr=0;
  276.  
  277.     RequestPipeInitMutexSem();    /* block further access to routine */
  278.  
  279.     PipeIndex = PipeId - 1;        /* Ids start with 1, indices with 0 */
  280.  
  281.     if ((PipeIndex < 0) || (PipeIndex >= MAX_NUM_PIPES)) ReturnVal = -1;
  282.     else if (PipeInitFlag[PipeIndex]) ReturnVal = 0;
  283.     else
  284.     {
  285.        PipeInitFlag[PipeIndex] = 1;
  286.  
  287.        flattr |= DC_SEM_SHARED;
  288.  
  289.        ReturnVal |= DosCreateEventSem(0, 
  290.                 &WriteEventSem[PipeIndex][0], flattr, FALSE);
  291.        ReturnVal |= DosCreateEventSem(0, 
  292.                 &ReadEventSem[PipeIndex][0], flattr, TRUE);
  293.        ReturnVal |= DosCreateEventSem(0, 
  294.                 &WriteEventSem[PipeIndex][1], flattr, FALSE);
  295.        ReturnVal |= DosCreateEventSem(0, 
  296.                 &ReadEventSem[PipeIndex][1], flattr, TRUE);
  297.     }
  298.  
  299.     ReleasePipeInitMutexSem();    /* allow further access to routine */
  300.  
  301.     return ReturnVal;
  302. }
  303.  
  304. /**************************************************************************
  305. /* RequestPipeInitMutexSem  Performs Request for the mutex semaphore that
  306. /*            protects access to the "init" function. 
  307. /**************************************************************************/
  308. static APIRET RequestPipeInitMutexSem(void)
  309. {
  310. #define ERRMTX_TIMEOUT 3000        /* milliseconds */
  311.  
  312.     APIRET rc;
  313.  
  314.     rc = DosRequestMutexSem(hPInitMtxSem, ERRMTX_TIMEOUT);
  315.  
  316.     if (rc == ERROR_INVALID_HANDLE)
  317.     {
  318.         hPInitMtxSem = 0;
  319.  
  320.         rc = DosOpenMutexSem(PINITMUTEXSEMNAME, &hPInitMtxSem);
  321.  
  322.         if ((rc == ERROR_SEM_NOT_FOUND) || (rc == ERROR_INVALID_HANDLE))
  323.             rc = DosCreateMutexSem(PINITMUTEXSEMNAME, &hPInitMtxSem, 0, 0);
  324.  
  325.         if (!rc) 
  326.             rc = DosRequestMutexSem(hPInitMtxSem, ERRMTX_TIMEOUT);
  327.     }
  328.  
  329.     return rc;
  330. }
  331.  
  332. /**************************************************************************
  333. /* ReleasePipeInitMutexSem  Performs release of the mutex semaphore that
  334. /*            protects access to the "init" function. 
  335. /**************************************************************************/
  336. static APIRET ReleasePipeInitMutexSem(void)
  337. {
  338.     APIRET rc;
  339.  
  340.     rc = DosReleaseMutexSem(hPInitMtxSem);
  341.  
  342.     if (rc == ERROR_INVALID_HANDLE)
  343.     {
  344.         hPInitMtxSem = 0;
  345.  
  346.         rc = DosOpenMutexSem(PINITMUTEXSEMNAME, &hPInitMtxSem);
  347.  
  348.         if ((rc == ERROR_SEM_NOT_FOUND) || (rc == ERROR_INVALID_HANDLE))
  349.             rc = DosCreateMutexSem(PINITMUTEXSEMNAME, &hPInitMtxSem, 0, 0);
  350.  
  351.         if (!rc) 
  352.             rc = DosReleaseMutexSem(hPInitMtxSem);
  353.     }
  354.  
  355.     return rc;
  356. }
  357.  
  358. /***************************************************************************
  359. /* PipeQueryEventSem    Performs query of an event semaphore to determine
  360. /*            how many times it has been posted since the last
  361. /*            reset.  If it is 0 then a thread would block if 
  362. /*            requesting this semaphore,until the sem is posted.
  363. /*            This routine returns the post count in "ulPostCt".
  364. /**************************************************************************/
  365. static APIRET PipeQueryEventSem(HEV *SemHandle, ULONG *ulPostCt)
  366. {
  367.     APIRET rc;
  368.  
  369.     rc = DosQueryEventSem(*SemHandle, ulPostCt);
  370.  
  371.     if (rc == ERROR_INVALID_HANDLE)
  372.     {
  373.         rc = DosOpenEventSem(0, SemHandle);
  374.  
  375.         if (!rc) 
  376.             rc = DosQueryEventSem(*SemHandle, ulPostCt);
  377.     }
  378.  
  379.     return rc;
  380. }
  381.  
  382. /***************************************************************************
  383. /* PipeWaitandResetEventSem  Performs wait for a PipeWrite() to happen before
  384. /*            returning, to put a block on PipeRead() until a 
  385. /*            message is in the queue.  After the wait finishes the
  386. /*            semaphore is immediately reset again for next read. 
  387. /***************************************************************************/
  388. static APIRET PipeWaitAndResetEventSem(HEV *SemHandle)
  389. {
  390. #define EVENTSEM_TIMEOUT SEM_INDEFINITE_WAIT
  391.  
  392.     APIRET rc;
  393.     ULONG PostCt;
  394.  
  395.     rc = DosWaitEventSem(*SemHandle, EVENTSEM_TIMEOUT);
  396.     if (!rc) rc = DosResetEventSem(*SemHandle, &PostCt);
  397.  
  398.     if (rc == ERROR_INVALID_HANDLE)
  399.     {
  400.         rc = DosOpenEventSem(0, SemHandle);
  401.  
  402.         if (!rc) 
  403.         {
  404.             rc = DosWaitEventSem(*SemHandle, EVENTSEM_TIMEOUT);
  405.             if (!rc) rc = DosResetEventSem(*SemHandle, &PostCt);
  406.         }
  407.     }
  408.  
  409.         if (rc == ERROR_ALREADY_RESET) rc = 0;
  410.  
  411.     return rc;
  412. }
  413.  
  414. /***************************************************************************
  415. /* PipePostEventSem     Performs Post of the event semaphore that 
  416. /*            allows PipeRead() to go ahead and read from the queue.
  417. /***************************************************************************/
  418. static APIRET PipePostEventSem(HEV *SemHandle)
  419. {
  420.     APIRET rc;
  421.  
  422.     rc = DosPostEventSem(*SemHandle);
  423.  
  424.     if (rc)
  425.     {
  426.          if (rc == ERROR_INVALID_HANDLE)
  427.          {
  428.  
  429.         rc = DosOpenEventSem(0, SemHandle);
  430.  
  431.         if (!rc) 
  432.             rc = DosPostEventSem(*SemHandle);
  433.          }
  434.          else if (rc == ERROR_ALREADY_POSTED) 
  435.         rc = 0;
  436.     }
  437.  
  438.     return rc;
  439. }
  440.  
  441.