home *** CD-ROM | disk | FTP | other *** search
/ Aminet 18 / aminetcdnumber181997.iso / Aminet / dev / m2 / CycloneModules.lha / modules / txt / Threads.mod < prev    next >
Text File  |  1996-11-13  |  10KB  |  527 lines

  1. IMPLEMENTATION MODULE Threads;
  2. (*f*)
  3.  
  4. (* Copyright (c) Robert Ennals
  5. ** This provides tools for easy management of threads within Cyclone
  6. *)
  7.         
  8. (*$ ReLoadA4+ *)
  9.  
  10. FROM InOut      IMPORT WriteString, WriteCard, WriteLn;
  11. FROM DosD       IMPORT ProcessPtr, NpTags, ctrlC, ctrlE, ctrlF;
  12. FROM DosL       IMPORT CreateNewProc, Delay;
  13. FROM SYSTEM     IMPORT TAG, CAST, ADR, ADDRESS, REG, LONGSET;
  14. FROM ExecD      IMPORT ExecBase, execBase, ExecBasePtr, TaskPtr;
  15. FROM ExecL      IMPORT Forbid, Permit, Signal, Wait;
  16. FROM UtilityD   IMPORT tagUser;
  17. FROM Heap       IMPORT Allocate, Deallocate;
  18. FROM String     IMPORT Copy;
  19. FROM ModulaLib  IMPORT Raise;
  20.  
  21. (*e*)
  22.  
  23. VAR
  24. (*f*)
  25.     RootThread  : Thread;
  26. (*e*)
  27.  
  28. CONSTRUCTOR ThreadStream.Init;
  29. (*f*)
  30. BEGIN
  31.     Dead                := FALSE;
  32.     head                := NIL;
  33.     tail                := NIL;
  34.     CurrentItems        := 0;
  35.     ReadThread          := NIL;
  36.     WriteThreads.head   := NIL;
  37.     WriteThreads.tail   := NIL;
  38.     ReadTrigger         := FALSE;
  39.     WriteTriggers       := 0;
  40.     Limit               := 8;
  41. END ThreadStream.Init;
  42. (*e*)
  43.  
  44. PROCEDURE ThreadStream.StreamSync;
  45. (*f*)
  46. BEGIN
  47.     IF Dead=TRUE THEN Raise(2) END;
  48. END ThreadStream.StreamSync;
  49. (*e*)
  50.  
  51. PROCEDURE ThreadStream.Finish;
  52. (*f*)
  53. VAR
  54.     curth   : ThreadNodePtr;
  55.  
  56. BEGIN
  57.     Dead:=TRUE;
  58.  
  59.     (* wake up any threads waiting FOR the stream *)
  60.  
  61.     IF WriteTriggers>0 THEN
  62.     curth := WriteThreads.head;
  63.     WHILE curth<>NIL DO
  64.         Signal(curth^.tn_Thread^^.th_Task, LONGSET{ctrlE});
  65.         curth:=curth^.tn_Next;
  66.     END;
  67.     END;
  68.  
  69.     IF (ReadTrigger=TRUE) AND (ReadThread<>NIL) THEN
  70.     Signal(ReadThread^^.th_Task,LONGSET{ctrlE});
  71.     END;
  72.  
  73. END ThreadStream.Finish;
  74. (*e*)
  75.  
  76. PROCEDURE ThreadStream.Get() : ADDRESS;
  77. (*f*)
  78.     (* Read an item FROM the stream AND wake up anybody waiting FOR
  79.     ** you TO DO so
  80.     *)
  81. VAR
  82.     outAdr  : ADDRESS;
  83.     node    : ts_NodePtr;
  84.     curth   : ThreadNodePtr;
  85.  
  86. BEGIN
  87.  
  88.     (* First DO normal lists stuff *)
  89.  
  90.     StreamSync;
  91.  
  92.     outAdr:=NIL;
  93.  
  94.     Forbid;
  95.  
  96.     node := head;
  97.     IF node<>NIL THEN
  98.     IF node^.data <> NIL THEN;
  99.         outAdr := node^.data;
  100.  
  101.         IF tail = node THEN
  102.         head:=NIL;
  103.         tail:=NIL;
  104.         ELSE
  105.         head := node^.next;
  106.         head^.prev := NIL;
  107.         END;
  108.  
  109.         Deallocate(node);
  110.         DEC(CurrentItems);
  111.     END;
  112.     END;
  113.  
  114.     Permit;
  115.  
  116.  
  117.     (* Now signal any Write threads that are waiting TO
  118.     ** Write TO the stream *)
  119.  
  120.     IF outAdr<>NIL THEN
  121.     IF WriteTriggers>0 THEN
  122.         curth := WriteThreads.head;
  123.         WHILE curth<>NIL DO
  124.         Signal(curth^.tn_Thread^^.th_Task, LONGSET{ctrlE});
  125.         curth:=curth^.tn_Next;
  126.         END;
  127.     END;
  128.     END;
  129.  
  130.     RETURN outAdr;
  131.  
  132. END ThreadStream.Get;
  133. (*e*)
  134.  
  135. PROCEDURE ThreadStream.GetWait() : ADDRESS;
  136. (*f*)
  137. VAR
  138.     result  : ADDRESS;
  139.     th      : Thread;
  140.  
  141. BEGIN
  142.     th := ThisThread();
  143.  
  144.     result := Get();
  145.     IF result = NIL THEN
  146.     ReadThread := th^.BackPtr;
  147.     ReadTrigger := TRUE;
  148.     REPEAT
  149.         result:=Get();
  150.  
  151.         IF result = NIL THEN
  152.         ReSync;
  153.         Sleep;
  154.         END;
  155.     UNTIL result<>NIL;
  156.     END;
  157.     ReadTrigger:=FALSE;
  158.  
  159.     RETURN result;
  160.  
  161. END ThreadStream.GetWait;
  162. (*e*)
  163.  
  164. PROCEDURE ThreadStream.Send(item : ADDRESS) : BOOLEAN;
  165. (*f*)
  166. VAR
  167.     node    : ts_NodePtr;
  168.     th      : Thread;
  169.  
  170. BEGIN
  171.  
  172.     StreamSync;
  173.  
  174.     (* DO the standard list stuff *)
  175.  
  176.     Forbid;
  177.  
  178.     IF ((CurrentItems=Limit) AND (Limit<>0)) THEN
  179.     (* It is already full *)
  180.  
  181.     Permit;
  182.  
  183.     RETURN FALSE;
  184.  
  185.     ELSE
  186.  
  187.     INC(CurrentItems);
  188.  
  189.     Allocate(node, SIZE(ts_Node));
  190.     node^.prev := tail;
  191.     node^.next := NIL;
  192.     node^.data := item;
  193.  
  194.     IF node^.prev#NIL THEN node^.prev^.next:=node; ELSE head:=node; END;
  195.     tail := node;
  196.  
  197.     Permit;
  198.  
  199.     (* Now signal the server IF it is waiting FOR input *)
  200.  
  201.     IF (ReadTrigger=TRUE) AND (ReadThread<>NIL) THEN
  202.  
  203.         Signal(ReadThread^^.th_Task,LONGSET{ctrlE});
  204.  
  205.     END;
  206.  
  207.     RETURN TRUE;
  208.     END;
  209.  
  210. END ThreadStream.Send;
  211. (*e*)
  212.  
  213. PROCEDURE ThreadStream.SendWait(item : ADDRESS);
  214. (*f*)
  215. VAR
  216.     result  : BOOLEAN;
  217.     node    : ThreadNode;
  218.     tth     : Thread;
  219.  
  220. BEGIN
  221.     result:=Send(item);
  222.  
  223.     IF result=FALSE THEN
  224.     INC(WriteTriggers);
  225.     (* add ourselves TO the waiting list *)
  226.  
  227.     Forbid; (* Nothing ELSE can cut IN WHILE we insert an item *)
  228.     tth:=ThisThread();
  229.     node.tn_Thread:=ADR(tth);
  230.     node.tn_Prev:=WriteThreads.head;
  231.     WriteThreads.head^.tn_Next:=ADR(node);
  232.     node.tn_Next:=NIL;
  233.     WriteThreads.head:=ADR(node);
  234.     Permit;    
  235.  
  236.     REPEAT
  237.         ReSync;
  238.         result:=Send(item);
  239.         Sleep;
  240.     UNTIL result=TRUE;
  241.  
  242.     (* Now remove ourselves FROM the list *)
  243.  
  244.     Forbid; (* Nothing ELSE can cut IN WHILE we insert an item *)
  245.     WriteThreads.head:=node.tn_Prev;
  246.     WriteThreads.head^.tn_Next:=NIL;
  247.     Permit; 
  248.     END;
  249. END ThreadStream.SendWait;
  250. (*e*)
  251.  
  252. PROCEDURE StreamRead () : ADDRESS;
  253. (*f*)
  254. VAR
  255.     th : Thread;
  256.     res : ADDRESS;
  257.  
  258. BEGIN
  259.     th := ThisThread();
  260.     res := th^.InStream^.GetWait();
  261.     RETURN res;
  262. END StreamRead;
  263. (*e*)
  264.  
  265. PROCEDURE StreamWrite (item : ADDRESS);
  266. (*f*)
  267. VAR
  268.     th : Thread;
  269.  
  270. BEGIN
  271.     th := ThisThread();
  272.     th^.OutStream^.SendWait(item);
  273. END StreamWrite;
  274. (*e*)
  275.  
  276. PROCEDURE FinishOutStream;
  277. (*f*)
  278. VAR
  279.     th : Thread;
  280.  
  281. BEGIN
  282.     th := ThisThread();
  283.     th^.OutStream^.Finish;
  284. END FinishOutStream;
  285. (*e*)   
  286.  
  287. PROCEDURE FinishInStream;
  288. (*f*)
  289. VAR
  290.     th : Thread;
  291.  
  292. BEGIN
  293.     th := ThisThread();
  294.     th^.InStream^.Finish;
  295. END FinishInStream;
  296. (*e*)  
  297.  
  298. PROCEDURE ThisThread () : Thread;
  299. (*f*)
  300. VAR
  301.     th  : Thread;
  302.  
  303. BEGIN
  304.     th:=(CAST(Thread, execBase^.thisTask^.userData));
  305.     RETURN th;
  306. END ThisThread;
  307. (*e*)
  308.  
  309. PROCEDURE LaunchStub;
  310. (*f*)
  311.  
  312. (* Launchstub shields your thread PROCEDURE FROM the tricky bits
  313. ** OF thread syncronisation AND terminations
  314. *)
  315.  
  316. VAR
  317.     th      : Thread;
  318.  
  319.  
  320. BEGIN
  321.     (* First we will run the PROCEDURE *)
  322.  
  323.     th := ThisThread();
  324.  
  325.     TRY
  326.     th^.th_Proc;
  327.     FINALLY
  328.     (* This should call our PROCEDURE *)
  329.  
  330.     (* Now the PROCEDURE has terminated so we need TO shut down
  331.     ** We do this by sending a signal to our parent and clearing
  332.     ** Thread.Th_Task
  333.     *)
  334.  
  335.     Forbid;
  336.  
  337.     (* We send our parent a ctrlE signal incase they are waiting FOR
  338.     ** us TO terminate
  339.     *)
  340.  
  341.     th^.Flags := th_flags{th_Dead};
  342.     th^.Dead := TRUE;
  343.  
  344.     Signal(th^.Parent^.th_Task,LONGSET{ctrlE});
  345.     END;
  346.  
  347. END LaunchStub;
  348. (*e*)
  349.  
  350. PROCEDURE Nothing;
  351. (*f*)
  352. BEGIN
  353.  
  354. END Nothing;
  355. (*e*)
  356.  
  357. PROCEDURE Thread.Start;
  358. (*f*)
  359.  
  360. VAR
  361.     TagBuffer   : ARRAY [0..10] OF LONGINT;
  362.     TagAdr      : ADDRESS;
  363.     outproc     : ProcessPtr;
  364.  
  365. BEGIN
  366.  
  367.     Parent      := ThisThread();
  368.     Flags       := th_flags{};
  369.     BackPtr     := ADR(Self);
  370.  
  371.     TagAdr:=TAG(TagBuffer,npEntry,ADR(LaunchStub),npName,ADR(th_Name),npPriority,th_Pri,0,0);
  372.  
  373.     Forbid; (* make sure the Thread doesn't start before we are ready *)
  374.     outproc     := CreateNewProc(TagAdr);
  375.     th_Task  := ADR(outproc^.task);
  376.     th_Task^.userData := Self;
  377.     Dead     := FALSE;
  378.     IF InStream<>NIL THEN
  379.         InStream^.ReadThread := ADR(Self);
  380.     END;
  381.     Permit; (* Now it can run *)
  382.  
  383. END Thread.Start;
  384. (*e*)
  385.  
  386. PROCEDURE SetupThread(VAR th : Thread; Proc : ThreadProc; Name : ARRAY OF CHAR ; Pri : LONGINT ; InStream, OutStream : ThreadStream);
  387. (*f*)
  388.  
  389. BEGIN
  390.     th^.Version      := 1;
  391.     th^.th_Proc      := Proc;
  392.     Copy(th^.th_Name, Name);
  393.     th^.th_Pri       := Pri;
  394.     th^.InStream     := InStream;
  395.     th^.OutStream    := OutStream;
  396.     th^.Dead         := TRUE;
  397. END SetupThread;
  398. (*e*)
  399.  
  400. PROCEDURE Thread.Terminate;
  401. (*f*)
  402. VAR
  403.     GotSig : LONGSET;
  404.  
  405. BEGIN
  406.  
  407.     Flags := Flags + th_flags{th_Terminate};
  408.  
  409.     WHILE Dead = FALSE DO;
  410.  
  411.     Signal(th_Task,LONGSET{ctrlE});
  412.  
  413.     (* When it has finished removing itself, the endcode will send a ctrlE
  414.     ** signal TO the parent. We must wait FOR that
  415.     **
  416.     ** The signal is sent BY LaunchStub after a forbid so as TO prevent
  417.     ** the parent reading the signal before the child has quit.
  418.     *)
  419.  
  420.     GotSig:=Wait(LONGSET{ctrlE});
  421.     END;
  422.  
  423.  
  424.     (* When we have got this back it means that the Thread has terminated *)
  425.  
  426. END Thread.Terminate;
  427. (*e*)
  428.  
  429. PROCEDURE Thread.Wake;
  430. (*f*)
  431. (* Just wakes up the task by sending it a sync signal
  432. ** the Thread may put itself back TO sleep again IF what it was waiting
  433. ** FOR hasn't happened
  434. *)
  435.  
  436. BEGIN
  437.     Signal(th_Task,LONGSET{ctrlE});
  438. END Thread.Wake;
  439. (*e*)
  440.  
  441. PROCEDURE CheckTerminate() : BOOLEAN;
  442. (*f*)
  443. VAR
  444.     th  : Thread;
  445.  
  446. BEGIN
  447.     th := ThisThread();
  448.     RETURN ( th_Terminate IN th^.Flags);
  449. END CheckTerminate;
  450. (*e*)
  451.  
  452. PROCEDURE ReSync;
  453. (*f*)
  454. BEGIN
  455.     IF (CheckTerminate()=TRUE) THEN Raise(1) END;
  456. END ReSync;
  457. (*e*)
  458.  
  459. PROCEDURE Sleep;
  460. (*f*)
  461. VAR
  462.     th  : Thread;
  463.     GotSig : LONGSET;
  464.    
  465. BEGIN
  466.     th := ThisThread();
  467.  
  468.     th^.Flags := th^.Flags + th_flags{th_Sleep};
  469.     GotSig:=Wait(LONGSET{ctrlE});
  470.     th^.Flags := th^.Flags - th_flags{th_Sleep};
  471.  
  472. END Sleep;
  473. (*e*)
  474.  
  475. PROCEDURE ThreadWait(siga : LONGSET) : LONGSET;
  476. (*f*)
  477. VAR
  478.     GotSig : LONGSET;
  479.  
  480. BEGIN
  481.     GotSig := Wait(LONGSET{ctrlE} + siga);
  482.     ReSync;
  483.     RETURN GotSig;
  484. END ThreadWait;
  485. (*e*)
  486.  
  487.  
  488. BEGIN
  489. (*f*)
  490.  
  491. (* SET up the current task as being a root Thread FOR a Thread system
  492. ** Must only be called ONCE AND only IN the main Thread.
  493. **
  494. ** Currently standard DOS signals are used FOR sending messages
  495. ** CtrlE is sent TO tell a Thread TO resume IF it is waiting
  496. ** FOR a resync or to tell it to terminate (thread must check).
  497. **
  498. ** As threads DO NOT have an Input OR OutPut, these signals should NOT
  499. ** interfere WITH standard operations IF a user attempts TO issue them.
  500. *)
  501.     NEW(RootThread);
  502.  
  503.     RootThread^.th_Name      := "MAIN";
  504.     RootThread^.Version      := 1;
  505.     RootThread^.th_Task      := execBase^.thisTask;
  506.     RootThread^.InStream     := NIL;
  507.     RootThread^.OutStream    := NIL;
  508.     RootThread^.Parent       := NIL;
  509.     RootThread^.Flags        := CAST(th_flags,0);
  510.     RootThread^.th_Proc      := NIL;
  511.     RootThread^.BackPtr      := ADR(RootThread);
  512.  
  513.     Forbid; (* freeze everything WHILE we mess WITH the task structure *)
  514.     execBase^.thisTask^.userData      := CAST(ADDRESS,RootThread);
  515.     Permit; (* quickly resume multitasking *)
  516.  
  517.     (* We now have a structure we can use *)
  518. (*e*)
  519.  
  520. CLOSE
  521. (*f*)
  522.     IF RootThread #NIL THEN DISPOSE(RootThread) END;
  523. (*e*)
  524.  
  525. END Threads.
  526.  
  527.