home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
OS/2 Shareware BBS: 10 Tools
/
10-Tools.zip
/
mitsch75.zip
/
scheme-7_5_17-src.zip
/
scheme-7.5.17
/
src
/
microcode
/
os2cthrd.c
< prev
next >
Wrap
C/C++ Source or Header
|
1999-01-02
|
9KB
|
274 lines
/* -*-C-*-
$Id: os2cthrd.c,v 1.10 1999/01/02 06:11:34 cph Exp $
Copyright (c) 1994-1999 Massachusetts Institute of Technology
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at
your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/* Scheme side of channel thread interface */
#include "os2.h"
/* Forward declarations of the file-local helpers that are defined
   further down in this file. */
static void run_channel_thread (void *);
static void start_readahead_thread (channel_context_t *);
static void send_readahead_ack (qid_t, enum readahead_ack_action);
static msg_t * new_message (void);
/* Argument bundle handed to run_channel_thread through
   OS2_beginthread.  It is allocated and filled in by
   OS2_start_channel_thread and freed by run_channel_thread once the
   fields have been copied out. */
typedef struct
{
/* Handle of the channel, passed through to the reader procedure. */
LHANDLE handle;
/* Writer-side qid of the channel context; the channel thread uses it
   for all of its queue operations. */
qid_t qid;
/* Procedure the channel thread calls to produce each readahead
   message. */
channel_reader_t reader;
} thread_arg_t;
/* Give CHANNEL a fresh channel context and spawn the thread that
   services it.

   The reader qid of the context is opened on OS2_scheme_tqueue, and
   the writer qid on a newly made std tqueue.  The new thread receives
   a heap-allocated thread_arg_t carrying the channel handle, the
   writer qid and READER; the thread is responsible for freeing it.
   OPERATOR is installed as the channel's operator after the thread
   has been started. */
void
OS2_start_channel_thread (Tchannel channel,
			  channel_reader_t reader,
			  channel_op_t operator)
{
  channel_context_t * ctx = (OS2_make_channel_context ());
  thread_arg_t * thread_arg = (OS_malloc (sizeof (*thread_arg)));

  (CHANNEL_OPERATOR_CONTEXT (channel)) = ctx;

  /* Open both ends of the qid pair before the thread exists.  */
  OS2_open_qid ((CHANNEL_CONTEXT_READER_QID (ctx)), OS2_scheme_tqueue);
  OS2_open_qid ((CHANNEL_CONTEXT_WRITER_QID (ctx)),
		(OS2_make_std_tqueue ()));

  /* Package up everything the thread needs.  */
  (thread_arg -> handle) = (CHANNEL_HANDLE (channel));
  (thread_arg -> qid) = (CHANNEL_CONTEXT_WRITER_QID (ctx));
  (thread_arg -> reader) = reader;

  (CHANNEL_CONTEXT_TID (ctx))
    = (OS2_beginthread (run_channel_thread, thread_arg, 0));

  (CHANNEL_OPERATOR (channel)) = operator;
}
/* Body of a channel thread.

   ARG points to a heap-allocated thread_arg_t prepared by
   OS2_start_channel_thread.  Its fields are copied into locals and
   the structure is freed immediately, so this thread owns nothing of
   the caller's after start-up.

   The thread then loops: it waits for a readahead ACK on its qid and,
   as long as the ACK is raa_read, calls the reader procedure to
   obtain one message and sends that message on the qid.  The loop
   ends when any other ACK arrives, when the reader returns no
   message, or when the reader reports end of file.  Finally the
   thread closes its qid and the associated tqueue and terminates. */
static void
run_channel_thread (void * arg)
{
  LHANDLE handle = (((thread_arg_t *) arg) -> handle);
  qid_t qid = (((thread_arg_t *) arg) -> qid);
  channel_reader_t reader = (((thread_arg_t *) arg) -> reader);
  EXCEPTIONREGISTRATIONRECORD registration;
  OS_free (arg);
  /* Pass the address of the local registration record; it must live
     in this function's frame for the lifetime of the thread.  */
  (void) OS2_thread_initialize ((&registration), qid);
  /* Wait for first read request before doing anything.  */
  while ((OS2_wait_for_readahead_ack (qid)) == raa_read)
    {
      int eofp;
      msg_t * message
	= ((*reader) (handle, qid, (OS2_make_readahead ()), (&eofp)));
      /* No message from the reader: stop without sending anything.  */
      if (message == 0)
	break;
      OS2_send_message (qid, message);
      /* The message just sent was the last one.  */
      if (eofp)
	break;
    }
  {
    /* Fetch the tqueue before closing the qid, then close both.  */
    tqueue_t * tqueue = (OS2_qid_tqueue (qid));
    OS2_close_qid (qid);
    OS2_close_std_tqueue (tqueue);
  }
  OS2_endthread ();
}
/* Channel-operator entry point for reading.  ARG1 is interpreted as
   the destination buffer, ARG2 as its size, and ARG3 as a pointer to
   a long that receives the value returned by
   OS2_channel_thread_read. */
