home *** CD-ROM | disk | FTP | other *** search
/ OS/2 Shareware BBS: 10 Tools / 10-Tools.zip / mitsch75.zip / scheme-7_5_17-src.zip / scheme-7.5.17 / src / microcode / os2cthrd.c < prev    next >
C/C++ Source or Header  |  1999-01-02  |  9KB  |  274 lines

  1. /* -*-C-*-
  2.  
  3. $Id: os2cthrd.c,v 1.10 1999/01/02 06:11:34 cph Exp $
  4.  
  5. Copyright (c) 1994-1999 Massachusetts Institute of Technology
  6.  
  7. This program is free software; you can redistribute it and/or modify
  8. it under the terms of the GNU General Public License as published by
  9. the Free Software Foundation; either version 2 of the License, or (at
  10. your option) any later version.
  11.  
  12. This program is distributed in the hope that it will be useful, but
  13. WITHOUT ANY WARRANTY; without even the implied warranty of
  14. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  15. General Public License for more details.
  16.  
  17. You should have received a copy of the GNU General Public License
  18. along with this program; if not, write to the Free Software
  19. Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  20. */
  21.  
  22. /* Scheme side of channel thread interface */
  23.  
  24. #include "os2.h"
  25.  
  26. static void run_channel_thread (void *);
  27. static void start_readahead_thread (channel_context_t *);
  28. static void send_readahead_ack (qid_t, enum readahead_ack_action);
  29. static msg_t * new_message (void);
  30.  
  31. typedef struct
  32. {
  33.   LHANDLE handle;
  34.   qid_t qid;
  35.   channel_reader_t reader;
  36. } thread_arg_t;
  37.  
  38. void
  39. OS2_start_channel_thread (Tchannel channel,
  40.               channel_reader_t reader,
  41.               channel_op_t operator)
  42. {
  43.   channel_context_t * context = (OS2_make_channel_context ());
  44.   thread_arg_t * arg = (OS_malloc (sizeof (thread_arg_t)));
  45.   (CHANNEL_OPERATOR_CONTEXT (channel)) = context;
  46.   OS2_open_qid ((CHANNEL_CONTEXT_READER_QID (context)), OS2_scheme_tqueue);
  47.   OS2_open_qid
  48.     ((CHANNEL_CONTEXT_WRITER_QID (context)), (OS2_make_std_tqueue ()));
  49.   (arg -> handle) = (CHANNEL_HANDLE (channel));
  50.   (arg -> qid) = (CHANNEL_CONTEXT_WRITER_QID (context));
  51.   (arg -> reader) = reader;
  52.   (CHANNEL_CONTEXT_TID (context))
  53.     = (OS2_beginthread (run_channel_thread, arg, 0));
  54.   (CHANNEL_OPERATOR (channel)) = operator;
  55. }
  56.  
  57. static void
  58. run_channel_thread (void * arg)
  59. {
  60.   LHANDLE handle = (((thread_arg_t *) arg) -> handle);
  61.   qid_t qid = (((thread_arg_t *) arg) -> qid);
  62.   channel_reader_t reader = (((thread_arg_t *) arg) -> reader);
  63.   EXCEPTIONREGISTRATIONRECORD registration;
  64.   OS_free (arg);
  65.   (void) OS2_thread_initialize ((®istration), qid);
  66.   /* Wait for first read request before doing anything.  */
  67.   while ((OS2_wait_for_readahead_ack (qid)) == raa_read)
  68.     {
  69.       int eofp;
  70.       msg_t * message
  71.     = ((*reader) (handle, qid, (OS2_make_readahead ()), (&eofp)));
  72.       if (message == 0)
  73.     break;
  74.       OS2_send_message (qid, message);
  75.       if (eofp)
  76.     break;
  77.     }
  78.   {
  79.     tqueue_t * tqueue = (OS2_qid_tqueue (qid));
  80.     OS2_close_qid (qid);
  81.     OS2_close_std_tqueue (tqueue);
  82.   }
  83.   OS2_endthread ();
  84. }
  85.  
  86. void
  87. OS2_channel_thread_read_op (Tchannel channel,
  88.                 choparg_t arg1, choparg_t arg2, choparg_t arg3)
  89. {
  90.   (* ((long *) arg3))
  91.     = (OS2_channel_thread_read
  92.        (channel, ((char *) arg1), ((size_t) arg2)));
  93. }
  94.  
  95. void
  96. OS2_initialize_channel_thread_messages (void)
  97. {
  98.   SET_MSG_TYPE_LENGTH (mt_readahead, sm_readahead_t);
  99.   SET_MSG_TYPE_LENGTH (mt_readahead_ack, sm_readahead_ack_t);
  100. }
  101.  
  102. channel_context_t *
  103. OS2_make_channel_context (void)
  104. {
  105.   channel_context_t * context = (OS_malloc (sizeof (channel_context_t)));
  106.   OS2_make_qid_pair ((& (CHANNEL_CONTEXT_READER_QID (context))),
  107.              (& (CHANNEL_CONTEXT_WRITER_QID (context))));
  108.   (CHANNEL_CONTEXT_EOFP (context)) = 0;
  109.   (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 1;
  110.   return (context);
  111. }
  112.  
  113. void
  114. OS2_channel_thread_close (Tchannel channel)
  115. {
  116.   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
  117.   /* Send a readahead ACK informing the channel thread to kill itself.
  118.      Then, close our end of the connection -- it's no longer needed.  */
  119.   send_readahead_ack ((CHANNEL_CONTEXT_READER_QID (context)), raa_close);
  120.   OS2_close_qid (CHANNEL_CONTEXT_READER_QID (context));
  121.   OS_free (context);
  122.   /* Finally, the caller must close the channel handle.  If the
  123.      channel thread is blocked in dos_read, this will break it out and
  124.      get it to kill itself.  There's no race, because the channel
  125.      thread won't try to close the handle, and if it breaks out of
  126.      dos_read before we do the close, it will see the readahead ACK we
  127.      just sent and that will kill it.  */
  128. }
  129.  
  130. qid_t
  131. OS2_channel_thread_descriptor (Tchannel channel)
  132. {
  133.   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
  134.   /* Make sure that the readahead thread is started, so that when
  135.      input arrives it will be registered properly so that the "select"
  136.      emulation will notice it.  */
  137.   start_readahead_thread (context);
  138.   return (CHANNEL_CONTEXT_READER_QID (context));
  139. }
  140.  
  141. static void
  142. start_readahead_thread (channel_context_t * context)
  143. {
  144.   /* Wake up the reader thread if this is the first time we are
  145.      operating on it.  This is necessary because we sometimes don't
  146.      want to read from the channel at all -- for example, when the
  147.      channel is the read side of a pipe that is being passed to a
  148.      child process.  */
  149.   if (CHANNEL_CONTEXT_FIRST_READ_P (context))
  150.     {
  151.       send_readahead_ack ((CHANNEL_CONTEXT_READER_QID (context)), raa_read);
  152.       (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 0;
  153.     }
  154. }
  155.  
  156. msg_t *
  157. OS2_make_readahead (void)
  158. {
  159.   msg_t * message = (OS2_create_message (mt_readahead));
  160.   (SM_READAHEAD_INDEX (message)) = 0;
  161.   return (message);
  162. }
  163.  
  164. long
  165. OS2_channel_thread_read (Tchannel channel, char * buffer, size_t size)
  166. {
  167.   channel_context_t * context = (CHANNEL_OPERATOR_CONTEXT (channel));
  168.   qid_t qid = (CHANNEL_CONTEXT_READER_QID (context));
  169.   msg_t * message;
  170.   unsigned short index;
  171.   unsigned short navail;
  172.   if ((CHANNEL_CONTEXT_EOFP (context)) || (size == 0))
  173.     return (0);
  174.   start_readahead_thread (context);
  175.   message = (OS2_receive_message (qid, (!CHANNEL_NONBLOCKING (channel)), 1));
  176.   if (message == 0)
  177.     return (-1);
  178.   if (OS2_error_message_p (message))
  179.     {
  180.       send_readahead_ack (qid, raa_read);
  181.       OS2_handle_error_message (message);
  182.     }
  183.   if ((MSG_TYPE (message)) != mt_readahead)
  184.     OS2_logic_error ("Illegal message from channel thread.");
  185.   index = (SM_READAHEAD_INDEX (message));
  186.   if (index == 0)
  187.     send_readahead_ack (qid, raa_read);
  188.   navail = ((SM_READAHEAD_SIZE (message)) - index);
  189.   if (navail == 0)
  190.     {
  191.       OS2_destroy_message (message);
  192.       (CHANNEL_CONTEXT_EOFP (context)) = 1;
  193.       return (0);
  194.     }
  195.   else if (navail <= size)
  196.     {
  197.       FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, navail);
  198.       OS2_destroy_message (message);
  199.       return (navail);
  200.     }
  201.   else
  202.     {
  203.       FASTCOPY (((SM_READAHEAD_DATA (message)) + index), buffer, size);
  204.       (SM_READAHEAD_INDEX (message)) += size;
  205.       OS2_unread_message (qid, message);
  206.       return (size);
  207.     }
  208. }
  209.  
  210. static void
  211. send_readahead_ack (qid_t qid, enum readahead_ack_action action)
  212. {
  213.   msg_t * message = (OS2_create_message (mt_readahead_ack));
  214.   (SM_READAHEAD_ACK_ACTION (message)) = action;
  215.   OS2_send_message (qid, message);
  216. }
  217.  
  218. enum readahead_ack_action
  219. OS2_wait_for_readahead_ack (qid_t qid)
  220. {
  221.   /* Wait for an acknowledgement before starting another read.
  222.      This regulates the amount of data in the queue.  */
  223.   msg_t * message = (OS2_wait_for_message (qid, mt_readahead_ack));
  224.   enum readahead_ack_action action = (SM_READAHEAD_ACK_ACTION (message));
  225.   OS2_destroy_message (message);
  226.   return (action);
  227. }
  228.  
  229. void
  230. OS2_readahead_buffer_insert (void * buffer, char c)
  231. {
  232.   msg_t * last = (OS2_msg_fifo_last (buffer));
  233.   if ((last != 0) && ((SM_READAHEAD_SIZE (last)) < SM_READAHEAD_MAX))
  234.     ((SM_READAHEAD_DATA (last)) [(SM_READAHEAD_SIZE (last))++]) = c;
  235.   else
  236.     {
  237.       msg_t * message = (new_message ());
  238.       ((SM_READAHEAD_DATA (message)) [(SM_READAHEAD_SIZE (message))++]) = c;
  239.       OS2_msg_fifo_insert (buffer, message);
  240.     }
  241. }
  242.  
  243. static msg_t *
  244. new_message (void)
  245. {
  246.   msg_t * message = (OS2_make_readahead ());
  247.   (SM_READAHEAD_SIZE (message)) = 0;
  248.   return (message);
  249. }
  250.  
  251. char
  252. OS2_readahead_buffer_rubout (void * buffer)
  253. {
  254.   msg_t * message = (OS2_msg_fifo_last (buffer));
  255.   if (message == 0)
  256.     OS2_logic_error ("Rubout from empty readahead buffer.");
  257.   {
  258.     char c = ((SM_READAHEAD_DATA (message)) [--(SM_READAHEAD_SIZE (message))]);
  259.     if ((SM_READAHEAD_SIZE (message)) == 0)
  260.       {
  261.     OS2_msg_fifo_remove_last (buffer);
  262.     OS2_destroy_message (message);
  263.       }
  264.     return (c);
  265.   }
  266. }
  267.  
  268. msg_t *
  269. OS2_readahead_buffer_read (void * buffer)
  270. {
  271.   msg_t * message = (OS2_msg_fifo_remove (buffer));
  272.   return ((message == 0) ? (new_message ()) : message);
  273. }
  274.