home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Nebula
/
nebula.bin
/
SourceCode
/
Classes
/
SampleClasses
/
objectThreadPerform.m
< prev
next >
Wrap
Text File
|
1992-08-04
|
12KB
|
346 lines
// -------------------------------------------------------------------------------------
// objectThreadPerform.m
// (see objectThreadPerform.h for usage information)
// Martin D. Flynn, NeXT Computer, Inc.
// -------------------------------------------------------------------------------------
// Permission is granted to freely redistribute this source code, and to use fragments
// of this code in your own applications if you find them to be useful. This class,
// along with the source code, come with no warranty of any kind, and the user assumes
// all responsibility for its use.
// -------------------------------------------------------------------------------------
#import <stdio.h>
#import <libc.h>
#import <c.h>
#import <dpsclient/dpsclient.h>
#import <dpsclient/dpsNeXT.h>
#import <servers/netname.h>
#import <objc/zone.h>
#import <mach/cthreads.h>
#import <appkit/Application.h>
#import <appkit/Panel.h>
#import <appkit/Listener.h>
#import <appkit/nextstd.h>
// -------------------------------------------------------------------------------------
// misc defines
#define appPortFORMAT "port_%s"
#define cpNil (char*)nil
#define vpNil (void*)nil
// -------------------------------------------------------------------------------------
// main thread performance information structure
/* target/action perform information */
#define SIGNATURE 0xF121658F
typedef struct targetAction_s {
long sig; // structure signature
id target;
SEL action;
id arg[2];
int argCount;
any_t result;
mutex_t mutex;
condition_t condition;
BOOL wait;
struct targetAction_s *next;
} targetAction_t;
/* mach port data structure */
typedef struct {
msg_header_t hdr;
msg_type_t type;
void *data;
} performMessage_t;
// -------------------------------------------------------------------------------------
// global static variables
static targetAction_t *lastDp = (targetAction_t*)nil; // last chained perform struct
static mutex_t performMutex = (mutex_t)nil; // perform mutex
static port_t performPort = (port_t)nil; // perform message port
static cthread_t mainThread = (cthread_t)nil; // main thread
// -------------------------------------------------------------------------------------
@implementation Object(ThreadPerform)
// -------------------------------------------------------------------------------------
// target/action structure alloc/free
// -------------------------------------------------------------------------------------
/* allocate target/action structure */
static targetAction_t
*allocActionStruct(id self, SEL aSel, id arg0, id arg1, int count, BOOL wait)
{
targetAction_t *dp = NXZoneMalloc([self zone], sizeof(targetAction_t));
dp->sig = SIGNATURE;
dp->target = self;
dp->action = aSel;
dp->arg[0] = arg0;
dp->arg[1] = arg1;
dp->argCount = count;
dp->wait = wait;
dp->mutex = mutex_alloc();
dp->condition = condition_alloc();
dp->result = (any_t)nil;
dp->next = (targetAction_t*)nil;
return dp;
}
/* free target/action structure */
static targetAction_t *freeActionStruct(targetAction_t *dp)
{
targetAction_t *next;
mutex_lock(performMutex);
next = dp->next;
if (dp == lastDp) lastDp = (targetAction_t*)nil;
mutex_unlock(performMutex);
mutex_free(dp->mutex);
condition_free(dp->condition);
free(dp);
return next;
}
// -------------------------------------------------------------------------------------
// thread perform
// -------------------------------------------------------------------------------------
/* perform with argCount specified */
- perform:(SEL)selector with:arg1 with:arg2 argCount:(int)argCount
{
switch (argCount) {
case 0: return [self perform:selector]; break;
case 1: return [self perform:selector with:arg1]; break;
case 2: return [self perform:selector with:arg1 with:arg2]; break;
}
return (id)nil;
}
/* perform target/action in structure */
static id performActionStruct(targetAction_t *dp)
{
/* perform method */
mutex_lock(dp->mutex);
dp->result = [dp->target perform:dp->action with:dp->arg[0] with:dp->arg[1]
argCount:dp->argCount];
/* signal method completion */
condition_signal(dp->condition);
mutex_unlock(dp->mutex);
/* return results */
return (id)dp->result;
}
/* port message handler */
static void _performProc(msg_header_t *msg, void *data)
{
targetAction_t *dp = ((performMessage_t*)msg)->data;
while (dp && (dp->sig == SIGNATURE)) { // loop until no data, or wait
BOOL wait = dp->wait; // wait status save for later checking
performActionStruct(dp); // execute action
if (wait) break; // break if waiting for return
dp = freeActionStruct(dp); // free and get next structure
}
}
/* port initialization (MUST BE EXECUTE FROM MAIN THREAD ONLY!) */
// - This function will be called automatically from forkPerform:...
#define doINIT { if (mainThread == (cthread_t)nil) objectThreadPerformInit(); }
void objectThreadPerformInit()
{
char sName[256];
/* return if already initialized */
if (mainThread) return;
mainThread = cthread_self();
/* allocate perform mutex */
performMutex = mutex_alloc();
/* allocate perform port (port name is made public) */
sprintf(sName, appPortFORMAT, [NXApp appName]);
if ((port_allocate(task_self(),&performPort) == KERN_SUCCESS) &&
(port_set_backlog(task_self(),performPort,PORT_BACKLOG_MAX) == KERN_SUCCESS) &&
(netname_check_in(name_server_port,sName,PORT_NULL,performPort) == NETNAME_SUCCESS)) {
DPSAddPort(performPort, _performProc, MSG_SIZE_MAX, vpNil, NX_MODALRESPTHRESHOLD);
} else {
NXLogError("objectThreadPerfrom: Unable to allocate port for thread support");
performPort = (port_t)nil;
exit(255);
}
}
/* explicit initialization (MUST BE EXECUTE FROM MAIN THREAD ONLY!) */
+ initThreadSupport
{
doINIT;
return self;
}
/* return true if calling thread is main thread */
+ (BOOL)isMainThread { return (mainThread == cthread_self()); }
- (BOOL)isMainThread { return (mainThread == cthread_self()); }
// -------------------------------------------------------------------------------------
// port message perform support
// multiple calls to 'mainThreadPerform:with:wait:' with wait:NO will be chained together
// if the prior call has not completed execution. This is done to reduce the load on
// mach_port usage.
// -------------------------------------------------------------------------------------
/* send perform message to port */
static BOOL _sendPerformToPort(port_t portId, void *data)
{
performMessage_t pm;
msg_return_t err;
msg_timeout_t timeout = 45000; // 45 seconds
/* check for valid port */
if (!portId) return YES;
/* set up the header */
pm.hdr.msg_simple = TRUE; // data is inline
pm.hdr.msg_size = sizeof(performMessage_t);
pm.hdr.msg_type = MSG_TYPE_NORMAL;
pm.hdr.msg_remote_port = portId; // destination port
pm.hdr.msg_local_port = PORT_NULL;
pm.hdr.msg_id = 0; // receiver message type
/* set up the typeDescriptor */
pm.type.msg_type_name = MSG_TYPE_CHAR;
pm.type.msg_type_size = 8; // 8 bits / byte
pm.type.msg_type_inline = TRUE; // data is inline
pm.type.msg_type_number = sizeof(targetAction_t*);
/* set up the data */
pm.type.msg_type_longform = FALSE;
pm.type.msg_type_deallocate = FALSE; // do not deallocate
pm.data = data; // the data
/* send message and return results */
err = msg_send((msg_header_t*)&pm, (msg_option_t)SEND_TIMEOUT, timeout);
return (err == SEND_SUCCESS)? NO: YES;
}
/* local mainThreadPerform method */
static id _mainThreadPerform(port_t portId, targetAction_t *dp)
{
any_t result;
/* check non-wait request */
if (!dp->wait) {
mutex_lock(performMutex);
mutex_lock(dp->mutex);
if (lastDp) lastDp->next = dp;
else _sendPerformToPort(portId, (void*)dp);
lastDp = dp;
mutex_unlock(dp->mutex);
mutex_unlock(performMutex);
return (id)nil;
}
/* send message and wait for return */
mutex_lock(dp->mutex);
if (!_sendPerformToPort(portId, (void*)dp)) condition_wait(dp->condition, dp->mutex);
mutex_unlock(dp->mutex);
result = dp->result;
/* free structure */
freeActionStruct(dp);
return (id)result;
}
/* perform selector from main thread (no args) */
- mainThreadPerform:(SEL)aSelector wait:(BOOL)waitForReturn
{
targetAction_t *dp;
doINIT; /* just in case: we better be the main thread if this is executed! */
if ([self isMainThread]) return [self perform:aSelector];
dp = allocActionStruct(self, aSelector, (id)nil, (id)nil, 0, waitForReturn);
return _mainThreadPerform(performPort, dp);
}
/* perform selector from main thread (1 arg) */
- mainThreadPerform:(SEL)aSelector with:anArg wait:(BOOL)waitForReturn
{
targetAction_t *dp;
doINIT; /* just in case: we better be the main thread if this is executed! */
if ([self isMainThread]) return [self perform:aSelector with:anArg];
dp = allocActionStruct(self, aSelector, anArg, (id)nil, 1, waitForReturn);
return _mainThreadPerform(performPort, dp);
}
/* perform selector from main thread (2 args) */
- mainThreadPerform:(SEL)aSelector with:anArg0 with:anArg1 wait:(BOOL)waitForReturn
{
targetAction_t *dp;
doINIT; /* just in case: we better be the main thread if this is executed! */
if ([self isMainThread]) return [self perform:aSelector with:anArg0 with:anArg1];
dp = allocActionStruct(self, aSelector, anArg0, anArg1, 2, waitForReturn);
return _mainThreadPerform(performPort, dp);
}
// -------------------------------------------------------------------------------------
// fork thread support
// -------------------------------------------------------------------------------------
/* forked method router */
static void threadRouter(targetAction_t *dp)
{
/* wait here until parent thread is ready */
mutex_lock(dp->mutex);
mutex_unlock(dp->mutex);
/* execute thread */
performActionStruct(dp); // execute action
freeActionStruct(dp); // free structure
/* terminate thread */
cthread_exit(0); // terminate thread
}
/* fork method thread */
- (cthread_t)forkPerform:(SEL)aSelector with:arg0 with:arg1 argc:(int)cnt detach:(BOOL)detach
{
cthread_t cthread;
targetAction_t *dp = allocActionStruct(self, aSelector, arg0, arg1, cnt, NO);
/* initialize if necessary (for the first time, we better be the main thread!)*/
doINIT;
/* fork thread */
mutex_lock(dp->mutex);
cthread = cthread_fork((cthread_fn_t)threadRouter, (any_t)dp);
if (detach) cthread_detach(cthread);
mutex_unlock(dp->mutex);
cthread_yield(); // allow thread to run
return (detach)? (cthread_t)nil: cthread; // handle may not be valid if detached
}
/* fork method thread */
- (cthread_t)forkPerform:(SEL)aSelector detach:(BOOL)detach
{
return [self forkPerform:aSelector with:(id)nil with:(id)nil argc:0 detach:detach];
}
/* fork method thread */
- (cthread_t)forkPerform:(SEL)aSelector with:anArg detach:(BOOL)detach
{
return [self forkPerform:aSelector with:anArg with:(id)nil argc:1 detach:detach];
}
/* fork method thread */
- (cthread_t)forkPerform:(SEL)aSelector with:anArg0 with:anArg1 detach:(BOOL)detach
{
return [self forkPerform:aSelector with:anArg0 with:anArg1 argc:2 detach:detach];
}
@end