void
OS2_channel_thread_read_op (Tchannel channel,
			    choparg_t arg1, choparg_t arg2, choparg_t arg3)
{
  char * destination = ((char *) arg1);
  size_t capacity = ((size_t) arg2);
  long * result = ((long *) arg3);

  (*result) = (OS2_channel_thread_read (channel, destination, capacity));
}
/* Invoke SET_MSG_TYPE_LENGTH for the two message types exchanged with
   channel threads in this file: mt_readahead is paired with
   sm_readahead_t and mt_readahead_ack with sm_readahead_ack_t.
   NOTE(review): the macro is defined elsewhere; presumably it records
   the message length for each type -- confirm against its
   definition. */
void
OS2_initialize_channel_thread_messages (void)
{
SET_MSG_TYPE_LENGTH (mt_readahead, sm_readahead_t);
SET_MSG_TYPE_LENGTH (mt_readahead_ack, sm_readahead_ack_t);
}
/* Allocate and initialize a channel context.  A reader/writer qid
   pair is made for it, the "first read" flag is set and the EOF flag
   is cleared.  Returns the new context. */
channel_context_t *
OS2_make_channel_context (void)
{
  channel_context_t * ctx = (OS_malloc (sizeof (*ctx)));

  OS2_make_qid_pair ((& (CHANNEL_CONTEXT_READER_QID (ctx))),
		     (& (CHANNEL_CONTEXT_WRITER_QID (ctx))));

  /* No read has been issued yet and no end of file has been seen.  */
  (CHANNEL_CONTEXT_FIRST_READ_P (ctx)) = 1;
  (CHANNEL_CONTEXT_EOFP (ctx)) = 0;

  return (ctx);
}
/* Shut down the channel thread attached to CHANNEL and release the
   channel's context.  The channel handle itself is NOT closed here;
   that is left to the caller (see below). */
void
OS2_channel_thread_close (Tchannel channel)
{
  channel_context_t * ctx = (CHANNEL_OPERATOR_CONTEXT (channel));
  qid_t reader_qid = (CHANNEL_CONTEXT_READER_QID (ctx));

  /* Tell the channel thread to kill itself by sending it a "close"
     readahead ACK, then drop our end of the connection, which is no
     longer needed.  */
  send_readahead_ack (reader_qid, raa_close);
  OS2_close_qid (reader_qid);
  OS_free (ctx);

  /* Finally, the caller must close the channel handle.  If the
     channel thread is blocked in dos_read, this will break it out and
     get it to kill itself.  There's no race, because the channel
     thread won't try to close the handle, and if it breaks out of
     dos_read before we do the close, it will see the readahead ACK we
     just sent and that will kill it.  */
}
/* Return the reader qid of CHANNEL's context, first making sure the
   readahead thread has been started. */
qid_t
OS2_channel_thread_descriptor (Tchannel channel)
{
  channel_context_t * ctx = (CHANNEL_OPERATOR_CONTEXT (channel));
  qid_t descriptor;

  /* The readahead thread has to be running so that arriving input is
     registered properly and the "select" emulation notices it.  */
  start_readahead_thread (ctx);

  descriptor = (CHANNEL_CONTEXT_READER_QID (ctx));
  return (descriptor);
}
/* Send the initial "read" ACK that wakes up the reader thread, but
   only the first time this is called for CONTEXT.

   The wake-up is deferred like this because sometimes we never want
   to read from the channel at all -- for example, when the channel is
   the read side of a pipe that is being passed to a child process. */
static void
start_readahead_thread (channel_context_t * context)
{
  /* Already started on an earlier call: nothing to do.  */
  if (! (CHANNEL_CONTEXT_FIRST_READ_P (context)))
    return;

  send_readahead_ack ((CHANNEL_CONTEXT_READER_QID (context)), raa_read);
  (CHANNEL_CONTEXT_FIRST_READ_P (context)) = 0;
}
msg_t *
OS2_make_readahead (void)
{
msg_t * message = (OS2_create_message (mt_readahead));
(SM_READAHEAD_INDEX (message)) = 0;
return (message);
}
/* Read up to SIZE bytes from CHANNEL's readahead queue into BUFFER.

   Returns 0 if SIZE is zero or end of file has been reached, -1 if no
   message could be received, and otherwise the number of bytes copied
   into BUFFER.  When a message holds more data than fits in BUFFER,
   its read index is advanced and the message is pushed back on the
   queue for the next call. */
