home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / executor / n_hashjoin.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  28.7 KB  |  1,046 lines

  1. /* ----------------------------------------------------------------
  2.  *   FILE
  3.  *    hashjoin.c
  4.  *    
  5.  *   DESCRIPTION
  6.  *    Routines to handle hash join nodes
  7.  *
  8.  *   INTERFACE ROUTINES
  9.  *         ExecHashJoin
  10.  *         ExecInitHashJoin
  11.  *         ExecEndHashJoin
  12.  *
  13.  *   IDENTIFICATION
  14.  *    $Header: /private/postgres/src/executor/RCS/n_hashjoin.c,v 1.13 1992/08/04 17:37:50 mer Exp $
  15.  * ----------------------------------------------------------------
  16.  */
  17.  
  18. #include <sys/file.h>
  19. #include "storage/bufmgr.h"    /* for BLCKSZ */
  20. #include "tcop/slaves.h"
  21. #include "executor/executor.h"
  22. #include "planner/clauses.h"
  23.  
  24.  RcsId("$Header: /private/postgres/src/executor/RCS/n_hashjoin.c,v 1.13 1992/08/04 17:37:50 mer Exp $");
  25.  
  26. /* ----------------------------------------------------------------
  27.  *       ExecHashJoin
  28.  *
  29.  *    This function implements the Hybrid Hashjoin algorithm.
  30.  *    recursive partitioning remains to be added.
  31.  *    Note: the relation we build hash table on is the inner
  32.  *          the other one is outer.
  33.  * ----------------------------------------------------------------
  34.  */
  35.  
  36. /**** xxref:
  37.  *           ExecProcNode
  38.  ****/
  39. TupleTableSlot                /* return: a tuple or LispNil */
  40. ExecHashJoin(node)
  41.     HashJoin node;            /* the hash join node */
  42. {
  43.     HashJoinState    hjstate;
  44.     HashState        hashstate;
  45.     EState        estate;
  46.     Plan           outerNode;
  47.     Plan        hashNode;
  48.     List        hjclauses;
  49.     List        clause;
  50.     List        qual;
  51.     ScanDirection     dir;
  52.     TupleTableSlot    inntuple;
  53.     Var            outerVar;
  54.     ExprContext        econtext;
  55.  
  56.     HashJoinTable    hashtable;
  57.     int            bucketno;
  58.     HashBucket        bucket;
  59.     HeapTuple        curtuple;
  60.  
  61.     bool        qualResult;
  62.  
  63.     TupleDescriptor    tupType;
  64.     Pointer        tupValue;
  65.     List        targetList;
  66.     int            len;
  67.     
  68.     TupleTableSlot    outerTupleSlot;
  69.     TupleTableSlot    innerTupleSlot;
  70.     int            nbatch;
  71.     int            curbatch;
  72.     File        *outerbatches;
  73.     RelativeAddr    *outerbatchNames;
  74.     File        *innerbatches;
  75.     RelativeAddr    *innerbatchNames;
  76.     RelativeAddr    *outerbatchPos;
  77.     Var            innerhashkey;
  78.     int            batch;
  79.     int            batchno;
  80.     char        *buffer;
  81.     int            i;
  82.     char        *tempname;
  83.     bool        hashPhaseDone;
  84.     char        *pos;
  85. #ifdef sequent
  86.     slock_t        *batchLock;
  87. #endif
  88.  
  89.     /* ----------------
  90.      *    get information from HashJoin node
  91.      * ----------------
  92.      */
  93.     hjstate =       get_hashjoinstate(node);
  94.     hjclauses =     get_hashclauses(node);
  95.     clause =        CAR(hjclauses);
  96.     estate =         (EState)get_state((Plan)node);
  97.     qual =         get_qpqual((Plan) node);
  98.     hashNode =         get_innerPlan((Plan) node);
  99.     outerNode =     get_outerPlan((Plan) node);
  100.     hashPhaseDone =     get_hashdone(node);
  101.  
  102.     dir =             get_es_direction(estate);
  103.  
  104.     /* -----------------
  105.      * get information from HashJoin state
  106.      * -----------------
  107.      */
  108.     hashtable =     get_hj_HashTable(hjstate);
  109.     bucket =         get_hj_CurBucket(hjstate);
  110.     curtuple =        get_hj_CurTuple(hjstate);
  111.     
  112.     /* --------------------
  113.      * initialize expression context
  114.      * --------------------
  115.      */
  116.     econtext =         get_cs_ExprContext((CommonState) hjstate);
  117.  
  118.     if (get_cs_TupFromTlist((CommonState)hjstate)) {
  119.     TupleTableSlot  result;
  120.     bool        isDone;
  121.  
  122.     result = ExecProject(get_cs_ProjInfo((CommonState)hjstate), &isDone);
  123.     if (!isDone)
  124.         return result;
  125.     }
  126.     /* ----------------
  127.      *    if this is the first call, build the hash table for inner relation
  128.      * ----------------
  129.      */
  130.     if (!hashPhaseDone) {  /* if the hash phase not completed */
  131.     hashtable = get_hashjointable(node);
  132.         if (hashtable == NULL) { /* if the hash table has not been created */
  133.         /* ----------------
  134.          * create the hash table
  135.          * ----------------
  136.          */
  137.         hashtable = ExecHashTableCreate(hashNode);
  138.         set_hj_HashTable(hjstate, hashtable);
  139.         innerhashkey = get_hashkey((Hash) hashNode);
  140.         set_hj_InnerHashKey(hjstate, innerhashkey);
  141.  
  142.         /* ----------------
  143.          * execute the Hash node, to build the hash table 
  144.          * ----------------
  145.          */
  146.         set_hashtable((Hash) hashNode, hashtable);
  147.         innerTupleSlot = ExecProcNode(hashNode);
  148.     }
  149.     bucket = NULL;
  150.     curtuple = NULL;
  151.     curbatch = 0;
  152.     set_hashdone(node, true);
  153.     }
  154.     else if (hashtable == NULL && !IsMaster && ParallelExecutorEnabled()) {
  155.     IpcMemoryId  shmid;
  156.     IpcMemoryKey hashjointablekey;
  157.     int          hashjointablesize;
  158.  
  159.     hashjointablekey = get_hashjointablekey(node);
  160.     hashjointablesize = get_hashjointablesize(node);
  161.     /* ----------------
  162.      *      in Sequent version, shared memory is implemented by
  163.      *  memory mapped files, it takes one file descriptor.
  164.      *  we may have to free one for this.
  165.      * ----------------
  166.      */
  167.     closeOneVfd();
  168.     shmid = IpcMemoryCreateWithoutOnExit(hashjointablekey,
  169.                         hashjointablesize,
  170.                         HASH_PERMISSION);
  171.     hashtable = (HashJoinTable) IpcMemoryAttach(shmid);
  172.     set_hashjointable(node, hashtable);
  173.     set_hj_HashTable(hjstate, hashtable);
  174.     set_hj_HashTableShmId(hjstate, shmid);
  175.     }
  176.     nbatch = hashtable->nbatch;
  177.     outerbatches = get_hj_OuterBatches(hjstate);
  178.     if (nbatch > 0 && outerbatches == NULL) {  /* if needs hash partition */
  179.     /* -----------------
  180.      *  allocate space for file descriptors of outer batch files
  181.      *  then open the batch files in the current process
  182.      * -----------------
  183.      */
  184.     innerhashkey = get_hashkey((Hash) hashNode);
  185.     set_hj_InnerHashKey(hjstate, innerhashkey);
  186.         outerbatchNames = (RelativeAddr*)
  187.         ABSADDR(hashtable->outerbatchNames);
  188.     outerbatches = (File*)
  189.         palloc(nbatch * sizeof(File));
  190.     for (i=0; i<nbatch; i++) {
  191.         outerbatches[i] = FileNameOpenFile(
  192.                   ABSADDR(outerbatchNames[i]), 
  193.                   O_CREAT | O_RDWR, 0600);
  194.     }
  195.     set_hj_OuterBatches(hjstate, outerbatches);
  196.     if (ParallelExecutorEnabled()) {
  197.             innerbatchNames = (RelativeAddr*)
  198.                        ABSADDR(hashtable->innerbatchNames);
  199.         innerbatches = (File*)palloc(nbatch * sizeof(File));
  200.         for (i=0; i<nbatch; i++) {
  201.         innerbatches[i] = FileNameOpenFile(
  202.                       ABSADDR(innerbatchNames[i]),
  203.                       O_CREAT | O_RDWR, 0600);
  204.         }
  205.         set_hj_InnerBatches(hjstate, innerbatches);
  206.     }
  207.     else {
  208.         /* ------------------
  209.          *  get the inner batch file descriptors from the
  210.          *  hash node
  211.          * ------------------
  212.          */
  213.         set_hj_InnerBatches(hjstate, 
  214.                 get_hashBatches(get_hashstate((Hash)
  215.                                   hashNode)));
  216.     }
  217.     }
  218.     outerbatchPos = (RelativeAddr*)ABSADDR(hashtable->outerbatchPos);
  219.     curbatch = hashtable->curbatch;
  220. #ifdef sequent
  221.     batchLock = (slock_t*)ABSADDR(hashtable->batchLock);
  222. #endif
  223.     outerbatchNames = (RelativeAddr*)ABSADDR(hashtable->outerbatchNames);
  224.     
  225.     /* ----------------
  226.      *    Now get an outer tuple and probe into the hash table for matches
  227.      * ----------------
  228.      */
  229.     outerTupleSlot =     get_cs_OuterTupleSlot((CommonState) hjstate);
  230.     outerVar =       get_leftop(clause);
  231.     
  232.     bucketno = -1;  /* if bucketno remains -1, means use old outer tuple */
  233.     if (TupIsNull((Pointer) outerTupleSlot)) {
  234.     /*
  235.      * if the current outer tuple is nil, get a new one
  236.      */
  237.     outerTupleSlot = (TupleTableSlot)
  238.         ExecHashJoinOuterGetTuple(outerNode, hjstate);
  239.     
  240.     while (curbatch <= nbatch && TupIsNull((Pointer) outerTupleSlot)) {
  241.     /*
  242.      * if the current batch runs out, switch to new batch
  243.      */
  244.         curbatch = ExecHashJoinNewBatch(hjstate);
  245.         if (curbatch > nbatch) {
  246.         /*
  247.          * when the last batch runs out, clean up
  248.          */
  249. #ifdef sequent
  250.         /* ---------------
  251.          *  we want to make sure that only the last process does
  252.          *  the cleanup.
  253.          * ---------------
  254.          */
  255.         if (ParallelExecutorEnabled()) {
  256.             S_LOCK(&(hashtable->tableLock));
  257.             if (--(hashtable->pcount) > 0) {
  258.                S_UNLOCK(&(hashtable->tableLock));
  259.                return NULL;
  260.               }
  261.             S_UNLOCK(&(hashtable->tableLock));
  262.           }
  263. #endif
  264.         if (!IsMaster && ParallelExecutorEnabled()) {
  265.             /* ----------------
  266.              *  set the shmid to the one of the current process
  267.              * ----------------
  268.              */
  269.             hashtable->shmid = get_hj_HashTableShmId(hjstate);
  270.            }
  271.         ExecHashTableDestroy(hashtable);
  272.         set_hj_HashTable(hjstate, NULL);
  273.         return NULL;
  274.           }
  275.         else
  276.           outerTupleSlot = (TupleTableSlot)
  277.           ExecHashJoinOuterGetTuple(outerNode, hjstate);
  278.       }
  279.     /*
  280.      * now we get an outer tuple, find the corresponding bucket for
  281.      * this tuple from the hash table
  282.      */
  283.     set_ecxt_outertuple(econtext, outerTupleSlot);
  284.     
  285. #ifdef HJDEBUG
  286.     printf("Probing ");
  287. #endif
  288.     bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
  289.     bucket=(HashBucket)(ABSADDR(hashtable->top) 
  290.                 + bucketno * hashtable->bucketsize);
  291.     }
  292.     
  293.     for (;;) {
  294.     /* ----------------
  295.      *    Now we've got an outer tuple and the corresponding hash bucket,
  296.      *  but this tuple may not belong to the current batch.
  297.      * ----------------
  298.      */
  299.     if (curbatch == 0 && bucketno != -1)  /* if this is the first pass */
  300.        batch = ExecHashJoinGetBatch(bucketno, hashtable, nbatch);
  301.     else
  302.        batch = 0;
  303.     if (batch > 0) {
  304.          /*
  305.           * if the current outer tuple does not belong to
  306.           * the current batch, save to the tmp file for
  307.           * the corresponding batch.
  308.           */
  309.          buffer = ABSADDR(hashtable->batch) + (batch - 1) * BLCKSZ;
  310.          batchno = batch - 1;
  311. #ifdef sequent
  312.          /* ---------------
  313.           *  lock the batch to write
  314.           * ---------------
  315.           */
  316.          if (ParallelExecutorEnabled())
  317.          S_LOCK(&(batchLock[batchno]));
  318. #endif
  319.          pos  = ExecHashJoinSaveTuple((HeapTuple)
  320.                       SlotContents(outerTupleSlot), 
  321.                       buffer,
  322.                            outerbatches[batchno],
  323.                       ABSADDR(outerbatchPos[batchno]));
  324.          
  325.          outerbatchPos[batchno] = RELADDR(pos);
  326. #ifdef sequent
  327.          /* ---------------
  328.           *  unlock the batch to write
  329.           * ---------------
  330.           */
  331.          if (ParallelExecutorEnabled())
  332.          S_UNLOCK(&(batchLock[batchno]));
  333. #endif
  334.       }
  335.     else if (bucket != NULL) {
  336.         do {
  337.         /*
  338.          * scan the hash bucket for matches
  339.          */
  340.         curtuple = ExecScanHashBucket(hjstate,
  341.                           bucket,
  342.                           curtuple,
  343.                           hjclauses,
  344.                           econtext);
  345.         
  346.         if (curtuple != NULL) {
  347.             /*
  348.              * we've got a match, but still need to test qpqual
  349.              */
  350.                     inntuple = (TupleTableSlot)
  351.             ExecStoreTuple((Pointer)curtuple, 
  352.                                        (Pointer) get_hj_HashTupleSlot(hjstate),
  353.                        InvalidBuffer,
  354.                                        false); /* don't pfree this tuple */
  355.             
  356.             set_ecxt_innertuple(econtext, inntuple);
  357.  
  358.             /* ----------------
  359.              * test to see if we pass the qualification
  360.              * ----------------
  361.              */
  362.             qualResult = ExecQual(qual, econtext);
  363.             
  364.             /* ----------------
  365.              * if we pass the qual, then save state for next call and
  366.              * have ExecProject form the projection, store it
  367.              * in the tuple table, and return the slot.
  368.              * ----------------
  369.              */
  370.             if (qualResult) {
  371.             ProjectionInfo    projInfo;
  372.             TupleTableSlot  result;
  373.             bool            isDone;
  374.             
  375.             set_hj_CurBucket(hjstate, bucket);
  376.             set_hj_CurTuple(hjstate, curtuple);
  377.             hashtable->curbatch = curbatch;
  378.             set_cs_OuterTupleSlot((CommonState)
  379.                           hjstate, outerTupleSlot);
  380.             
  381.             projInfo = get_cs_ProjInfo((CommonState) hjstate);
  382.             result = ExecProject(projInfo, &isDone);
  383.             set_cs_TupFromTlist((CommonState)hjstate, !isDone);
  384.             return result;
  385.             }
  386.         }
  387.         }
  388.         while (curtuple != NULL);
  389.     }
  390.  
  391.     /* ----------------
  392.      *   Now the current outer tuple has run out of matches,
  393.      *   so we free it and get a new outer tuple.
  394.      * ----------------
  395.      */
  396.     outerTupleSlot = (TupleTableSlot)
  397.         ExecHashJoinOuterGetTuple(outerNode, hjstate);
  398.     
  399.     while (curbatch <= nbatch && TupIsNull((Pointer) outerTupleSlot)) {
  400.     /*
  401.      * if the current batch runs out, switch to new batch
  402.      */
  403.        curbatch = ExecHashJoinNewBatch(hjstate);
  404.        if (curbatch > nbatch) {
  405.         /*
  406.          * when the last batch runs out, clean up
  407.          */
  408. #ifdef sequent
  409.         /* ---------------
  410.          *  we want to make sure that only the last process does
  411.          *  the cleanup.
  412.          * ---------------
  413.          */
  414.         if (ParallelExecutorEnabled()) {
  415.             S_LOCK(&(hashtable->tableLock));
  416.             if (--(hashtable->pcount) > 0) {
  417.                S_UNLOCK(&(hashtable->tableLock));
  418.                return NULL;
  419.               }
  420.             S_UNLOCK(&(hashtable->tableLock));
  421.           }
  422. #endif
  423.         if (!IsMaster && ParallelExecutorEnabled()) {
  424.             /* ----------------
  425.              *  set the shmid to the one of the current process
  426.              * ----------------
  427.              */
  428.             hashtable->shmid = get_hj_HashTableShmId(hjstate);
  429.            }
  430.         ExecHashTableDestroy(hashtable);
  431.         set_hj_HashTable(hjstate, NULL);
  432.         return NULL;
  433.          }
  434.        else
  435.           outerTupleSlot = (TupleTableSlot)
  436.           ExecHashJoinOuterGetTuple(outerNode, hjstate);
  437.       }
  438.     
  439.     /* ----------------
  440.      *   Now get the corresponding hash bucket for the new
  441.      *   outer tuple.
  442.      * ----------------
  443.      */
  444.     set_ecxt_outertuple(econtext, outerTupleSlot);
  445. #ifdef HJDEBUG
  446.     printf("Probing ");
  447. #endif
  448.     bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
  449.     bucket=(HashBucket)(ABSADDR(hashtable->top) 
  450.                 + bucketno * hashtable->bucketsize);
  451.     curtuple = NULL;
  452.     }
  453. }
  454.  
  455. /* ----------------------------------------------------------------
  456.  *       ExecInitHashJoin
  457.  *
  458.  *    Init routine for HashJoin node.
  459.  * ----------------------------------------------------------------
  460.  */
  461. /**** xxref:
  462.  *           ExecInitNode
  463.  ****/
  464. List    /* return: initialization status */
  465. ExecInitHashJoin(node, estate, parent)
  466.     HashJoin     node;
  467.     EState     estate;
  468.     Plan    parent;
  469. {
  470.     HashJoinState    hjstate;
  471.     List        targetList;
  472.     int            len;
  473.     TupleDescriptor    tupType;
  474.     Pointer            tupValue;
  475.     ParamListInfo       paraminfo;
  476.     ExprContext            econtext;
  477.     int            baseid;
  478.     
  479.     Plan           outerNode;
  480.     Plan        hashNode;
  481.     
  482.     /* ----------------
  483.      *  assign the node's execution state
  484.      * ----------------
  485.      */
  486.     set_state((Plan)node, (EStatePtr) estate);
  487.     
  488.     /* ----------------
  489.      * create state structure
  490.      * ----------------
  491.      */
  492.     hjstate =
  493.     MakeHashJoinState((HashJoinTable)NULL,    /* this is a little silly */
  494.               (IpcMemoryId)0,
  495.               (HashBucket )NULL,
  496.               (HeapTuple )NULL,
  497.               (OverflowTuple )NULL,
  498.               (Var)LispNil,
  499.               (FileP)NULL,
  500.               (FileP)NULL,
  501.               (charP)NULL,
  502.               (int)0,
  503.               (Pointer)NULL,
  504.               (Pointer)NULL);
  505.  
  506.     set_hashjoinstate(node, hjstate);
  507.         
  508.     /* ----------------
  509.      *  Miscellanious initialization
  510.      *
  511.      *         +    assign node's base_id
  512.      *       +    assign debugging hooks and
  513.      *       +    create expression context for node
  514.      * ----------------
  515.      */
  516.     ExecAssignNodeBaseInfo(estate, (BaseNode) hjstate, parent);
  517.     ExecAssignDebugHooks((Plan) node, (BaseNode) hjstate);
  518.     ExecAssignExprContext(estate, (CommonState) hjstate);
  519.  
  520. #define HASHJOIN_NSLOTS 2
  521.     /* ----------------
  522.      *    tuple table initialization
  523.      * ----------------
  524.      */
  525.     ExecInitResultTupleSlot(estate, (CommonState) hjstate);    
  526.     ExecInitOuterTupleSlot(estate,  hjstate);    
  527.     
  528.     /* ----------------
  529.      * initializes child nodes
  530.      * ----------------
  531.      */
  532.     outerNode = get_outerPlan((Plan) node);
  533.     hashNode  = get_innerPlan((Plan) node);
  534.     
  535.     ExecInitNode(outerNode, estate, (Plan) node);
  536.     ExecInitNode(hashNode,  estate, (Plan) node);
  537.     
  538.     /* ----------------
  539.      *    now for some voodoo.  our temporary tuple slot
  540.      *  is actually the result tuple slot of the Hash node
  541.      *  (which is our inner plan).  we do this because Hash
  542.      *  nodes don't return tuples via ExecProcNode() -- instead
  543.      *  the hash join node uses ExecScanHashBucket() to get
  544.      *  at the contents of the hash table.  -cim 6/9/91
  545.      * ----------------
  546.      */
  547.     {
  548.     HashState      hashstate  = get_hashstate((Hash) hashNode);
  549.     TupleTableSlot slot       =
  550.         get_cs_ResultTupleSlot((CommonState)hashstate);
  551.     set_hj_HashTupleSlot(hjstate, (Pointer) slot);
  552.     }
  553.     (void)ExecSetSlotDescriptor(get_hj_OuterTupleSlot(hjstate),
  554.                 ExecGetTupType(outerNode));
  555.     ExecSetSlotExecDescriptor(get_hj_OuterTupleSlot(hjstate),
  556.                 ExecGetExecTupDesc(outerNode));
  557.                   
  558.     /* ----------------
  559.      *     initialize tuple type and projection info
  560.      * ----------------
  561.      */
  562.     ExecAssignResultTypeFromTL((Plan) node, (CommonState) hjstate);
  563.     ExecAssignProjectionInfo((Plan) node, (CommonState) hjstate);
  564.  
  565.     /* ----------------
  566.      *    XXX comment me
  567.      * ----------------
  568.      */
  569.     if (IsMaster && ParallelExecutorEnabled())
  570.     {
  571.         set_hj_HashTable(hjstate, get_hashjointable(node));
  572.     }
  573.     else
  574.     {
  575.         set_hj_HashTable(hjstate, NULL);
  576.     }
  577.     
  578.     set_hashdone(node, false);
  579.     
  580.     set_hj_HashTableShmId(hjstate, 0);
  581.     set_hj_CurBucket(hjstate, NULL);
  582.     set_hj_CurTuple(hjstate, NULL);
  583.     set_hj_InnerHashKey(hjstate, NULL);
  584.     set_hj_OuterBatches(hjstate, NULL);
  585.     set_hj_InnerBatches(hjstate, NULL);
  586.     set_hj_OuterReadPos(hjstate, NULL);
  587.     set_hj_OuterReadBlk(hjstate, 0);
  588.     
  589.     set_cs_OuterTupleSlot((CommonState) hjstate, (TupleTableSlot) NULL);
  590.     set_cs_TupFromTlist((CommonState) hjstate, (bool) false);
  591.  
  592.     /* ----------------
  593.      *  return true
  594.      * ----------------
  595.      */
  596.     return
  597.     LispTrue;
  598. }
  599.  
  600. int
  601. ExecCountSlotsHashJoin(node)
  602.     Plan node;
  603. {
  604.     return ExecCountSlotsNode(get_outerPlan(node)) +
  605.        ExecCountSlotsNode(get_innerPlan(node)) +
  606.        HASHJOIN_NSLOTS;
  607. }
  608.  
  609. /* ----------------------------------------------------------------
  610.  *       ExecEndHashJoin
  611.  *
  612.  *       clean up routine for HashJoin node
  613.  * ----------------------------------------------------------------
  614.  */
  615.  
  616. /**** xxref:
  617.  *           ExecEndNode
  618.  ****/
  619. void
  620. ExecEndHashJoin(node)
  621.     HashJoin node;
  622. {
  623.     HashJoinState   hjstate;
  624.     
  625.     /* ----------------
  626.      *    get info from the HashJoin state 
  627.      * ----------------
  628.      */
  629.     hjstate = get_hashjoinstate(node);
  630.  
  631.     /* ----------------
  632.      * free hash table in case we end plan before all tuples are retrieved
  633.      * ---------------
  634.      */
  635.     if (get_hj_HashTable(hjstate)) {
  636.     ExecHashTableDestroy(get_hj_HashTable(hjstate));
  637.     set_hj_HashTable(hjstate, NULL);
  638.       }
  639.  
  640.     /* ----------------
  641.      *    Free the projection info and the scan attribute info
  642.      *
  643.      *  Note: we don't ExecFreeResultType(hjstate) 
  644.      *        because the rule manager depends on the tupType
  645.      *          returned by ExecMain().  So for now, this
  646.      *          is freed at end-transaction time.  -cim 6/2/91     
  647.      * ----------------
  648.      */    
  649.     ExecFreeProjectionInfo((CommonState) hjstate);
  650.  
  651.     /* ----------------
  652.      * clean up subtrees 
  653.      * ----------------
  654.      */
  655.     ExecEndNode(get_outerPlan((Plan) node));
  656.     ExecEndNode(get_innerPlan((Plan) node));
  657.  
  658.     /* ----------------
  659.      *  clean out the tuple table
  660.      * ----------------
  661.      */
  662.     ExecClearTuple((Pointer) get_cs_ResultTupleSlot((CommonState) hjstate));
  663.     ExecClearTuple(get_hj_OuterTupleSlot(hjstate));
  664.     ExecClearTuple(get_hj_HashTupleSlot(hjstate));
  665.  
  666.  
  667. /* ----------------------------------------------------------------
  668.  *       ExecHashJoinOuterGetTuple
  669.  *
  670.  *       get the next outer tuple for hashjoin: either by
  671.  *    executing a plan node as in the first pass, or from
  672.  *    the tmp files for the hashjoin batches.
  673.  * ----------------------------------------------------------------
  674.  */
  675.  
  676. TupleTableSlot
  677. ExecHashJoinOuterGetTuple(node, hjstate)
  678.     Plan node;
  679.     HashJoinState hjstate;
  680. {
  681.     TupleTableSlot    slot;
  682.     HashJoinTable    hashtable;
  683.     int            curbatch;
  684.     File         *outerbatches;
  685.     char         *outerreadPos;
  686.     int         batchno;
  687.     char         *outerreadBuf;
  688.     int         outerreadBlk;
  689.  
  690.     hashtable = get_hj_HashTable(hjstate);
  691.     curbatch = hashtable->curbatch;
  692.  
  693.     if (curbatch == 0) {  /* if it is the first pass */
  694.     slot = ExecProcNode(node);
  695.     return slot;
  696.     }
  697.     
  698.     /*
  699.      * otherwise, read from the tmp files
  700.      */
  701.     outerbatches = get_hj_OuterBatches(hjstate);
  702.     outerreadPos = get_hj_OuterReadPos(hjstate);
  703.     outerreadBlk = get_hj_OuterReadBlk(hjstate);
  704.     if (ParallelExecutorEnabled())
  705.        outerreadBuf = ABSADDR(hashtable->readbuf) + 
  706.               SlaveInfoP[MyPid].groupPid * BLCKSZ; 
  707.     else
  708.        outerreadBuf = ABSADDR(hashtable->readbuf); 
  709.     batchno = curbatch - 1;
  710.     
  711.    slot = ExecHashJoinGetSavedTuple(hjstate,
  712.                     outerreadBuf,
  713.                     outerbatches[batchno],
  714.                     get_hj_OuterTupleSlot(hjstate),
  715.                     &outerreadBlk,
  716.                     &outerreadPos);
  717.     
  718.     set_hj_OuterReadPos(hjstate, outerreadPos);
  719.     set_hj_OuterReadBlk(hjstate, outerreadBlk);
  720.     
  721.     return slot;
  722. }
  723.  
  724. /* ----------------------------------------------------------------
  725.  *       ExecHashJoinGetSavedTuple
  726.  *
  727.  *       read the next tuple from a tmp file using a certain buffer
  728.  * ----------------------------------------------------------------
  729.  */
  730.  
  731. TupleTableSlot
  732. ExecHashJoinGetSavedTuple(hjstate, buffer, file, tupleSlot, block, position)
  733.     HashJoinState hjstate;    
  734.     char       *buffer;
  735.     File       file;
  736.     Pointer      tupleSlot;
  737.     int       *block;       /* return parameter */
  738.     char       **position;  /* return parameter */
  739. {
  740.     char     *bufstart;
  741.     char     *bufend;
  742.     int         cc;
  743.     HeapTuple     heapTuple;
  744.     HashJoinTable hashtable;
  745.  
  746.     hashtable = get_hj_HashTable(hjstate);
  747.     bufend = buffer + *(long*)buffer;
  748.     bufstart = (char*)(buffer + sizeof(long));
  749.     if ((*position == NULL) || (*position >= bufend)) {
  750.     if (*position == NULL)
  751.         if (ParallelExecutorEnabled())
  752.         (*block) = SlaveInfoP[MyPid].groupPid;
  753.         else
  754.             (*block) = 0;
  755.     else
  756.         if (ParallelExecutorEnabled())
  757.         (*block) += hashtable->nprocess;
  758.         else
  759.             (*block)++;
  760.     FileSeek(file, *block * BLCKSZ, L_SET);
  761.     cc = FileRead(file, buffer, BLCKSZ);
  762.     NDirectFileRead++;
  763.     if (cc < 0)
  764.         perror("FileRead");
  765.     if (cc == 0)  /* end of file */
  766.         return NULL;
  767.     else
  768.         (*position) = bufstart;
  769.     }
  770.     heapTuple = (HeapTuple) (*position);
  771.     (*position) = (char*)LONGALIGN(*position + heapTuple->t_len);
  772.  
  773.     return (TupleTableSlot)
  774.     ExecStoreTuple((Pointer) heapTuple,
  775.                tupleSlot,
  776.                InvalidBuffer,
  777.                false);
  778. }
  779.  
  780. /* ----------------------------------------------------------------
  781.  *       ExecHashJoinNewBatch
  782.  *
  783.  *       switch to a new hashjoin batch
  784.  * ----------------------------------------------------------------
  785.  */
  786.  
  787. int
  788. ExecHashJoinNewBatch(hjstate)
  789.     HashJoinState hjstate;
  790. {
  791.     File         *innerBatches;
  792.     File         *outerBatches;
  793.     int         *innerBatchSizes;
  794.     Var         innerhashkey;
  795.     HashJoinTable     hashtable;
  796.     int         nbatch;
  797.     char         *readPos;
  798.     int         readBlk;
  799.     char         *readBuf;
  800.     TupleTableSlot     slot;
  801.     ExprContext     econtext;
  802.     int         i;
  803.     int         cc;
  804.     int            newbatch;
  805.  
  806.     hashtable = get_hj_HashTable(hjstate);
  807.     outerBatches = get_hj_OuterBatches(hjstate);
  808.     innerBatches = get_hj_InnerBatches(hjstate);
  809.     nbatch = hashtable->nbatch;
  810.     newbatch = hashtable->curbatch + 1;
  811. #ifdef sequent
  812.     /* -----------------
  813.      *  We want to make sure that only the last process does
  814.      *  the cleanup and switching to the new batch  and all
  815.      *  the other processes have to wait until the entire batch
  816.      *  has been finished.  This takes a counter, hashtable->pcount
  817.      *  and a barrier, hashtable->batchBarrier to achieve
  818.      * -----------------
  819.      */
  820.     if (ParallelExecutorEnabled())
  821.         S_LOCK(&(hashtable->tableLock));
  822. #endif
  823.     if (ParallelExecutorEnabled() && --(hashtable->pcount) > 0) {
  824. #ifdef sequent
  825.       /* ----------------
  826.        *  if it is not the last process, wait on the barrier
  827.        * ----------------
  828.        */
  829.       S_UNLOCK(&(hashtable->tableLock));
  830.           S_WAIT_BARRIER(&(hashtable->batchBarrier));
  831. #endif
  832.       if (newbatch > nbatch)
  833.           return newbatch;
  834.        }
  835.     else {
  836.     /* ------------------
  837.      *  this is the last process, so it will do the cleanup and
  838.      *  batch-switching.
  839.      * ------------------
  840.      */
  841.     if (newbatch == 1) {
  842.         /* 
  843.          * if it is end of the first pass, flush all the last pages for
  844.          * the batches.
  845.          */
  846.         outerBatches = get_hj_OuterBatches(hjstate);
  847.         for (i=0; i<nbatch; i++) {
  848.         cc = FileSeek(outerBatches[i], 0L, L_XTND);
  849.         if (cc < 0)
  850.             perror("FileSeek");
  851.         cc = FileWrite(outerBatches[i],
  852.                    ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ);
  853.         NDirectFileWrite++;
  854.         if (cc < 0)
  855.             perror("FileWrite");
  856.         }
  857.     }
  858.     if (newbatch > 1) {
  859.         /*
  860.          * remove the previous outer batch
  861.          */
  862.         FileUnlink(outerBatches[newbatch - 2]);
  863.     }
  864.     /*
  865.      * rebuild the hash table for the new inner batch
  866.      */
  867.     innerBatchSizes = (int*)ABSADDR(hashtable->innerbatchSizes);
  868.     /* --------------
  869.      *  skip over empty inner batches
  870.      * --------------
  871.      */
  872.     while (newbatch <= nbatch && innerBatchSizes[newbatch - 1] == 0) {
  873.        FileUnlink(outerBatches[newbatch-1]);
  874.        FileUnlink(innerBatches[newbatch-1]);
  875.        newbatch++;
  876.       }
  877.     if (newbatch > nbatch) {
  878.        hashtable->pcount = hashtable->nprocess;
  879. #ifdef sequent
  880.        S_UNLOCK(&(hashtable->tableLock));
  881.            S_WAIT_BARRIER(&(hashtable->batchBarrier));
  882. #endif
  883.        return newbatch;
  884.      }
  885.     ExecHashTableReset(hashtable, innerBatchSizes[newbatch - 1]);
  886.  
  887. #ifdef sequent
  888.     /* -------------------
  889.      *  batch change completed
  890.      *  release the barrier, then reset the barrier for the next batch
  891.      * -------------------
  892.      */
  893.         if (ParallelExecutorEnabled()) {
  894.        S_WAIT_BARRIER(&(hashtable->batchBarrier));
  895.        S_INIT_BARRIER(&(hashtable->batchBarrier), hashtable->nprocess);
  896.        S_UNLOCK(&(hashtable->tableLock));
  897.       }
  898. #endif
  899.     }
  900.     econtext = get_cs_ExprContext((CommonState) hjstate);
  901.     innerhashkey = get_hj_InnerHashKey(hjstate);
  902.     readPos = NULL;
  903.     readBlk = 0;
  904.     if (ParallelExecutorEnabled())
  905.        /* ----------------------
  906.     *  build the hash table of the new batch in parallel
  907.     * ----------------------
  908.     */
  909.        readBuf = ABSADDR(hashtable->readbuf) + 
  910.          SlaveInfoP[MyPid].groupPid * BLCKSZ;
  911.     else
  912.        readBuf = ABSADDR(hashtable->readbuf);
  913.  
  914.     while ((slot = ExecHashJoinGetSavedTuple(hjstate,
  915.                         readBuf, 
  916.                         innerBatches[newbatch-1],
  917.                         get_hj_HashTupleSlot(hjstate),
  918.                         &readBlk,
  919.                         &readPos))
  920.        && ! TupIsNull((Pointer) slot)) {
  921.     set_ecxt_innertuple(econtext, slot);
  922.     ExecHashTableInsert(hashtable, econtext, innerhashkey,NULL);
  923.                 /* possible bug - glass */
  924.     }
  925.     
  926.     
  927. #ifdef sequent
  928.     /* ---------------
  929.      *  we want to make sure that the processes proceed to the probe
  930.      *  phase after all the processes finish the build phase
  931.      * ---------------
  932.      */
  933.     if (ParallelExecutorEnabled())
  934.     S_LOCK(&(hashtable->tableLock));
  935. #endif
  936.     if (ParallelExecutorEnabled() && --(hashtable->pcount) > 0) {
  937. #ifdef sequent
  938.     /* ----------------
  939.      *  if not the last process, wait on the barrier
  940.      * ----------------
  941.      */
  942.     S_UNLOCK(&(hashtable->tableLock));
  943.     S_WAIT_BARRIER(&(hashtable->batchBarrier));
  944. #endif
  945.        }
  946.     else {
  947.     /* -----------------
  948.      *  only the last process comes to this branch
  949.      *  now all the processes have finished the build phase
  950.      * ----------------
  951.      */
  952.  
  953.     /*
  954.      * after we build the hash table, the inner batch is no longer needed
  955.      */
  956.     FileUnlink(innerBatches[newbatch - 1]);
  957.     set_hj_OuterReadPos(hjstate, NULL);
  958.     hashtable->pcount = hashtable->nprocess;
  959. #ifdef sequent
  960.     /* -----------------
  961.      *  release the barrier, then reset it for the next batch
  962.      * -----------------
  963.      */
  964.     if (ParallelExecutorEnabled()) {
  965.         S_WAIT_BARRIER(&(hashtable->batchBarrier));
  966.         S_INIT_BARRIER(&(hashtable->batchBarrier), hashtable->nprocess);
  967.         S_UNLOCK(&(hashtable->tableLock));
  968.       }
  969. #endif
  970.        }
  971.     hashtable->curbatch = newbatch;
  972.     return newbatch;
  973. }
  974.  
  975. /* ----------------------------------------------------------------
  976.  *       ExecHashJoinGetBatch
  977.  *
  978.  *       determine the batch number for a bucketno
  979.  *      +----------------+-------+-------+ ... +-------+
  980.  *    0             nbuckets                       totalbuckets
  981.  * batch         0           1       2     ...
  982.  * ----------------------------------------------------------------
  983.  */
  984.  
  985. int
  986. ExecHashJoinGetBatch(bucketno, hashtable, nbatch)
  987. int bucketno;
  988. HashJoinTable hashtable;
  989. int nbatch;
  990. {
  991.     int b;
  992.     if (bucketno < hashtable->nbuckets || nbatch == 0)
  993.        return 0;
  994.     
  995.     b = (float)(bucketno - hashtable->nbuckets) /
  996.     (float)(hashtable->totalbuckets - hashtable->nbuckets) *
  997.     nbatch;
  998.     return b+1;
  999. }
  1000.  
  1001. /* ----------------------------------------------------------------
  1002.  *       ExecHashJoinSaveTuple
  1003.  *
  1004.  *       save a tuple to a tmp file using a buffer.
  1005.  *    the first few bytes in a page is an offset to the end
  1006.  *    of the page.
  1007.  * ----------------------------------------------------------------
  1008.  */
  1009.  
  1010. char *
  1011. ExecHashJoinSaveTuple(heapTuple, buffer, file, position)
  1012.     HeapTuple    heapTuple;
  1013.     char     *buffer;
  1014.     File     file;
  1015.     char     *position;
  1016. {
  1017.     long    *pageend;
  1018.     char    *pagestart;
  1019.     char    *pagebound;
  1020.     int        cc;
  1021.  
  1022.     pageend = (long*)buffer;
  1023.     pagestart = (char*)(buffer + sizeof(long));
  1024.     pagebound = buffer + BLCKSZ;
  1025.     if (position == NULL)
  1026.        position = pagestart;
  1027.     
  1028.     if (position + heapTuple->t_len >= pagebound) {
  1029.        cc = FileSeek(file, 0L, L_XTND);
  1030.        if (cc < 0)
  1031.       perror("FileSeek");
  1032.        cc = FileWrite(file, buffer, BLCKSZ);
  1033.        NDirectFileWrite++;
  1034.        if (cc < 0)
  1035.       perror("FileWrite");
  1036.        position = pagestart;
  1037.        *pageend = 0;
  1038.       }
  1039.     bcopy(heapTuple, position, heapTuple->t_len);
  1040.     position = (char*)LONGALIGN(position + heapTuple->t_len);
  1041.     *pageend = position - buffer;
  1042.     
  1043.     return position;
  1044. }
  1045.