home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / commands / async.c next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  12.6 KB  |  511 lines

  1. /* ----------------------------------------------------------------
  2.  *   FILE
  3.  *    async.c
  4.  *    
  5.  *   DESCRIPTION
  6.  *    Asynchronous notification
  7.  *
  8.  *   INTERFACE ROUTINES
  9.  *      void Async_Notify(char *relname);
  10.  *      void Async_Listen(char *relname,int pid);
  11.  *      void Async_Unlisten(char *relname, int pid);
  12.  *
  13.  *   NOTES
  14.  *    
  15.  *   IDENTIFICATION
  16.  *    $Header: /private/postgres/src/commands/RCS/async.c,v 1.4 1992/08/26 21:08:55 mer Exp $
  17.  * ----------------------------------------------------------------
  18.  */
  19. /*
  20.  * Model is:
  21.  * 1. Multiple backends on same machine.
  22.  *
  23.  * 2. Query on one backend sends stuff over an asynchronous portal by 
  24.  *    appending to a relation, and then doing an async. notification
  25.  *    (which takes place after commit) to all listeners on this relation.
  26.  *
  27.  * 3. Async. notification results in all backends listening on relation 
  28.  *    to be woken up, by a process signal kill(2), with name of relation
  29.  *    passed in shared memory.
  30.  *
  31.  * 4. Each backend notifies its respective frontend over the comm
  32.  *    channel using the out-of-band channel.
  33.  *
  34.  * 5. Each frontend receives this notification and processes accordingly.
  35.  *
  36.  * #4,#5 are changing soon with pending rewrite of portal/protocol.
  37.  */
  38.  
  39. #include <strings.h>
  40. #include <signal.h>
  41. #include <errno.h>
  42. #include "tmp/postgres.h"
  43.  
  44. RcsId("$Header: /private/postgres/src/commands/RCS/async.c,v 1.4 1992/08/26 21:08:55 mer Exp $");
  45.  
  46. /* ----------------
  47.  *    FILE INCLUDE ORDER GUIDELINES
  48.  *
  49.  *    1) postgres.h
  50.  *    2) various support files ("everything else")
  51.  *    3) node files
  52.  *    4) catalog/ files
  53.  *    5) execdefs.h and execmisc.h, if necessary.
  54.  *    6) extern files come last.
  55.  * ----------------
  56.  */
  57. #include "access/attnum.h"
  58. #include "access/ftup.h"
  59. #include "access/heapam.h"
  60. #include "access/htup.h"
  61. #include "access/relscan.h"
  62. #include "access/skey.h"
  63. #include "access/tqual.h"
  64.  
  65. #include "commands/copy.h"
  66. #include "storage/buf.h"
  67. #include "storage/itemptr.h"
  68. #include "tmp/miscadmin.h"
  69. #include "tmp/portal.h"
  70. #include "utils/excid.h"
  71. #include "utils/log.h"
  72. #include "utils/mcxt.h"
  73. #include "utils/palloc.h"
  74. #include "utils/rel.h"
  75.  
  76. #include "nodes/pg_lisp.h"
  77. #include "tcop/dest.h"
  78. #include "commands/command.h"
  79.  
  80. #include "catalog/catname.h"
  81. #include "catalog/syscache.h"
  82. #include "catalog/pg_attribute.h"
  83. #include "catalog/pg_proc.h"
  84. #include "catalog/pg_relation.h"
  85. #include "catalog/pg_type.h"
  86. #include "catalog/pg_listener.h"
  87.  
  88. #include "executor/execdefs.h"
  89. #include "executor/execdesc.h"
  90.  
  91. #include "tmp/simplelists.h"
  92.  
  93. typedef struct {
  94.     char relname[16];
  95.     SLNode Node;
  96. } PendingNotify;
  97.  
  98. static SLList pendingNotifies;
  99. static int initialized = 0;    /* statics in this module initialized? */
  100.  
  101. /*
  102.  *--------------------------------------------------------------
  103.  * Async_Notify --
  104.  *
  105.  *      Adds the relation to the list of pending notifies.
  106.  *      All notification happens at end of commit.
  107.  *      
  108.  *
  109.  * Results:
  110.  *      XXX
  111.  *
  112.  * Side effects:
  113.  *      All tuples for relname in pg_listener are updated.
  114.  *
  115.  *--------------------------------------------------------------
  116.  */
  117. void
  118. Async_Notify(relname)
  119.      char *relname;
  120. {
  121.     PendingNotify *p;
  122.     HeapTuple lTuple;
  123.     struct listener *lStruct;
  124.     Relation lRel;
  125.     HeapScanDesc sRel;
  126.     TupleDescriptor tdesc;
  127.     Buffer b;
  128.     Datum d;
  129.     int notifypending, isnull;
  130.  
  131.     if (! initialized) {
  132.     SLNewList(&pendingNotifies,offsetof(PendingNotify,Node));
  133.     initialized = 1;
  134.     }
  135.  
  136.     elog(NOTICE,"Async_Notify: %s",relname);
  137.     lRel = heap_openr("pg_listener");
  138.     tdesc = RelationGetTupleDescriptor(lRel);
  139.     sRel = heap_beginscan(lRel,0,NowTimeQual,0,(ScanKey)NULL);
  140.     while (HeapTupleIsValid(lTuple = heap_getnext(sRel,0,&b))) {
  141.     d = (Datum) heap_getattr(lTuple,b,Anum_pg_listener_relname,tdesc,
  142.                  &isnull);
  143.     if (! strcmp(relname,DatumGetPointer(d))) {
  144.         d = (Datum) heap_getattr(lTuple,b,Anum_pg_listener_notify,tdesc,
  145.                      &isnull);
  146.         notifypending = (int)DatumGetPointer(d);
  147.         if (! notifypending) {
  148.         ItemPointerData oldTID;
  149.         oldTID = lTuple->t_ctid;
  150.         ((struct listener *)GETSTRUCT(lTuple))->notification = 1;
  151.         heap_replace(lRel,&oldTID,lTuple);
  152.         }
  153.     }
  154.     ReleaseBuffer(b);
  155.     }
  156.     heap_endscan(sRel);
  157.     heap_close(lRel);
  158. }
  159.  
  160. #if 0
  161. static int
  162. AsyncExistsPendingNotify(relname)
  163.      char *relname;
  164. {
  165.     for (p = (PendingNotify *)SLGetHead(&pendingNotifies); p != NULL;
  166.      p = (PendingNotify *)SLGetSucc(&p->Node)) {
  167.     if (!strcmp(p->relname,relname)) {
  168.         return 1;
  169.     }
  170.     }
  171.     return 0;
  172. }
  173. #endif
  174.  
  175. /*
  176.  *--------------------------------------------------------------
  177.  * Async_NotifyAtCommit --
  178.  *
  179.  *      Signal all backends listening on relations pending notification.
  180.  *
  181.  *      This corresponds to the 'notify <relation>' command in 
  182.  *      postquel.
  183.  *
  184.  *   XXX: what if we signal ourselves?
  185.  *
  186.  * Results:
  187.  *      XXX
  188.  *
  189.  * Side effects:
  190.  *      XXX
  191.  *
  192.  *--------------------------------------------------------------
  193.  */
  194. void Async_NotifyAtCommit()
  195. {
  196.     char *relname;
  197.     Relation r;
  198.     TupleDescriptor tdesc;
  199.     HeapScanDesc s;
  200.     HeapTuple htup;
  201.     Buffer b;
  202.     Datum d;
  203.     char *relnamei;
  204.     int pid;
  205.     int isnull;
  206.     int notifypending;
  207.     bool didNotify;
  208.     static int reentrant;    /* hack:
  209.                    This is a transaction itself, so we
  210.                    don't want to loop at commit time
  211.                    processing */
  212.     /*
  213.      * XXX Turn off notify for 4.0.1.  Late discovery of implementation flaws.
  214.      */
  215.     return;
  216. #if 0
  217.     if (reentrant)
  218.       return;
  219.     reentrant = 1;
  220.     CommandCounterIncrement();
  221.  
  222.     if (! initialized) {
  223.     SLNewList(&pendingNotifies,offsetof(PendingNotify,Node));
  224.     initialized = 1;
  225.     }
  226.  
  227.     r = heap_openr("pg_listener");
  228.     tdesc = RelationGetTupleDescriptor(r);
  229.     s = heap_beginscan(r,0,NowTimeQual,0,(ScanKey)NULL);
  230.  
  231.     htup = heap_getnext(s,0,&b);
  232.     if (HeapTupleIsValid(htup)) {
  233.     didNotify = true;
  234.     StartTransactionCommand();
  235.     }
  236.     else
  237.     didNotify = false;
  238.  
  239.     while (HeapTupleIsValid(htup)) {
  240.     d = (Datum) heap_getattr(htup,b,Anum_pg_listener_notify,tdesc,
  241.                  &isnull);
  242.     notifypending = (int)DatumGetPointer(d);
  243.     if (notifypending) {
  244.         d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
  245.         pid = (int) DatumGetPointer(d);
  246.         if (kill (pid,SIGUSR2) < 0) {
  247.         extern int errno;
  248.         if (errno == ESRCH) { /* no such process */
  249.             heap_delete(r,&htup->t_ctid);
  250.         }
  251.         }
  252.     }
  253.     ReleaseBuffer(b);
  254.     htup = heap_getnext(s,0,&b);
  255.     }
  256.     heap_endscan(s);
  257.     heap_close(r);
  258.     if (didNotify)
  259.     CommitTransactionCommand();
  260.     reentrant = 0;
  261. #endif
  262. }
  263.  
  264. #if 0
  265. /*
  266.  *--------------------------------------------------------------
  267.  * Async_NotifyAtAbort --
  268.  *
  269.  *      Gets rid of pending notifies.  List elements are automatically
  270.  *      freed through memory context.
  271.  *      
  272.  *
  273.  * Results:
  274.  *      XXX
  275.  *
  276.  * Side effects:
  277.  *      XXX
  278.  *
  279.  *--------------------------------------------------------------
  280.  */
  281. void
  282. Async_NotifyAtAbort()
  283. {
  284.     SLNewList(&pendingNotifies,offsetof(PendingNotify,Node));
  285.     intialized = 1;
  286. }
  287. #endif
  288.  
  289. /*
  290.  *--------------------------------------------------------------
  291.  * Async_Listen --
  292.  *
  293.  *      Register a backend (identified by its Unix PID) as listening
  294.  *      on the specified relation.  
  295.  *
  296.  *      This corresponds to the 'listen <relation>' command in 
  297.  *      postquel.
  298.  *
  299.  *      One listener per relation, pg_listener relation is keyed
  300.  *      on (relname,pid) to provide multiple listeners in future.
  301.  *
  302.  * Results:
  303.  *      pg_listeners is updated.
  304.  *
  305.  * Side effects:
  306.  *      XXX
  307.  *
  308.  *--------------------------------------------------------------
  309.  */
  310. void Async_Listen(relname, pid)
  311.      char *relname;
  312.      int pid;
  313. {
  314.     Datum values[Natts_pg_listener];
  315.     char nulls[Natts_pg_listener];
  316.     TupleDescriptor tdesc;
  317.     HeapScanDesc s;
  318.     HeapTuple htup,tup;
  319.     Relation lDesc;
  320.     Buffer b;
  321.     Datum d;
  322.     int i, isnull;
  323.     int alreadyListener = 0;
  324.     int ourPid = getpid();
  325.     char *relnamei;
  326.  
  327.     elog(NOTICE,"Async_Listen: %s",relname);
  328.     for (i = 0 ; i < Natts_pg_listener; i++) {
  329.         nulls[i] = ' ';
  330.         values[i] = NULL;
  331.     }
  332.     
  333.     i = 0;
  334.     values[i++] = (Datum) relname;
  335.     values[i++] = (Datum) pid;
  336.     values[i++] = (Datum) 0;    /* no notifies pending */
  337.  
  338.     lDesc = heap_openr(Name_pg_listener);
  339.  
  340.     /* is someone already listening.  One listener per relation */
  341.     tdesc = RelationGetTupleDescriptor(lDesc);
  342.     s = heap_beginscan(lDesc,0,NowTimeQual,0,(ScanKey)NULL);
  343.     while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
  344.     d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,
  345.                  &isnull);
  346.     relnamei = DatumGetPointer(d);
  347.     if (!strcmp(relnamei,relname)) {
  348.         d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
  349.         pid = (int) DatumGetPointer(d);
  350.         if (pid != ourPid) {
  351.         alreadyListener = 1;
  352.         break;
  353.         }
  354.     }
  355.     ReleaseBuffer(b);
  356.     }
  357.     heap_endscan(s);
  358.  
  359.     tup = heap_formtuple(Natts_pg_listener,
  360.              &lDesc->rd_att,
  361.              values,
  362.              nulls);
  363.     heap_insert(lDesc,tup,(double *)NULL);
  364.     
  365.     pfree((Pointer)tup);
  366.     if (alreadyListener) {
  367.     elog(NOTICE,"Async_Listen: already one listener on %s (possibly dead)",relname);
  368.     }
  369.     heap_close(lDesc);
  370.    
  371. }
  372.  
  373. /*
  374.  *--------------------------------------------------------------
  375.  * Async_Unlisten --
  376.  *
  377.  *      Remove the backend from the list of listening backends
  378.  *      for the specified relation.
  379.  *    
  380.  *      This would correspond to the 'unlisten <relation>' 
  381.  *      command in postquel, but there isn't one yet.
  382.  *
  383.  * Results:
  384.  *      pg_listeners is updated.
  385.  *
  386.  * Side effects:
  387.  *      XXX
  388.  *
  389.  *--------------------------------------------------------------
  390.  */
  391. void Async_Unlisten(relname,pid)
  392.      char *relname;
  393.      int pid;
  394. {
  395.     Relation lDesc;
  396.     HeapTuple lTuple;
  397.     lTuple = SearchSysCacheTuple(LISTENREL,relname,pid);
  398.     lDesc = heap_openr(Name_pg_listener);
  399.     if (lTuple != NULL) {
  400.     heap_delete(lDesc,&lTuple->t_ctid);
  401.     }
  402.     heap_close(lDesc);
  403. }
  404.  
  405. /*
  406.  * --------------------------------------------------------------
  407.  * Async_NotifyFrontEnd --
  408.  *
  409.  *      Perform an asynchronous notification to front end over
  410.  *      portal comm channel.  The name of the relation which contains the
  411.  *      data is sent to the front end.
  412.  *
  413.  *      We remove the notification flag from the pg_listener tuple
  414.  *      associated with our process.
  415.  *
  416.  * Results:
  417.  *      XXX
  418.  *
  419.  * Side effects:
  420.  *      NB: This is the signal handler for SIGUSR2.  We could have been
  421.  *      in the middle of dumping portal data.  
  422.  *
  423.  *      We make use of the out-of-band channel to transmit the
  424.  *      notification to the front end.  The actual data transfer takes
  425.  *      place at the front end's request.
  426.  *
  427.  * --------------------------------------------------------------
  428.  */
  429. GlobalMemory notifyContext = NULL;
  430.  
  431. void Async_NotifyFrontEnd()
  432. {
  433.     char *relname;
  434.     extern whereToSendOutput;
  435.     Relation r;
  436.     HeapScanDesc s;
  437.     TupleDescriptor tdesc;
  438.     Datum d;
  439.     Buffer b;
  440.     HeapTuple htup;
  441.     int isnull, notifypending, pid;
  442.     char msg[1025];
  443.     int ourpid = getpid();
  444.  
  445.     bzero(msg,sizeof(msg));
  446.     
  447.     if (whereToSendOutput != Remote) {
  448.     elog(NOTICE,"Async_NotifyPortal: no asynchronous notifcation on interactive sessions");
  449.     return;
  450.     }
  451.  
  452.     /* Sorry, this code is mix-n-match styles of using getstruct and
  453.      * heap_getattr.
  454.      */
  455.     StartTransactionCommand();
  456.     {
  457.     /* debugging */
  458.     FILE *f;
  459.     f = fopen("/dev/ttyp6","w");
  460.     fprintf(f,"Got signal\n",msg);
  461.  
  462.     r = heap_openr("pg_listener");
  463.     tdesc = RelationGetTupleDescriptor(r);
  464.     s = heap_beginscan(r,0,NowTimeQual,0,(ScanKey)NULL);
  465.     while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
  466.     d = (Datum) heap_getattr(htup,b,Anum_pg_listener_notify,tdesc,
  467.                  &isnull);
  468.     notifypending = (int)DatumGetPointer(d);
  469.     if (notifypending) {
  470.         d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
  471.         pid = (int) DatumGetPointer(d);
  472.         if (pid == ourpid) {
  473.         ItemPointerData oldTID;
  474.         d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,&isnull);
  475.         relname = DatumGetPointer(d);
  476.         oldTID = htup->t_ctid;
  477.         /* unset notify flag */
  478.         ((struct listener *)GETSTRUCT(htup))->notification = 0;
  479.         heap_replace(r,&oldTID,htup);
  480.  
  481.         /* notify front end of presence,
  482.            but not any more detail */
  483.         sprintf(msg,"A%s %d",relname,pid);
  484.         /* debugging */
  485.         fprintf(f,"%s\n",msg);
  486.         if (pq_sendoob("A",1)<0) {
  487.             extern int errno;
  488.             fprintf(f,"pq_sendoob failed: errno=%d",errno);
  489.         }
  490.         /* call backend PQ lib -- different memory context */
  491.         {
  492.             MemoryContext orig;
  493.             if (notifyContext == NULL) {
  494.             notifyContext = CreateGlobalMemory("notify");
  495.             }
  496.             orig = MemoryContextSwitchTo((MemoryContext)notifyContext);
  497.             PQappendNotify(relname,pid);
  498.             (void) MemoryContextSwitchTo(orig);
  499.         }
  500.         }
  501.     }
  502.     ReleaseBuffer(b);
  503.     }
  504.     fclose(f);
  505.     }
  506.     heap_endscan(s);
  507.     heap_close(r);
  508.     CommitTransactionCommand();
  509.  
  510. }
  511.