long
OS2_channel_thread_read (Tchannel channel, char * buffer, size_t size)
{
  channel_context_t * ctx = (CHANNEL_OPERATOR_CONTEXT (channel));
  qid_t reader_qid = (CHANNEL_CONTEXT_READER_QID (ctx));
  msg_t * msg;
  unsigned short offset;
  unsigned short remaining;

  if ((size == 0) || (CHANNEL_CONTEXT_EOFP (ctx)))
    return (0);

  start_readahead_thread (ctx);

  msg = (OS2_receive_message (reader_qid,
			      (!CHANNEL_NONBLOCKING (channel)),
			      1));
  if (msg == 0)
    return (-1);

  if (OS2_error_message_p (msg))
    {
      /* Request the next read before handing the error message to
	 the error handler.  */
      send_readahead_ack (reader_qid, raa_read);
      OS2_handle_error_message (msg);
    }
  if ((MSG_TYPE (msg)) != mt_readahead)
    OS2_logic_error ("Illegal message from channel thread.");

  offset = (SM_READAHEAD_INDEX (msg));
  /* An index of zero means this message has not been partially
     consumed before, so acknowledge it to request another read.  */
  if (offset == 0)
    send_readahead_ack (reader_qid, raa_read);

  remaining = ((SM_READAHEAD_SIZE (msg)) - offset);

  /* An empty message marks end of file.  */
  if (remaining == 0)
    {
      OS2_destroy_message (msg);
      (CHANNEL_CONTEXT_EOFP (ctx)) = 1;
      return (0);
    }

  /* More data than the caller has room for: copy what fits, advance
     the index and put the message back.  */
  if (remaining > size)
    {
      FASTCOPY (((SM_READAHEAD_DATA (msg)) + offset), buffer, size);
      (SM_READAHEAD_INDEX (msg)) += size;
      OS2_unread_message (reader_qid, msg);
      return (size);
    }

  /* The rest of the message fits: consume it entirely.  */
  FASTCOPY (((SM_READAHEAD_DATA (msg)) + offset), buffer, remaining);
  OS2_destroy_message (msg);
  return (remaining);
}
/* Create an mt_readahead_ack message carrying ACTION and send it on
   QID. */
static void
send_readahead_ack (qid_t qid, enum readahead_ack_action action)
{
  msg_t * ack = (OS2_create_message (mt_readahead_ack));

  (SM_READAHEAD_ACK_ACTION (ack)) = action;
  OS2_send_message (qid, ack);
}
/* Wait on QID for an mt_readahead_ack message, destroy it, and return
   the action it carried.

   Waiting for an acknowledgement before starting another read
   regulates the amount of data in the queue. */
enum readahead_ack_action
OS2_wait_for_readahead_ack (qid_t qid)
{
  msg_t * ack = (OS2_wait_for_message (qid, mt_readahead_ack));
  enum readahead_ack_action requested;

  /* Extract the action before the message is destroyed.  */
  requested = (SM_READAHEAD_ACK_ACTION (ack));
  OS2_destroy_message (ack);

  return (requested);
}
void
OS2_readahead_buffer_insert (void * buffer, char c)
{
msg_t * last = (OS2_msg_fifo_last (buffer));
if ((last != 0) && ((SM_READAHEAD_SIZE (last)) < SM_READAHEAD_MAX))
((SM_READAHEAD_DATA (last)) [(SM_READAHEAD_SIZE (last))++]) = c;
else
{
msg_t * message = (new_message ());
((SM_READAHEAD_DATA (message)) [(SM_READAHEAD_SIZE (message))++]) = c;
OS2_msg_fifo_insert (buffer, message);
}
}
/* Make a readahead message (via OS2_make_readahead) and set its size
   to zero, so that it holds no data.  Returns the message. */
static msg_t *
new_message (void)
{
  msg_t * empty = (OS2_make_readahead ());

  (SM_READAHEAD_SIZE (empty)) = 0;

  return (empty);
}
char
OS2_readahead_buffer_rubout (void * buffer)
{
msg_t * message = (OS2_msg_fifo_last (buffer));
if (message == 0)
OS2_logic_error ("Rubout from empty readahead buffer.");
{
char c = ((SM_READAHEAD_DATA (message)) [--(SM_READAHEAD_SIZE (message))]);
if ((SM_READAHEAD_SIZE (message)) == 0)
{
OS2_msg_fifo_remove_last (buffer);
OS2_destroy_message (message);
}
return (c);
}
}
/* Remove a message from the readahead buffer BUFFER (via
   OS2_msg_fifo_remove) and return it.  If the buffer yields no
   message, a new message of size zero is returned instead. */
msg_t *
OS2_readahead_buffer_read (void * buffer)
{
  msg_t * head = (OS2_msg_fifo_remove (buffer));

  if (head != 0)
    return (head);

  return (new_message ());
}