home *** CD-ROM | disk | FTP | other *** search
- /*
- * threads.c
- *
- * Main implementation for threads.
- * 2/4/88
- * bnb
- * streamline free pool code
- *
- * Last modified: 1/6/88
- * by: bnb
- * reason: remove knowledge of specific sycnchroobjects
- *
- * Last modified: 1/2/88
- * by: bnb
- * reason: willjoin and join
- *
- *
- * Last modified: 12/21/87
- * by: bnb
- * reason: nuke old version (non asm) of swtch
- *
- * Last modified: 8/15/88
- * by: chase
- * reason: VAX/Unix and other support etc.
- */
-
- #define _THREADS_C
-
- #include "presto.h"
- #include "machdep.h"
- #include "atomic_int.h" /* for counter */
-
- //
- // thisthread ALWAYS refers to the currently running thread in the
- // context of the user. thisthread must be private to each processor,
- // but is set on context switches appropriately.
- // Use staticthread to give us something to reference into before
- // the system gets going during the static construction.
- //
-
- private_t Thread staticthread(-1);
- Thread *systhread = &staticthread;
-
- private_t Thread *thisthread = &staticthread;
- private_t int _rtmp; // must be private (hack to simplify asm movs)
-
- //
- // maintain a free list of thread templates.
- //
-
- #define THREADPOOL_HIWATER 100 /*save only this much*/
- static shared_t ThreadQ t_free(TS_ANY);
- static shared_t AtomicInt t_freetag;
-
- //
- //
- // Some notes on constructors:
- // Automatic constructors of base and sub classes are painful.
- // If we are resurrecting an old thread, we don't need to go
- // through all of the rigamarole to reinit its member
- // elements. Instead, when first constructed,
- // we call "init" on them, and
- // let them do the work involved in making themnselves for the
- // first time. After that, they are presumed to be "reusable"
- // after calling "reinit".
- //
-
- Thread::Thread(char *name, int tid, long ssiz, int musthavestack)
- : (OBJ_THREAD, name)
- {
- Thread *t;
-
- if (tid < 0)
- error("Invalid thread id");
- if (ssiz < 0)
- error("Negative stack size");
-
- if (this == 0) { // no derived class
- t = t_free.get();
- if (t == 0) // can't find anything
- t = (Thread*)new char[sizeof(Thread)];
- t->t_slockcount = 0;
- t->t_callstate.init(); // intentional ctor
- t->t_expired = 0;
- this = t;
- } else { // derived class
- this = this;
- this->t_slockcount = 0;
- this->t_callstate.init();
- this->t_expired = 0;
- }
-
- #ifdef debug
- dout << form("thisthread creating thread %s (%x)\n",
- name, this);
- #endif debug
-
- t_flags = 0;
- t_data = (Objany)0;
-
- if (ssiz == 0) { // run on existing stack
- t_stack = new Stack(0);
- t_flags = TF_KEEPSTACK;
- } else if (musthavestack) {
- t_stack = new Stack(ssiz);
- } else {
- t_neededstacksize = ssiz; // union
- t_flags |= TF_INCOMPLETE;
- }
-
- t_csp = 0;
- t_fp = 0;
- // t_stack already set
- t_state = TS_IDLE;
- // t_flags already set
- t_tag = t_freetag++;
- t_tid = (tid > 0)? tid : t_tag;
- t_pri = TP_BASEPRIO;
- t_proc = 0; // running on
- t_blockedon = 0; // waiting on
- t_jthread = 0; // joining thread
- }
-
- //
- // Thread constructor for staticthread.
- //
-
- extern Process staticproc;
-
- Thread::Thread(int tid)
- {
- setflags(TF_SCHEDULER | TF_NONPREEMPTABLE);
- setproc(&staticproc);
- t_expired = 0;
- t_slockcount = 0;
- t_tid = tid;
- }
-
- //
- // "Virtual" constructor
- //
- Thread*
- Thread::newthread(char *name, int tid, long ssiz, int musthavestack)
- {
- return new Thread(name,tid,ssiz,musthavestack);
- }
-
-
- //
- // Delete a thread:
- // When a thread comes here, it will never run again.
- // Always return it's stack to the free pool.
- // If the thread will be joined upon, don't restore it to
- // the thread free pool, otherwise do.
- //
- // if we make it to the final return, the regular deallocation
- // will take place.
- //
- //
- // We make sure that the guy being deleted is
- // actually finished. Eventually, we should be
- // able to halt him midstream, for now, we just
- // wait until he is done. This is used by join
- // to make sure that we don't destroy a thread
- // before it has finished executing.
- //
- Thread::~Thread()
- {
- if (this == &staticthread) {
- this = 0;
- return;
- }
-
- while (t_state&TS_RUNNING)
- ;
-
- if (((t_state&TS_FINISHED) == 0) && ((t_state&TS_DELETE) == 0))
- error("Destroying an unfinished thread");
-
- delete t_stack; // always reuse
-
- if ((t_state&TS_DELETE) == 0 && t_free.length() < THREADPOOL_HIWATER) {
- t_free.append(this);
- this = 0;
- }
-
- // if this != 0, regular deallocation will occur here.
- }
-
-
- //
- // Start a thread running in some invocation function of an object.
- // We save the callstate from the the start call and use it
- // later when we actually get this puppy scheduled. We are
- // trying not to be machine dependent here, but we are still
- // assuming:
- // - sizeof(Objany) == sizeof(PFany) == sizeof(int*)
- // - args are pushed in the stack in a sensible order so
- // we can figure out what comes next
- // -- that we can figure out how many longwords we
- // were called with through some func nargs().
- // On the vax this is just an inspection of *(ap). On
- // the ns32000 we can get away with using libpps's nargs()
- // though what they are doing is very simple (see nargs.c
- // in the distribution directory).
- //
- // Fork:
- // if (this == thisthread)
- // then we just start up an asynchronous invocation using
- // an anonymous
- // thread having the same params as the starting thread.
- // This allows people to make asynchronous invocation
- // without having to create threads explicitly. First arg
- // specifies if they will join on the thread or not. Forked
- // thread is returned.
- //
- //
-
- #define NUMSTARTARGS 3
-
- int
- Thread::start(Objany obj, PFany pf, ...)
- {
- extern int nargs();
-
- if ( (t_state&TS_IDLE) == 0)
- error("Can't start an invocation in a busy thread");
-
- //
- // We don't want the hidden first arg this, obj, or pf.
- // -NUMSTARTARGS is the first argument that we need to save.
- //
- /*XX MACHDEP */
- t_callstate.set(pf, nargs()-NUMSTARTARGS, ((int*)(&pf)+1));
- t_start1(obj);
- return 0;
- }
-
-
- #define NUMFORKARGS 4
-
- Thread*
- Thread::fork(int needjoin, Objany obj, PFany pf, ...)
- {
- register Thread* startthread;
- extern int nargs();
-
- if (this == thisthread) {
- // newthread here
- startthread = newthread(name(), 0, stack()->size());
- if (startthread == 0)
- thisthread->error("Can't create asynch invocation");
- } else {
- // can't control thread if its not thisthread
- // must disallow forking on threads other than thisthread
- this->error("Can only fork off of thisthread");
- }
-
- if (needjoin == TF_WILLJOIN)
- startthread->willjoin();
-
- //
- // We don't want the hidden first arg this, needjoin, obj, or pf.
- // -NUMFORKARGS is the first argumennt that we need to save.
- //
- /*XX MACHDEP */
- startthread->t_callstate.set(pf, nargs()-NUMFORKARGS, ((int*)(&pf)+1));
- startthread->t_start1(obj);
- return startthread;
- }
-
-
-
- //
- // Pass new thread to the scheduler
- //
- void
- Thread::t_start1(Objany obj)
- {
- extern shared_t ThreadQ *preschedthreads;
- t_boundobj = obj;
- setstate(TS_VIRGIN);
- if ((t_flags&TF_SCHEDULER)==0) {
- // don't schedule the scheduler
- if (sched)
- sched->begin(this);
- else { // must deal with early thread
- if (preschedthreads == 0)
- preschedthreads = new ThreadQ(TS_VIRGIN);
- preschedthreads->append(this);
- }
- }
- return;
- }
-
-
- //
- // Spinning wait for thread to stop running
- //
- void
- Thread::isrunning2()
- {
- while (t_state&TS_RUNNING)
- ;
- }
-
-
-
- //
- // Why do we need to know why we woke up? I don't think so
- //
-
- void
- Thread::wakeup(SynchroObject* so = 0)
- {
- //
- // Hold off on waking a thread that is running, but going to sleep.
- // Not doing so may cause us to trash the thread's object fields.
- //
- isrunning2();
-
- if ( (t_state & (TS_BLOCKED)) == 0)
- error("Waking a non-blocked object");
-
- if (so && t_blockedon != so)
- error("Wokeup on the wrong thing!");
-
- andstate(~(TS_BLOCKED));
- t_blockedon = 0;
- sched->resume(this);
- }
-
-
- int
- Thread::run()
- {
- int results;
- Thread *schedthread = thisthread;
-
- this->setproc(thisproc);
-
- thisthread = this;
-
- schedthread->isnotrunning();
- this->isrunning();
- results = this->runrun(); // BOOM
- this->isnotrunning();
-
- // if finished, and nobody will join on this thread, delete it.
- if (t_state & TS_FINISHED)
- if ((t_flags & TF_WILLJOIN) == 0)
- delete this;
-
- thisthread = schedthread;
-
- schedthread->isrunning();
- return results;
- }
-
-
- //
- // runrun:
- //
- // This is a very central and somewhat difficult routine.
- //
- //
- // Begin the invocation in the invocation object using the current
- // thread running in the current processor
- //
- // This is essentialy a synchronous fork. Control returns the processor
- // thread when forked thread blocks or terminates via a swtch back
- // from the invocation function.
- //
- // We don't worry about saving the regs on the right stack when we do
- // the stack switch inside the virgin test. No register context ever
- // needs to be restored since threads, once they terminate, never return.
- //
-
- int
- Thread::runrun()
- {
- int dummy; /// MUST BE FIRST
- #ifdef vax
- int _rtmp = 0; /// MUST BE SECOND
- #endif vax
-
- #ifdef debug
- dout << form("runrun: begin thread %x\n", this);
- #endif debug
-
- // t_timer.timerstart(); XXX
- t_expired = 0;
-
- if (t_state & TS_VIRGIN) {
- //
- // Thread must be nonpreemptable while we are first
- // initializing it (a swtch back before it has enough
- // state to determine to whom we should switch would
- // be bad news)
-
- //
- // if we are not a whole thread, become one. Force
- // Stack to be something of size t_neededstacksize;
- //
- if (t_flags&TF_INCOMPLETE) { // try again
- t_flags &= ~TF_INCOMPLETE;
- t_stack = new Stack(t_neededstacksize);
- if (t_stack == 0)
- error("Null stack in runrun");
- }
-
- #if (i386 || sun)
- //
- // Insure stack of calling (process) thread is cleanly restored
- // (including registers) when new thread next switches.
- //
- // Do this cleanly -- push a proper "swtch" context in
- // current stack, and return on new stack. When the
- // thread swtch()'s back, will return from runrun(), but
- // with proper register state. Must insure runrun()
- // doesn't have any register variables (if so, need to get
- // them from the stack). Older technique (ns32000,
- // anyhow) has stack-pointer pass frame-pointer, then
- // adjust frame pointer back on 1st swtch() back from new
- // thread, and restores "random" register contents.
- //
- // First call to runrun() by "slave" process thread starts
- // process thread itself; in this case, TF_KEEPSTACK is on.
- // Since keeping calling stack, have the new stack top out
- // at the dummy variable in this procedure (leaving enough
- // room for swtch() context).
- //
- extern init_stack(Thread*,int*);
- init_stack(this, (this->t_flags&TF_KEEPSTACK)
- ? &dummy-16 : t_stack->top());
- #endif (i386 || sun)
-
- #if (ns32000 || vax)
- t_fp = FP(dummy);
- if ((this->t_flags&TF_KEEPSTACK) == 0) {
- //
- // remember current sp
- //
- #ifdef ns32000
- { asm("sprd sp, __rtmp"); }
- #endif
- #ifdef vax
- { asm("movl sp, -8(fp)"); }
- #endif
- t_csp = (int*)_rtmp;
-
- //
- // load new sp with the top of the new stack
- //
- _rtmp = (int)(t_stack->top());
- // lose any stored regs from previous context...
- // but not important.
- #ifdef vax
- { asm("movl -8(fp), sp"); }
- #endif
- #ifdef ns32000
- { asm("lprd sp, __rtmp"); }
- #endif
- }
- #endif (vax || ns32000)
-
- // We invoke the saved callstate with a "zero" sp
- // indicator to say "just go ahead and use whatever
- // stack you are currently running on.
- //
-
- (void)this->t_callstate.call(0,t_boundobj);
-
- // NOT REACHED:
- // We can never return normally from the invocation
- // since the fp which is created on the call points
- // back to the stack of the processor thread which
- // is handling the invocation. Since its not likely
- // that processor's thread will be in the same state
- // when the invocation return we can't use it for
- // anything. callstate->call must swtch back to us, after
- // returning the thread to the free pool.
- //
- // We wouldn't have to worry about any of this if we
- // were running on a uniprocessor
- //
- error("Return to runrun!");
-
- } else {
- (void)swtch();
- //
- // NOTREACHED
- //
- }
- return 0; // for the compiler only
- }
-
-
- //
- // Go to sleep on a synchronization object.
- // The synchronization object has already taken care of the semantics
- // of going to sleep. We just change our state and conk out...
- //
- void
- Thread::sleep(SynchroObject* so = 0)
- {
-
- if (this != thisthread)
- error("this!=thisthread in sleep. HELP!");
-
- t_blockedon = so;
-
- if ((t_state&TS_RUNNING)==0)
- error("Can't block a non running thread");
-
- (void)swtch();
- // wakeup here
- }
-
-
- double
- Thread::cputime()
- {
- return 0;
- }
-
- int
- Thread::canpreempt()
- {
- if ((t_state&TS_RUNNING) && // is running
- ((t_flags&TF_NONPREEMPTABLE) == 0) && // is preemptable
- ( t_slockcount == 0) && // not in spinlock cs
- ((t_state&TS_VIRGIN) == 0) && // is not a virgin
- t_expired) // has run long enough
- {
- t_expired = 1;
- return 1;
- } else {
- t_expired = 1;
- return 0;
- }
- }
-
- //
- // Threads come here when they want to terminate. If they send
- // any arguments, it will be returned to any thread which waits
- // to join on thisthread (assuming someone has marked it as TF_WILLJOIN)
- //
- // All joining (and joinee) threads serialize at a single j_lock.
- // Until this is demonstrated to be a problem, it will stand.
- //
- // Note: this code is called in the context of the terminating
- // thread. The thread destructor is automatically invoked in the
- // context of the scheduler thread when we return to it via swtch().
- //
-
- static shared_t Spinlock j_lock; // share single lock
-
- void
- Thread::terminate(Objany retobj=0)
- {
- int rtmp;
-
- if (this != thisthread) {
- thisthread->error("terminate: Can only finish self");
- return;
- }
-
- if (t_flags&TF_WILLJOIN) {
- Thread *t;
- j_lock.lock();
-
- orstate(TS_FINISHED);
- t = t_jthread; // shared union
- t_jvalue = retobj;
- if (t) { // someone waiting
- j_lock.unlock();
- t->wakeup((SynchroObject*)this);
- } else
- j_lock.unlock();
- } else
- orstate(TS_FINISHED);
-
- #ifdef debug
- dout << form("terminate: thread %x t_csp %x t_fp %x\n",
- this, t_csp, t_fp);
- #endif debug
-
- this->swtch();
- // NOT REACHED
- return;
- }
-
- //
- // Join:
- //
- // If a thread is joining on another thread, block that thread
- // until the joined thread terminates. When it does, the joiner
- // is responsible for deleting it.
- //
-
-
- void
- Thread::willjoin()
- {
- if (t_flags&TF_WILLJOIN)
- thisthread->error("Can only join once on a thread");
- else
- t_flags |= TF_WILLJOIN;
- }
-
- Objany
- Thread::join()
- {
- Thread *me = thisthread;
-
- #ifdef debug
- dout << form("join: thread %x joining on %x\n", thisthread, this);
- #endif debug
-
- // want to join on someone else. Wait until they finish
- //
-
- if ( (t_flags&TF_WILLJOIN) == 0)
- error("Can't join on a free thread");
-
- j_lock.lock();
- if (this->state()&TS_FINISHED) { // thread is done,
- j_lock.unlock(); // return results
- } else { // sleep on "this"
- me->orstate(TS_BLOCKED);
- t_jthread = me; // careful of coercion
- me->nonpreemptable();
- j_lock.unlock();
- #ifdef debug
- dout << "join: thread not finished, sleeping\n";
- #endif debug
- me->sleep((SynchroObject*)this);
- me->preemptable();
- }
-
- if ((this->state()&TS_FINISHED) == 0)
- this->error("erroneous join");
-
- Objany retobj = t_jvalue;
- delete this; // will wait for other to terminate
- return retobj;
- }
-
- /*
- void
- Thread::setaffinity(Process* p)
- {
- t_proc = p; // shouldn't really use t_proc here
- }
- */
-
- void
- Thread::print(ostream& s)
- {
- s << "Thread:";
- Object::print(s); // get base class to show
- s << "\n";
- s << form("t_sp=0x%x, t_sp.limit=0x%x t_ssz=0x%x, t_csp=0x%x, t_fp=0x%x\n",
- stack()->top(), stack()->limit(),stack()->size(), t_csp, t_fp) <<
- form("t_state=0x%x, t_flags=0x%x, t_tag=%d, t_tid=%d, t_pri=%d\n",
- t_state, t_flags, t_tag, t_tid, t_pri);
- // s << "t_timer=" << &t_timer << "\n";
- s << "t_proc=" << t_proc << "\n";
- s << "t_callstate=" << t_callstate << "\n";
- s << "t_blockedon=" << t_blockedon << "\n";
- }
-
-
-
- ThreadQUnlocked::ThreadQUnlocked(int neededstate, Thread *t = 0) :(t)
- {
- tq_neededstate = neededstate;
- }
-
-
- ThreadQUnlocked::~ThreadQUnlocked()
- {
- Thread *t;
- while (t = get()) {
- t->setstate(TS_DELETE);
- delete (Thread*)t;
- }
- }
-
-
-
- ThreadQ::ThreadQ(int neededstate, Thread *t = 0) : (t)
- {
- tq_lock = new Spinlock;
- tq_neededstate = neededstate;
- tq_length = 0;
- }
-
-
- //
- // Delete a threadQ and all of the threads sitting on it.
- // We do not bother to return the threads to the reclaimq.
- // (in fact, we don't even bother to invoke the threads potentially
- // virtual constructor becuase the derived thread class may have
- // already seen this thread destroyed. Memory management needs to be
- // separated from object cleanup.
- //
- ThreadQ::~ThreadQ()
- {
- Thread *t;
- while (t = get()) {
- t->setstate(TS_DELETE);
- delete (Thread*)t;
- }
- }
-
- int
- Thread::tagcnt()
- {
- return int(t_freetag);
- }
-
-
- void
- ThreadQ::print(ostream& s)
- {
- s << form("(ThreadQ)this=0x%x, tq_neededstate=0x%x, tq_length=0x%x",
- this, tq_neededstate, tq_length);
- s << " tq_lock= " << tq_lock << "\n";
- s << "\tElements: ";
- Oqueue::print(s);
- }
-
- void
- ThreadQUnlocked::print(ostream& s)
- {
- s << form( "(ThreadQUnlocked)this=0x%x, tq_neededstate=0x%x",
- this, tq_neededstate);
- s << "\tElements: ";
- Oqueue::print(s);
- }
-