home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / commands / copy.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  18.8 KB  |  759 lines

  1. /*
  2.  * "$Header: /private/postgres/src/commands/RCS/copy.c,v 1.38 1992/08/16 03:38:13 mer Exp $"
  3.  */
  4.  
  5. #include <stdio.h>
  6.  
  7. #include "tmp/postgres.h"
  8. #include "tmp/globals.h"
  9. #include "tmp/align.h"
  10. #include "catalog/syscache.h"
  11. #include "catalog/pg_type.h"
  12. #include "catalog/pg_index.h"
  13.  
  14. #include "access/heapam.h"
  15. #include "access/htup.h"
  16. #include "access/itup.h"
  17. #include "access/relscan.h"
  18. #include "utils/rel.h"
  19. #include "utils/log.h"
  20. #include "tmp/daemon.h"
  21. #include "utils/fmgr.h"
  22. #include "machine.h"
  23.  
  24. /*
  25.  * New copy code.
  26.  * 
  27.  * This code "knows" the following about tuples:
  28.  * 
  29.  */
  30.  
  31. static bool reading_from_input = false;
  32.  
  33. extern FILE *Pfout, *Pfin;
  34.  
  35. void
  36. DoCopy(relname, binary, from, pipe, filename)
  37.  
  38. char *relname;
  39. bool binary, from, pipe;
  40. char *filename;
  41.  
  42. {
  43.     FILE *fp;
  44.     Relation rel, heap_openr();
  45.  
  46.     reading_from_input = pipe;
  47.  
  48.     rel = heap_openr(relname);
  49.     if (rel == NULL) elog(WARN, "Copy: class %s does not exist.", relname);
  50.  
  51.     if (from)
  52.     {
  53.         if (pipe && IsUnderPostmaster) ReceiveCopyBegin();
  54.         if (IsUnderPostmaster)
  55.         {
  56.             fp = pipe ? Pfin : fopen(filename, "r");
  57.         }
  58.         else
  59.         {
  60.             fp = pipe ? stdin : fopen(filename, "r");
  61.         }
  62.         if (fp == NULL) 
  63.         {
  64.             elog(WARN, "COPY: file %s could not be open for reading", filename);
  65.         }
  66.         CopyFrom(rel, binary, fp);
  67.     }
  68.     else
  69.     {
  70.         if (pipe && IsUnderPostmaster) SendCopyBegin();
  71.         if (IsUnderPostmaster)
  72.         {
  73.             fp = pipe ? Pfout : fopen(filename, "w");
  74.         }
  75.         else
  76.         {
  77.             fp = pipe ? stdout : fopen(filename, "w");
  78.         }
  79.         if (fp == NULL) 
  80.         {
  81.             elog(WARN, "COPY: file %s could not be open for writing", filename);
  82.         }
  83.         CopyTo(rel, binary, fp);
  84.     }
  85.     if (!pipe)
  86.     {
  87.         fclose(fp);
  88.     }
  89.     else if (!from && !binary)
  90.     {
  91.         fputs(".\n", fp);
  92.         if (IsUnderPostmaster) fflush(Pfout);
  93.     }
  94. }
  95.  
  96. CopyTo(rel, binary, fp)
  97.  
  98. Relation rel;
  99. bool binary;
  100. FILE *fp;
  101.  
  102. {
  103.     HeapTuple tuple, heap_getnext();
  104.     Relation heap_openr();
  105.     HeapScanDesc scandesc, heap_beginscan();
  106.  
  107.     int32 attr_count, i;
  108.     Attribute *attr;
  109.     func_ptr *out_functions;
  110.     int dummy;
  111.     ObjectId out_func_oid;
  112.     ObjectId *elements;
  113.     Datum value;
  114.     Boolean isnull = (Boolean) true;
  115.     char *nulls;
  116.     char *string;
  117.     bool *byval;
  118.     int32 ntuples;
  119.  
  120.     scandesc = heap_beginscan(rel, 0, NULL, NULL, NULL);
  121.  
  122.     attr_count = rel->rd_rel->relnatts;
  123.     attr = (Attribute *) &rel->rd_att;
  124.  
  125.     if (!binary)
  126.     {
  127.         out_functions = (func_ptr *)
  128.                          palloc(attr_count * sizeof(func_ptr));
  129.         elements = (ObjectId *) palloc(attr_count * sizeof(ObjectId));
  130.         for (i = 0; i < attr_count; i++) {
  131.             out_func_oid = (ObjectId) GetOutputFunction(attr[i]->atttypid);
  132.             fmgr_info(out_func_oid, &out_functions[i], &dummy);
  133.             elements[i] = GetTypeElement(attr[i]->atttypid);
  134.         }
  135.     }
  136.     else
  137.     {
  138.         nulls = (char *) palloc(attr_count);
  139.         for (i = 0; i < attr_count; i++) nulls[i] = ' ';
  140.  
  141.         /* XXX expensive */
  142.  
  143.         ntuples = CountTuples(rel);
  144.         fwrite(&ntuples, sizeof(int32), 1, fp);
  145.     }
  146.  
  147.     for (tuple = heap_getnext(scandesc, NULL, NULL);
  148.          tuple != NULL; 
  149.          tuple = heap_getnext(scandesc, NULL, NULL))
  150.     {
  151.         for (i = 0; i < attr_count; i++)
  152.         {
  153.             value = (Datum) 
  154.                     heap_getattr(tuple, InvalidBuffer, i+1, attr, &isnull);
  155.             if (!binary)
  156.             {
  157.                 if (!isnull)
  158.                 {
  159.                     string = (char *) (out_functions[i]) (value, elements[i]);
  160.                     CopyAttributeOut(fp, string);
  161.                     pfree(string);
  162.                 }
  163.  
  164.                 if (i == attr_count - 1)
  165.                 {
  166.                     fputc('\n', fp);
  167.                 }
  168.                 else
  169.                 {
  170.                     fputc('\t', fp);
  171.                 }
  172.             }
  173.             else
  174.             {
  175.  
  176.             /*
  177.              * only interesting thing heap_getattr tells us in this case
  178.              * is if we have a null attribute or not.
  179.              */
  180.  
  181.                 if (isnull) nulls[i] = 'n';
  182.             }
  183.         }
  184.  
  185.         if (binary)
  186.         {
  187.             int32 null_ct = 0, length;
  188.  
  189.             for (i = 0; i < attr_count; i++) 
  190.             {
  191.                 if (nulls[i] == 'n') null_ct++;
  192.             }
  193.  
  194.             length = tuple->t_len - tuple->t_hoff;
  195.             fwrite(&length, sizeof(int32), 1, fp);
  196.             fwrite(&null_ct, sizeof(int32), 1, fp);
  197.             if (null_ct > 0)
  198.             {
  199.                 for (i = 0; i < attr_count; i++)
  200.                 {
  201.                     if (nulls[i] == 'n')
  202.                     {
  203.                         fwrite(&i, sizeof(int32), 1, fp);
  204.                         nulls[i] = ' ';
  205.                     }
  206.                 }
  207.             }
  208.             fwrite((char *) tuple + tuple->t_hoff, length, 1, fp);
  209.         }
  210.     }
  211.  
  212.     heap_endscan(scandesc);
  213.     if (binary)
  214.     {
  215.         pfree(nulls);
  216.     }
  217.     else
  218.     {
  219.         pfree(out_functions);
  220.         pfree(elements);
  221.     }
  222.  
  223.     heap_close(rel);
  224. }
  225.  
  226. CopyFrom(rel, binary, fp)
  227.  
  228. Relation rel;
  229. bool binary;
  230. FILE *fp;
  231.  
  232. {
  233.     HeapTuple tuple, heap_formtuple();
  234.     IndexTuple ituple, index_formtuple();
  235.     Relation heap_openr();
  236.     AttributeNumber attr_count;
  237.     Attribute *attr;
  238.     func_ptr *in_functions;
  239.     int i, j, k, dummy;
  240.     oid in_func_oid;
  241.     Datum *values, *index_values;
  242.     char *nulls, *index_nulls;
  243.     bool *byval;
  244.     Boolean isnull;
  245.     bool has_index;
  246.     int done = 0;
  247.     char *string, *ptr, *CopyReadAttribute();
  248.     Relation *index_rels;
  249.     int32 len, null_ct, null_id;
  250.     int32 ntuples, tuples_read = 0;
  251.     bool reading_to_eof = true;
  252.     ObjectId *elements;
  253.  
  254.     Relation *index_relations;
  255.     int28 *index_atts;
  256.     int n_indices;
  257.  
  258.     attr = (Attribute *) &rel->rd_att;
  259.  
  260.     attr_count = rel->rd_rel->relnatts;
  261.  
  262.     if (rel->rd_rel->relhasindex)
  263.     {
  264.         GetIndexRelations(rel->rd_id, &n_indices, &index_rels, &index_atts);
  265.         has_index = true;
  266.     }
  267.     else
  268.     {
  269.         has_index = false;
  270.     }
  271.  
  272.     if (!binary)
  273.     {
  274.         in_functions = (func_ptr *) palloc(attr_count * sizeof(func_ptr));
  275.         elements = (ObjectId *) palloc(attr_count * sizeof(ObjectId));
  276.         for (i = 0; i < attr_count; i++)
  277.         {
  278.             in_func_oid = (ObjectId) GetInputFunction(attr[i]->atttypid);
  279.             fmgr_info(in_func_oid, &in_functions[i], &dummy);
  280.             elements[i] = GetTypeElement(attr[i]->atttypid);
  281.         }
  282.     }
  283.     else
  284.     {
  285.          fread(&ntuples, sizeof(int32), 1, fp);
  286.          if (ntuples != 0) reading_to_eof = false;
  287.     }
  288.  
  289.     values       = (Datum *) palloc(sizeof(Datum) * attr_count);
  290.     index_values = (Datum *) palloc(sizeof(Datum) * attr_count);
  291.     nulls        = (char *) palloc(attr_count);
  292.     index_nulls  = (char *) palloc(attr_count);
  293.     byval        = (bool *) palloc(attr_count * sizeof(bool));
  294.  
  295.     for (i = 0; i < attr_count; i++) 
  296.     {
  297.         nulls[i] = ' ';
  298.         index_nulls[i] = ' ';
  299.         byval[i] = (bool) IsTypeByVal(attr[i]->atttypid);
  300.     }
  301.  
  302.     while (!done)
  303.     {
  304.  
  305.         if (!binary)
  306.         {
  307.             for (i = 0; i < attr_count && !done; i++)
  308.             {
  309.                 string = CopyReadAttribute(i, fp, &isnull);
  310.                 if (isnull)
  311.                 {
  312.                     values[i] = NULL;
  313.                     nulls[i] = 'n';
  314.                 }
  315.                 else if (string == NULL)
  316.                 {
  317.                     done = 1;
  318.                 }
  319.                 else
  320.                 {
  321.                     values[i] = (Datum) (in_functions[i]) (string, elements[i]);
  322.             /*
  323.              * Sanity check - by reference attributes cannot return
  324.              * NULL
  325.              */
  326.             if (!PointerIsValid(values[i]) &&
  327.             !(rel->rd_att.data[i]->attbyval))
  328.             {
  329.             elog(WARN, "copy from: Bad file format");
  330.             }
  331.                 }
  332.             }
  333.         }
  334.         else /* binary */
  335.         {
  336.             fread(&len, sizeof(int32), 1, fp);
  337.             if (feof(fp)) 
  338.             {
  339.                 done = 1;
  340.             }
  341.             else
  342.             {
  343.                 fread(&null_ct, sizeof(int32), 1, fp);
  344.                 if (null_ct > 0)
  345.                 {
  346.                     for (i = 0; i < null_ct; i++)
  347.                     {
  348.                         fread(&null_id, sizeof(int32), 1, fp);
  349.                         nulls[null_id] = 'n';
  350.                     }
  351.                 }
  352.  
  353.                 string = (char *) palloc(len);
  354.                 fread(string, len, 1, fp);
  355.  
  356.                 ptr = string;
  357.  
  358.                 for (i = 0; i < attr_count; i++)
  359.                 {
  360.                     if (byval[i] && nulls[i] != 'n')
  361.                     {
  362.                         switch(attr[i]->attlen)
  363.                         {
  364.                             case sizeof(char):
  365.                                 values[i] = (Datum) *(unsigned char *) ptr;
  366.                                 ptr += sizeof(char);
  367.                                 break;
  368.                             case sizeof(short):
  369.                 ptr = (char *) SHORTALIGN(ptr);
  370.                                 values[i] = (Datum) *(unsigned short *) ptr; 
  371.                                 ptr += sizeof(short);
  372.                                 break;
  373.                             case 3:
  374.                             case sizeof(long):
  375.                 ptr = (char *) LONGALIGN(ptr);
  376.                                 values[i] = (Datum) *(unsigned long *) ptr; 
  377.                                 ptr += sizeof(long);
  378.                                 break;
  379.                             default:
  380.                                 elog(WARN, "COPY BINARY: impossible size!");
  381.                                 break;
  382.                         }
  383.                     }
  384.                     else if (nulls[i] != 'n')
  385.                     {
  386.                         if (attr[i]->attlen < 0)
  387.                         {
  388.                 ptr = (char *)LONGALIGN(ptr);
  389.                             values[i] = (Datum) ptr;
  390.                             ptr += * (unsigned long *) ptr;
  391.                         }
  392.                         else
  393.                         {
  394.                 ptr = (char *)LONGALIGN(ptr);
  395.                             values[i] = (Datum) ptr;
  396.                             ptr += attr[i]->attlen;
  397.                         }
  398.                     }
  399.                 }
  400.             }
  401.         }
  402.         if (done) continue;
  403.  
  404.         tuple = heap_formtuple(attr_count, attr, values, nulls);
  405.         heap_insert(rel, tuple, NULL);
  406.  
  407.         if (has_index)
  408.         {
  409.             for (i = 0; i < n_indices; i++)
  410.             {
  411.                 j = 0;
  412.                 while (index_atts[i].data[j] != 0)
  413.                 {
  414.                     if (nulls[index_atts[i].data[j] - 1] == 'n')
  415.                     {
  416.                         index_nulls[j] = 'n';
  417.                     }
  418.                     else 
  419.                     {
  420.                         index_values[j] = values[index_atts[i].data[j] - 1];
  421.                     }
  422.                     j++;
  423.                 }
  424.                 ituple = index_formtuple(j, &(index_rels[i]->rd_att.data[0]), 
  425.                                          index_values, index_nulls);
  426.                 ituple->t_tid = tuple->t_ctid;
  427.                 (void) index_insert(index_rels[i], ituple, NULL, NULL);
  428.                 pfree(ituple);
  429.                 for (k = 0; k < j; k++) index_nulls[k] = ' ';
  430.             }
  431.         }
  432.  
  433.         if (binary) pfree(string);
  434.  
  435.         for (i = 0; i < attr_count; i++) 
  436.         {
  437.             if (!byval[i] && nulls[i] != 'n')
  438.             {
  439.                 if (!binary) pfree(values[i]);
  440.             }
  441.             else if (nulls[i] == 'n')
  442.             {
  443.                 nulls[i] = ' ';
  444.             }
  445.         }
  446.  
  447.         /* pfree the rulelock thing that is allocated */
  448.  
  449.         pfree(tuple->t_lock.l_lock);
  450.         pfree(tuple);
  451.         tuples_read++;
  452.  
  453.         if (!reading_to_eof && ntuples == tuples_read) done = true;
  454.     }
  455.     pfree(values);
  456.     if (!binary) pfree(in_functions);
  457.     pfree(nulls);
  458.     pfree(byval);
  459.     heap_close(rel);
  460. }
  461.  
  462. GetOutputFunction(type)
  463.     ObjectId    type;
  464. {
  465.     HeapTuple    typeTuple;
  466.  
  467.     typeTuple = SearchSysCacheTuple(TYPOID,
  468.                     (char *) type,
  469.                     (char *) NULL,
  470.                     (char *) NULL,
  471.                     (char *) NULL);
  472.  
  473.     if (HeapTupleIsValid(typeTuple))
  474.     return((int) ((TypeTupleForm) GETSTRUCT(typeTuple))->typoutput);
  475.  
  476.     elog(WARN, "GetOutputFunction: Cache lookup of type %d failed", type);
  477.     return(InvalidObjectId);
  478. }
  479.  
  480. GetTypeElement(type)
  481.     ObjectId    type;
  482. {
  483.     HeapTuple    typeTuple;
  484.  
  485.     typeTuple = SearchSysCacheTuple(TYPOID,
  486.                     (char *) type,
  487.                     (char *) NULL,
  488.                     (char *) NULL,
  489.                     (char *) NULL);
  490.  
  491.     if (HeapTupleIsValid(typeTuple))
  492.     return((int) ((TypeTupleForm) GETSTRUCT(typeTuple))->typelem);
  493.  
  494.     elog(WARN, "GetOutputFunction: Cache lookup of type %d failed", type);
  495.     return(InvalidObjectId);
  496. }
  497.  
  498. GetInputFunction(type)
  499.     ObjectId    type;
  500. {
  501.     HeapTuple    typeTuple;
  502.  
  503.     typeTuple = SearchSysCacheTuple(TYPOID,
  504.                     (char *) type,
  505.                     (char *) NULL,
  506.                     (char *) NULL,
  507.                     (char *) NULL);
  508.  
  509.     if (HeapTupleIsValid(typeTuple))
  510.     return((int) ((TypeTupleForm) GETSTRUCT(typeTuple))->typinput);
  511.  
  512.     elog(WARN, "GetInputFunction: Cache lookup of type %d failed", type);
  513.     return(InvalidObjectId);
  514. }
  515.  
  516. IsTypeByVal(type)
  517.     ObjectId    type;
  518. {
  519.     HeapTuple    typeTuple;
  520.  
  521.     typeTuple = SearchSysCacheTuple(TYPOID,
  522.                     (char *) type,
  523.                     (char *) NULL,
  524.                     (char *) NULL,
  525.                     (char *) NULL);
  526.  
  527.     if (HeapTupleIsValid(typeTuple))
  528.         return((int) ((TypeTupleForm) GETSTRUCT(typeTuple))->typbyval);
  529.  
  530.     elog(WARN, "GetInputFunction: Cache lookup of type %d failed", type);
  531.  
  532.     return(InvalidObjectId);
  533. }
  534.  
  535. /* 
  536.  * Given the OID of a relation, return an array of index relation descriptors
  537.  * and the number of index relations.  These relation descriptors are open
  538.  * using heap_open().
  539.  *
  540.  * Space for the array itself is palloc'ed.
  541.  */
  542.  
  543. #define N_INDEXRELS = 5
  544.  
  545. typedef struct rel_list
  546. {
  547.     ObjectId index_rel_oid;
  548.     int28 intlist;
  549.     struct rel_list *next;
  550. }
  551. RelationList;
  552.  
  553. GetIndexRelations(main_relation_oid, n_indices, index_rels, index_atts)
  554.  
  555. ObjectId main_relation_oid;
  556. int *n_indices;
  557. Relation **index_rels;
  558. int28 **index_atts;
  559.  
  560. {
  561.     RelationList *head, *scan;
  562.     Relation pg_index_rel, heap_openr(), index_open();
  563.     HeapScanDesc scandesc, heap_beginscan();
  564.     ObjectId index_relation_oid;
  565.     HeapTuple tuple;
  566.     Attribute *attr;
  567.     int i;
  568.     int28 *datum;
  569.     Boolean isnull;
  570.  
  571.     pg_index_rel = heap_openr("pg_index");
  572.     scandesc = heap_beginscan(pg_index_rel, 0, NULL, NULL, NULL);
  573.     attr = (Attribute *) &pg_index_rel->rd_att;
  574.  
  575.     *n_indices = 0;
  576.  
  577.     head = (RelationList *) palloc(sizeof(RelationList));
  578.     scan = head;
  579.     head->next = NULL;
  580.  
  581.     for (tuple = heap_getnext(scandesc, NULL, NULL);
  582.          tuple != NULL; 
  583.          tuple = heap_getnext(scandesc, NULL, NULL))
  584.     {
  585.         index_relation_oid = (ObjectId)
  586.                          heap_getattr(tuple, InvalidBuffer, 2, attr, &isnull);
  587.         if (index_relation_oid == main_relation_oid)
  588.         {
  589.             scan->index_rel_oid = (ObjectId) 
  590.                 heap_getattr(tuple,
  591.                  InvalidBuffer,
  592.                  Anum_pg_index_indexrelid,
  593.                  attr,
  594.                  &isnull);
  595.             datum = (int28 *)
  596.                 heap_getattr(tuple,
  597.                  InvalidBuffer,
  598.                  Anum_pg_index_indkey,
  599.                  attr,
  600.                  &isnull);
  601.             bcopy(datum, &scan->intlist, sizeof(int28));
  602.             (*n_indices)++;
  603.             scan->next = (RelationList *) palloc(sizeof(RelationList));
  604.             scan = scan->next;
  605.         }
  606.     }
  607.  
  608.     heap_endscan(scandesc);
  609.     heap_close(pg_index_rel);
  610.  
  611.     *index_rels = (Relation *) palloc(*n_indices * sizeof(Relation));
  612.     *index_atts = (int28 *) palloc(*n_indices * sizeof(int28));
  613.  
  614.     for (i = 0, scan = head; i < *n_indices; i++, scan = scan->next)
  615.     {
  616.         (*index_rels)[i] = index_open(scan->index_rel_oid);
  617.         bcopy(&scan->intlist, &((*index_atts)[i]), sizeof(int28));
  618.     }
  619.  
  620.     for (i = 0, scan = head; i < *n_indices + 1; i++)
  621.     {
  622.         scan = head->next;
  623.         pfree(head);
  624.         head = scan;
  625.     }
  626. }
  627.  
  628.  
  629. #define EXT_ATTLEN 5*8192
  630.  
  631. /*
  632.  * Reads input from fp until eof is seen.  If we are reading from standard
  633.  * input, AND we see a dot on a line by itself (a dot followed immediately
  634.  * by a newline), we exit as if we saw eof.  This is so that copy pipelines
  635.  * can be used as standard input.
  636.  */
  637.  
  638. char *
  639. CopyReadAttribute(attno, fp, isnull)
  640.  
  641. int attno;
  642. FILE *fp;
  643. Boolean *isnull;
  644.  
  645. {
  646.     static char attribute[EXT_ATTLEN];
  647.     char c;
  648.     int done = 0;
  649.     int i = 0;
  650.     int length = 0;
  651.     int read_newline = 0;
  652.  
  653.     if (feof(fp))
  654.     {
  655.         *isnull = (Boolean) false;
  656.         return(NULL);
  657.     }
  658.  
  659.     while (!done)
  660.     {
  661.         c = getc(fp);
  662.  
  663.         if (feof(fp))
  664.         {
  665.             *isnull = (Boolean) false;
  666.             return(NULL);
  667.         }
  668.         else if (reading_from_input && attno == 0 && i == 0 && c == '.') 
  669.         {
  670.             attribute[0] = c;
  671.             c = getc(fp);
  672.             if (c == '\n')
  673.             {
  674.                 *isnull = (Boolean) false;
  675.                 return(NULL);
  676.             }
  677.             else if (c == '\t')
  678.             {
  679.                 attribute[1] = 0;
  680.                 *isnull = (Boolean) false;
  681.                 return(&attribute[0]);
  682.             }
  683.             else
  684.             {
  685.                 attribute[1] = c;
  686.                 i = 2;
  687.             }
  688.         }
  689.         else if (c == '\\') 
  690.         {
  691.             c = getc(fp);
  692.         }
  693.         else if (c == '\t' || c == '\n')
  694.         {
  695.             done = 1;
  696.         }
  697.         if (!done) attribute[i++] = c;
  698.         if (i == EXT_ATTLEN - 1)
  699.             elog(WARN, "CopyReadAttribute - attribute length too long");
  700.     }
  701.     attribute[i] = '\0';
  702.     if (i == 0) 
  703.     {
  704.         *isnull = (Boolean) true;
  705.         return(NULL);
  706.     }
  707.     else
  708.     {
  709.         *isnull = (Boolean) false;
  710.         return(&attribute[0]);
  711.     }
  712. }
  713.  
  714. CopyAttributeOut(fp, string)
  715.  
  716. FILE *fp;
  717. char *string;
  718.  
  719. {
  720.     int i;
  721.     int len = strlen(string);
  722.  
  723.     for (i = 0; i < len; i++)
  724.     {
  725.         if (string[i] == '\t' || string[i] == '\\')
  726.         {
  727.             fputc('\\', fp);
  728.         }
  729.         fputc(string[i], fp);
  730.     }
  731. }
  732.  
  733. /*
  734.  * Returns the number of tuples in a relation.  Unfortunately, currently
  735.  * must do a scan of the entire relation to determine this.
  736.  *
  737.  * relation is expected to be an open relation descriptor.
  738.  */
  739.  
  740. int
  741. CountTuples(relation)
  742.  
  743. Relation relation;
  744.  
  745. {
  746.     HeapScanDesc scandesc, heap_beginscan();
  747.     HeapTuple tuple, heap_getnext();
  748.  
  749.     int i;
  750.  
  751.     scandesc = heap_beginscan(relation, 0, NULL, NULL, NULL);
  752.  
  753.     for (tuple = heap_getnext(scandesc, NULL, NULL), i = 0;
  754.          tuple != NULL; 
  755.          tuple = heap_getnext(scandesc, NULL, NULL), i++);
  756.     heap_endscan(scandesc);
  757.     return(i);
  758. }
  759.