home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / planner / prep / handleunion.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  19.3 KB  |  821 lines

  1. /*
  2.  *  Routine takes the 'union' parsetree, messes around with it, 
  3.  *  strips the union relations, and returns
  4.  *  a list of orthogonal plans.
  5.  *
  6.  *  NOTE: Current implementation of versions disallows schema changes 
  7.  *        between versions.  Thus we may assume that the attributes
  8.  *        remain the same between versions and their deltas.
  9.  *
  10.  * $Header: /private/postgres/src/planner/prep/RCS/handleunion.c,v 1.12 1991/11/18 17:29:14 mer Exp $
  11.  */
  12.  
  13. #include "tmp/c.h"
  14.  
  15. #include "nodes/nodes.h"
  16. #include "nodes/pg_lisp.h"
  17. #include "nodes/execnodes.h"
  18. #include "nodes/plannodes.h"
  19. #include "nodes/plannodes.a.h"
  20. #include "nodes/primnodes.h"
  21. #include "nodes/primnodes.a.h"
  22. #include "nodes/relation.h"
  23. #include "nodes/relation.a.h"
  24.  
  25. #include "parser/parse.h"
  26. #include "parser/parsetree.h"
  27. #include "tmp/utilities.h"
  28. #include "utils/log.h"
  29. #include "utils/lsyscache.h"
  30.  
  31. #include "planner/internal.h"
  32. #include "planner/plancat.h"
  33. #include "planner/planner.h"
  34. #include "planner/prepunion.h"
  35. #include "planner/clauses.h"
  36. #include "planner/handleunion.h"
  37. #include "planner/semanopt.h"
  38. /* #include "planner/cfi.h" */
  39.  
  40. /* #define LispRemove remove  */
  41.  
  42. List
  43. handleunion (root,rangetable, tlist, qual)
  44.      List root, rangetable, tlist, qual;
  45. {
  46.  
  47.   List new_root = LispNil;
  48.   List temp_tlist = LispNil;
  49.   List temp_qual = LispNil;
  50.   List i = LispNil;
  51.   List tlist_qual = LispNil;
  52.   List rt_entry = LispNil;
  53.   List temp = LispNil;
  54.   List new_parsetree = LispNil;
  55.   List union_plans = LispNil;
  56.   int planno = 0;
  57.   List tmp_uplans = LispNil;
  58.   List firstplan = LispNil;
  59.   List secondplan = LispNil;
  60.   int num_plans = 0;
  61.  
  62.   /*
  63.    * SplitTlistQual returns a list of the form:
  64.    * ((tlist1 qual1) (tlist2 qual2) ...) 
  65.    */
  66.  
  67.   /*
  68.    *  First remove the union flag from the rangetable entries
  69.    *  since we are processing them now.
  70.    */
  71.  
  72.   foreach(i,rangetable) {
  73.     rt_entry = CAR(i);
  74.     if (member(lispAtom("union"),rt_flags(rt_entry))) {
  75.       rt_flags(rt_entry) = LispRemove(lispAtom("union"),
  76.                       rt_flags(rt_entry));
  77.     }
  78.   }
  79.  
  80.   tlist_qual = SplitTlistQual (root,rangetable,tlist,qual);
  81.  
  82. #ifdef VERBOSE
  83.   num_plans = length(tlist_qual);
  84.   printf("####################\n");
  85.   printf("number of plans generated: %d\n", num_plans);
  86.   printf("####################\n");
  87. #endif
  88.   
  89.   foreach (i, tlist_qual) {
  90.     temp = CAR(i);
  91.     new_parsetree = lispCons(root,lispCons(CAR(temp),
  92.                        lispCons(CADR(temp),
  93.                             LispNil) ));
  94.     union_plans = nappend1(union_plans,
  95.                (LispValue)planner(new_parsetree));
  96.  
  97.   }
  98.  
  99.   /*
  100.    * testing:
  101.    */
  102.   
  103. /*  union_plans = CDR(union_plans);  */
  104.  
  105. #ifdef SWAP_PLANS
  106.   firstplan = CAR(union_plans);
  107.   secondplan = CADR(union_plans);
  108.  
  109.   CAR(union_plans) = secondplan;
  110.   CADR(union_plans) = firstplan;
  111. #endif
  112. /*
  113.   foreach(i, union_plans) {
  114.     planno += 1;
  115.     tmp_uplans = nappend1(tmp_uplans, CAR(i));
  116.   }
  117. */
  118.  
  119. return(union_plans);  
  120.  
  121. }
  122.  
  123.  
  124. /*
  125.  *  Given a 'union'ed tlist and qual, this routine splits them
  126.  *  up into separate othrogonal (tlist qual) pairs.
  127.  *  For example, 
  128.  *  tlist:  (emp | emp1).name
  129.  *  qual:   (emp | emp1). salary < 5000
  130.  *  will be split into:
  131.  *  ( ((emp.name) (emp.salary < 5000)) ((emp1.name) (emp1.salary < 5000)) )
  132.  */
  133.  
  134.  
  135. List
  136. SplitTlistQual(root,rangetable,tlist, qual)
  137.      List root,rangetable,tlist, qual;
  138. {
  139.   List i = LispNil;
  140.   List tqlist = LispNil;
  141.   List  temp = LispNil;
  142.   List tl_list = LispNil;
  143.   List qual_list = LispNil;
  144.   List unionlist = LispNil;
  145.   List mod_qual = LispNil;
  146.   List varlist = LispNil;
  147.   List is_redundent = LispNil;
  148.  
  149.   unionlist = collect_union_sets(tlist,qual);
  150. #ifdef VERBOSE
  151.   printf("##########################\n");
  152.   printf("Union sets are: \n");
  153.   lispDisplay(unionlist,0);
  154.   printf("\n");
  155.   fflush(stdout);
  156.   printf("##########################\n");
  157. #endif
  158.   /*
  159.    *  If query is a delete, the tlist is null.
  160.    */
  161.   if (CAtom(root_command_type_atom(root)) == DELETE) 
  162.     foreach(i,unionlist) 
  163.       foreach(temp,CAR(i))
  164.     tl_list = nappend1(tl_list, LispNil);
  165.   else {
  166.     tl_list = lispCons(tlist, LispNil);
  167.     foreach(i, unionlist)   /*  This loop effectively handles union joins */
  168.       tl_list = SplitTlist(CAR(i),tl_list);
  169.   }
  170.   qual_list = lispCons(qual,LispNil);
  171.   foreach(i,unionlist) {
  172.     qual_list = SplitQual(CAR(i),qual_list); 
  173.   }
  174.  
  175. /*
  176.  *  The assumption here is that both the tl_list and the qual_list, 
  177.  *  must be of the same length, if not, something funky has happened.
  178.  *
  179.  *  XXX I think this is not true in the case of 'DELETEs' ! sp.
  180.  *  XXX In this case, 'tl_list' is nil.
  181.  */
  182.   
  183.   if (CAtom(root_command_type_atom(root)) != DELETE)
  184.       if (length (tl_list) != length(qual_list))
  185.     elog (WARN, "UNION:  tlist with missing qual, or vice versa");
  186.  
  187.     if(null(tl_list)) {
  188.       /*
  189.        * this is a special case, where tl_list is null (i.e. this
  190.        * is a DELETE command.
  191.        * DISCLAIMER: I don't know what this code is doing, I don't
  192.        * know what it is supposed to do, all I know is that before it
  193.        * didn't work in queries like:
  194.        * "delete foobar from f in (foo | bar) where foobar.a = 1"
  195.        * and now it works (provided that you made a couple of prayers
  196.        * and/or sacrifices to the gods....).
  197.        * sp._
  198.        */
  199.       temp = NULL;
  200.       varlist = find_allvars(root,rangetable, temp, CAR(qual_list));
  201.       is_redundent = LispNil;
  202.       mod_qual = SemantOpt(varlist,rangetable,CAR(qual_list),&is_redundent,1);
  203.       if (is_redundent != LispTrue) {
  204.         mod_qual = SemantOpt2(rangetable,mod_qual,mod_qual,temp);
  205.         tqlist = nappend1(tqlist,
  206.                   lispCons(temp, 
  207.                        lispCons(mod_qual,
  208.                             LispNil)));
  209.       }
  210.       qual_list = CDR(qual_list);
  211.     } else {
  212.       foreach(i, tl_list) {
  213.     temp = CAR(i);
  214.  
  215.     /*
  216.      *  Varlist contains the list of varnos of relations that participate
  217.      *  in the current query.  It is used to detect existential clauses.
  218.      *  Initially, varlist contains the list of target relations (varnos).
  219.      *  This should only be done once for each query.
  220.      */
  221.  
  222.     varlist = find_allvars(root,rangetable, temp, CAR(qual_list));
  223.     is_redundent = LispNil;
  224.     mod_qual = SemantOpt(varlist,rangetable,CAR(qual_list),&is_redundent,1);
  225.     if (is_redundent != LispTrue) {
  226.       mod_qual = SemantOpt2(rangetable,mod_qual,mod_qual,temp);
  227.       tqlist = nappend1(tqlist,
  228.                 lispCons(temp, 
  229.                      lispCons(mod_qual,
  230.                           LispNil)));
  231.     }
  232.       qual_list = CDR(qual_list);
  233.       }
  234.     }
  235.  
  236.   return (tqlist);
  237.  
  238. }
  239.  
  240.  
  241.  
  242. /*
  243.  * SplitTlist
  244.  * returns a list of the form (tlist1 tlist2 ...)
  245.  */
  246.  
  247. List
  248. SplitTlist (unionlist,tlists)
  249.      List unionlist;
  250.      List tlists;
  251. {
  252.   List i = LispNil;
  253.   List j = LispNil;
  254.   List x = LispNil;
  255.   List tlist = LispNil;
  256.   List t = LispNil;
  257.   List varnum = LispNil;
  258.   List new_tlist ;
  259.   List tle = LispNil;
  260.   List varlist = LispNil;
  261.   Var varnode = (Var)NULL;
  262.   List tl_list = LispNil;
  263.   List flatten_list = LispNil;
  264.  
  265.   foreach(t, tlists) {
  266.     tlist = CAR(t);
  267.     foreach (i, unionlist) {
  268.       varnum = CAR(i);
  269.       new_tlist = LispNil;
  270.       new_tlist = copy_seq_tree(tlist);
  271.  
  272.       foreach (x, new_tlist) {
  273.     tle = CAR(x);
  274.     varlist = tl_expr(tle);
  275.     if (IsA(varlist,Const) || IsA(varlist,Var))
  276.       flatten_list = LispNil;
  277.     else
  278.       if (is_clause(varlist)) 
  279.         split_tlexpr(CDR(varlist),varnum);
  280.       else
  281.         if (CAtom(CAR(varlist)) == UNION) {
  282.           flatten_list = flatten_union_list(CDR(varlist));
  283.           foreach (j, flatten_list) {
  284.         varnode = (Var)CAR(j);
  285.         /*
  286.          * Find matching entry, and change the tlist to it.
  287.          */
  288.  
  289.         if (CInteger(varnum) == get_varno(varnode)) {
  290.           tl_expr(tle) = (List)varnode;
  291.           break;
  292.         }
  293.           }
  294.         } /* for, varlist */
  295.         
  296.       } /* for, new_tlist*/
  297.       
  298.       /*
  299.        *  At this point, the new_tlist is an orthogonal targetlist, without
  300.        *  any union relations.
  301.        */
  302.     
  303.       if (new_tlist == NULL) 
  304.     elog(WARN, "UNION: resulting tlist is empty");
  305.     
  306.       tl_list = nappend1(tl_list,new_tlist);
  307.     }
  308.   } /*tlists */
  309.   return(tl_list);
  310.  
  311. }
  312.  
  313. /*
  314.  * This routine is called to split the tlist whenever
  315.  *  there is an expression in the targetlist.
  316.  * Routine returns nothing as it modifies the tlist in place.
  317.  */
  318.  
  319. void
  320. split_tlexpr(clauses,varnum)
  321.      List clauses;
  322.      List varnum;
  323. {
  324.   List x = LispNil;
  325.   List ulist = LispNil;
  326.   List clause = LispNil;
  327.   List flat_list = LispNil;
  328.   Var varnode = (Var)NULL;
  329.   List i = LispNil;
  330.  
  331.   foreach (x,clauses) {
  332.     clause = CAR(x);
  333.     if (IsA(clause,Const) || 
  334.     IsA(clause,Var) )
  335.       flat_list = LispNil;
  336.     else
  337.       if (is_clause(clause))
  338.     split_tlexpr(CDR(clause), varnum);
  339.       else
  340.     if (consp(clause) &&
  341.         CAtom(CAR(clause)) == UNION) {
  342.       flat_list = flatten_union_list(CDR(clause));
  343.       foreach(i,flat_list) {
  344.         varnode = (Var)CAR(i);
  345.         if (CInteger(varnum) == get_varno(varnode)) {
  346.           CAR(clause) = (List)varnode;
  347.           break;
  348.         }
  349.       }
  350.     }
  351.   }
  352. }
  353.  
  354. /*
  355.  *  collect_union_sets
  356.  *  runs through the tlist and qual, and forms a list of 
  357.  *  unions sets in the query.  
  358.  *  If there is > 1 union set, we are dealing with a union join.
  359.  */
  360.  
  361. List 
  362. collect_union_sets(tlist,qual)
  363.      List tlist;
  364.      List qual;
  365. {
  366.   List i = LispNil;
  367.   List x = LispNil;
  368.   List j = LispNil;
  369.   int varno = 0;
  370.   List tle = LispNil;
  371.   List varlist = LispNil;
  372.   List current_union_set = LispNil;
  373.   List union_sets = LispNil;
  374.   List qual_union_sets = LispNil;
  375.   List flattened_ulist = LispNil;
  376.  
  377. /*
  378.  *  First we run through the targetlist and collect all union sets
  379.  *  there.
  380.  */
  381.  
  382.   foreach(i, tlist) {
  383.     tle = CAR(i);
  384.     varlist = tl_expr(tle);
  385.     current_union_set = LispNil;
  386.  
  387.     if (is_clause(varlist)) { /* if it is an expression */
  388.       flattened_ulist = collect_tlist_uset(CDR(varlist));
  389.       foreach (x, flattened_ulist) 
  390.     current_union_set = nappend1(current_union_set,
  391.                      lispInteger(get_varno((Var)CAR(x))) );
  392.     } else
  393.       if (consp(varlist) &&
  394.       CAtom(CAR(varlist)) == UNION) {
  395.     flattened_ulist = flatten_union_list(CDR(varlist));
  396.     foreach (x, flattened_ulist) 
  397.       current_union_set = nappend1(current_union_set,
  398.                        lispInteger(get_varno((Var)CAR(x))) );
  399.  
  400.       }
  401.     union_sets = nappend1(union_sets,
  402.               current_union_set);
  403.  
  404.   }
  405.   union_sets = remove_subsets(union_sets);
  406.  
  407. /*
  408.  *  Now we run through the qualifications to collect 
  409.  *  union sets.
  410.  */
  411.   
  412.   qual_union_sets = find_qual_union_sets(qual);
  413.   
  414.   if (qual_union_sets != LispNil)
  415.     foreach(i, qual_union_sets)
  416.       union_sets = nappend1(union_sets, CAR(i));
  417.  
  418.   if (length(union_sets) > 1)
  419.     union_sets = remove_subsets(union_sets);
  420.  
  421.   return(union_sets);
  422. }
  423.  
  424. /*
  425.  *  This routine is called whenever there is a 
  426.  *  an expression in the targetlist.
  427.  *  Returns a flattened ulist if found.
  428.  */
  429.  
  430. List collect_tlist_uset(args)
  431.      List args;
  432. {
  433.   List retlist = LispNil;
  434.   List x = LispNil;
  435.   List i = LispNil;
  436.   List current_clause = LispNil;
  437.   List ulist = LispNil;
  438.   
  439.   foreach (x, args) {
  440.     current_clause = CAR(x);
  441.     if (IsA(current_clause,Const))
  442.       retlist = LispNil;
  443.     else
  444.       if (is_clause(current_clause))
  445.     retlist = collect_tlist_uset(CDR(current_clause));
  446.       else
  447.     if (consp(current_clause) &&
  448.         CAtom(CAR(current_clause)) == UNION) {
  449.       retlist = flatten_union_list(CDR(current_clause));
  450.     }
  451.     else
  452.       retlist = LispNil;   /* If it is just a varnode. */
  453.  
  454.     foreach(i,retlist)
  455.       if (CAR(i) != LispNil)
  456.     ulist = nappend1(ulist,CAR(i));
  457.   }
  458.  
  459.   return(ulist);
  460. }
  461. List
  462. find_qual_union_sets(qual)
  463.      List qual;
  464. {
  465.   List leftop = LispNil;
  466.   List rightop = LispNil;
  467.   List i = LispNil;
  468.   List j = LispNil;
  469.   List union_sets = LispNil;
  470.   List current_union_set = LispNil;
  471.   List qual_uset = LispNil;
  472.   List flattened_ulist = LispNil;
  473.  
  474.   if (null(qual))
  475.     return(LispNil);
  476.   else
  477.     if (is_clause(qual)) {
  478.       leftop = (List) get_leftop(qual);
  479.       rightop = (List) get_rightop(qual);
  480.  
  481.       if (consp(leftop) && CAtom(CAR(leftop)) == UNION) {
  482.     current_union_set = LispNil;
  483.     flattened_ulist = flatten_union_list(CDR(leftop));
  484.     foreach(i, flattened_ulist) 
  485.       current_union_set = nappend1(current_union_set,
  486.                        lispInteger(get_varno((Var)CAR(i))) );
  487.     union_sets = nappend1(union_sets,
  488.                   current_union_set);
  489.       }    
  490.       if (consp(rightop) && CAtom(CAR(rightop)) == UNION) {
  491.     current_union_set = LispNil;
  492.     flattened_ulist = flatten_union_list(CDR(rightop));
  493.     foreach(i, flattened_ulist) 
  494.       current_union_set = nappend1(current_union_set,
  495.                        lispInteger(get_varno((Var)CAR(i))) );
  496.     union_sets = nappend1(union_sets,
  497.                   current_union_set);
  498.       }
  499.     } else
  500.       if (and_clause(qual)) {
  501.     foreach(j,get_andclauseargs(qual)) {
  502.       qual_uset = find_qual_union_sets(CAR(j));
  503.       if (qual_uset != LispNil) 
  504.         foreach(i,qual_uset) 
  505.           union_sets = nappend1(union_sets,
  506.                     CAR(i));
  507.     }
  508.       } else
  509.     if (or_clause(qual)) {
  510.       foreach(j,get_orclauseargs(qual)) {
  511.         qual_uset = find_qual_union_sets(CAR(j));
  512.         if (qual_uset != LispNil)
  513.           foreach(i,qual_uset)
  514.         union_sets = nappend1(union_sets,
  515.                       CAR(i));
  516.       }
  517.     } else
  518.       if (not_clause(qual)) {
  519.         qual_uset = find_qual_union_sets(CDR(qual));
  520.         if (qual_uset != LispNil)
  521.           foreach(i,qual_uset)
  522.         union_sets = nappend1(union_sets,
  523.                       CAR(i));
  524.       }
  525.  
  526.   return(union_sets);
  527.  
  528. }
  529.  
  530. List
  531. flatten_union_list(ulist)
  532.      List ulist;
  533. {
  534.   List retlist = LispNil;
  535.   List i = LispNil;
  536.   List tmp_var = LispNil;
  537.   List tmplist = LispNil;
  538.  
  539.   foreach(i,ulist) {
  540.     tmp_var = CAR(i);
  541.     if (consp(tmp_var) && CAtom(CAR(tmp_var)) == UNION)
  542.       tmplist = flatten_union_list(CDR(tmp_var));
  543.     else
  544.       retlist = nappend1(retlist, tmp_var);
  545.   }
  546.  
  547.   if (tmplist)
  548.     foreach(i, tmplist)
  549.       retlist= nappend1(retlist, CAR(i));
  550.  
  551.   return(retlist);
  552.   
  553. }
  554.  
  555. /*
  556.  *  remove_subsets
  557.  *  Routine finds and removes any subsets in the given list.
  558.  *  Returns the modified list.
  559.  *  Thus given ( (A B C) (B C) (D E F) (D F) ), routine returns:
  560.  *  ( (A B C) (D E F) )
  561.  */
  562.  
  563. List 
  564. remove_subsets(usets)
  565.      List usets;
  566. {
  567.   List retlist = LispNil;
  568.   List i = LispNil;
  569.   List j = LispNil;
  570.   List k = LispNil;
  571.   List uset1 = LispNil;
  572.   List uset2 = LispNil;
  573.   int is_subset = 1;
  574.  
  575.   foreach(i, usets) {
  576.     uset1 = CAR(i);
  577.     foreach(j, CDR(i)) {
  578.       uset2 = CAR(j);
  579.       if (uset2 == LispNil)
  580.     break;
  581.  
  582.  
  583.       if (length (uset1) >= length(uset2) &&
  584.       length(uset2) != 1) {
  585.     foreach(k, uset2) 
  586.       if (!member(CAR(k),uset1)) {
  587.         is_subset = 0;
  588.         break;
  589.       }
  590.     if (is_subset) 
  591.       CAR(j) = lispCons(lispInteger(1), LispNil);
  592.       } /* uset1 > uset2 */
  593.       else 
  594.     if (length(uset1) < length(uset2) &&
  595.         length(uset1) != 1) {
  596.       foreach(k,uset1) 
  597.         if (!member(CAR(k), uset2)) {
  598.           is_subset = 0;
  599.           break;
  600.         }
  601.       if (is_subset) 
  602.         CAR(i)= lispCons(lispInteger(1),LispNil);
  603.     }
  604.       is_subset = 1;
  605.     } /* inner loop usets */
  606.   } /* outer loop usets */
  607.   
  608.   foreach (i, usets) 
  609.     if (length (CAR(i)) > 1)
  610.       retlist = nappend1(retlist, CAR(i) );
  611.  
  612.   return(retlist);
  613.  
  614. }
  615.  
  616. /*
  617.  *  SplitQual:
  618.  *
  619.  *  Given a list of union rels, this routine splits the qualification
  620.  *  up accordingly.  
  621.  *  NOTE:  Only union rels in the qualification will be split.
  622.  *  ( I believe that utimately, we want to split union rels by position. )
  623.  *
  624.  *  Input:  (uvar1 uvar2 ...) (unioned qual)
  625.  *  Output: (qual1 qual2 ...)
  626.  */
  627.  
  628. List 
  629. SplitQual (ulist, uquals)
  630.      List ulist;
  631.      List uquals;
  632. {
  633.   List i = LispNil;
  634.   List x = LispNil;
  635.   List uqual = LispNil;
  636.   List uvarno = LispNil;
  637.   List qual_list = LispNil;
  638.   List currentlist = LispNil;
  639.   List new_qual = LispNil;
  640.  
  641.   foreach(x, uquals) {
  642.     uqual = CAR(x);
  643.     if (uqual == LispNil) {
  644.       foreach(i, ulist) 
  645.     qual_list = nappend1(qual_list, LispNil);
  646.       return(qual_list);
  647.     }
  648.     foreach(i, ulist) {
  649.       uvarno = CAR(i);
  650.       currentlist = nappend1(currentlist, uvarno);
  651.       new_qual = copy_seq_tree(uqual);
  652.       new_qual = find_matching_union_qual(currentlist, new_qual);
  653.       qual_list = nappend1(qual_list, new_qual);
  654.       currentlist = LispNil;
  655.     }
  656.   }
  657.   return(qual_list);
  658.  
  659. }
  660.  
  661. /*
  662.  * find_matching_union_qual
  663.  *
  664.  * Given a union qual, and a union relation, returns 
  665.  * the qualifications that apply to it.
  666.  */
  667.  
  668. List 
  669. find_matching_union_qual(ulist, qual)
  670.      List ulist, qual;
  671. {
  672.   List leftop = LispNil;
  673.   List rightop = LispNil;
  674.   List op = LispNil;
  675.   List matching_clauseargs = LispNil;
  676.   List retqual = LispNil;
  677.   List i = LispNil;
  678.   List tmp_list = LispNil;
  679.  
  680.   if (null(qual))
  681.     return(LispNil);
  682.   else
  683.     if (is_clause(qual)) {
  684.       leftop = (List)get_leftop(qual);
  685.       rightop = (List)get_rightop(qual);
  686.       op = get_op(qual);
  687.       leftop = find_matching_union_qual(ulist,leftop);
  688.       rightop = find_matching_union_qual(ulist,rightop);
  689.       match_union_clause(ulist, &leftop, &rightop);
  690.       retqual = make_opclause((Oper)op,(Var)leftop,(Var)rightop);
  691.  
  692.       /*
  693.        * if the clause does not belong to the current union relation,
  694.        * remove it from the qualification.
  695.        */
  696.  
  697.       /*
  698.      if (matching_clauseargs == LispNil)
  699.     remove(clause, clauses);  * NOTE: may have to mod. qual *
  700.       else
  701.     clauseargs = matching_clauseargs; 
  702.       */
  703.     }
  704.   else
  705.     if (and_clause (qual)) {
  706.       foreach(i, get_andclauseargs(qual)) {
  707.     tmp_list = nappend1(tmp_list, find_matching_union_qual(ulist, 
  708.                                  CAR(i)));
  709.       }
  710.       retqual = make_andclause(tmp_list);
  711.     }
  712.     else
  713.       if (or_clause (qual)) {
  714.     foreach(i, get_orclauseargs(qual)) {
  715.       tmp_list = nappend1(tmp_list, find_matching_union_qual(ulist, 
  716.                                  CAR(i)));
  717.     }
  718.     retqual = make_orclause(tmp_list);
  719.       }
  720.       else
  721.     if (not_clause (qual)) {
  722.       retqual = find_matching_union_qual(ulist, 
  723.                          get_notclausearg(qual));
  724.       retqual = make_notclause(retqual);
  725.     }
  726.     else
  727.       retqual = qual;
  728.     
  729.   return(retqual);
  730. }
  731. /*
  732.  *  match_union_clauses:
  733.  *
  734.  *  Given a list of *2* clause args, (leftarg rightarg), 
  735.  *  it picks out the clauses within unions that
  736.  *  are relevant to the particular union relation being scanned.
  737.  *
  738.  *  returns nothing. It modifies the qual in place.
  739.  */
  740.  
  741. void
  742. match_union_clause (unionlist, leftarg, rightarg)
  743.      List unionlist, *leftarg, *rightarg;
  744. {
  745.   List i = LispNil;
  746.   List x = LispNil;
  747.   List varlist = LispNil;
  748.   Var uvarnode = (Var)NULL;
  749.   List varlist2 = LispNil;
  750.  
  751.   /*
  752.    *  If leftarg is a union var.
  753.    */
  754.   if (consp(*leftarg) && CAtom(CAR(*leftarg)) == UNION) {
  755.     varlist = flatten_union_list(CDR(*leftarg));
  756. /*    varlist = CDR(*leftarg); */
  757.     foreach(i,varlist) {
  758.       uvarnode = (Var)CAR(i);
  759.       if (member(lispInteger(get_varno(uvarnode)), unionlist)) {
  760.     *leftarg = (List)uvarnode;
  761.     break;
  762.       }
  763.     }
  764.   }
  765.   
  766.   /*
  767.    *  If rightarg is a union var.
  768.    */
  769.   if (consp(*rightarg) && CAtom(CAR(*rightarg)) == UNION) {
  770.     varlist = flatten_union_list(CDR(*rightarg));
  771. /*    varlist = CDR(*rightarg); */
  772.     foreach(i,varlist) {
  773.       uvarnode = (Var)CAR(i);
  774.       if (member(lispInteger(get_varno(uvarnode)), unionlist)) {
  775.     *rightarg = (List)uvarnode;
  776.     break;
  777.       }
  778.     }
  779.   }
  780.  
  781. }
  782. #ifdef NOT_USED
  783. /*
  784.  *  Routine is not used.
  785.  *  find_union_vars
  786.  *  runs through the rangetable, and forms a list of all the 
  787.  *  relations that are unioned. It will also remove the union
  788.  *  flag from the rangetable entries after it is done processing
  789.  *  them.
  790.  *  REturns a list of varnos.
  791.  */
  792.  
  793. List
  794. find_union_vars (rangetable)
  795.      List rangetable;
  796. {
  797.   
  798.   List i = LispNil;
  799.   List unionlist = LispNil;
  800.   List rt_entry = LispNil;
  801.   Index uvarno = 0;
  802.  
  803.   /*
  804.    * XXX what about the case when there are 2 emps in the rangetable, and
  805.    * both are union relations?
  806.    * Currently, both would appear in the unionlist.
  807.    */
  808.  
  809.   foreach( i, rangetable) {
  810.     rt_entry = CAR(i);
  811.     uvarno += 1;
  812.     if (member(lispAtom("union"),rt_flags(rt_entry))) {
  813.       unionlist = nappend1(unionlist, lispInteger(uvarno));
  814.       rt_flags(rt_entry) = LispRemove(lispAtom("union"), 
  815.                   rt_flags(rt_entry)); 
  816.     }     
  817.   }
  818.   return(unionlist);
  819. }
  820. #endif
  821.