home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / access / nobtree / nobtsearch.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  36.2 KB  |  1,253 lines

  1. /*
  2.  *  btsearch.c -- search code for postgres btrees.
  3.  */
  4.  
  5. #include "tmp/c.h"
  6.  
  7. #ifdef NOBTREE
  8. #include "tmp/postgres.h"
  9.  
  10. #include "storage/bufmgr.h"
  11. #include "storage/bufpage.h"
  12. #include "storage/page.h"
  13.  
  14. #include "utils/log.h"
  15. #include "utils/rel.h"
  16. #include "utils/excid.h"
  17.  
  18. #include "access/heapam.h"
  19. #include "access/genam.h"
  20. #include "access/ftup.h"
  21. #include "access/skey.h"
  22. #include "access/sdir.h"
  23. #include "access/tupmacs.h"
  24. #include "access/nobtree.h"
  25.  
  26. RcsId("$Header: /private/postgres/src/access/nobtree/RCS/nobtsearch.c,v 1.6 1991/06/26 19:12:14 mao Exp $");
  27.  
  28. /*
  29.  *  _nobt_search() -- Search for a scan key in the index.
  30.  *
  31.  *    This routine is actually just a helper that sets things up and
  32.  *    calls a recursive-descent search routine on the tree.
  33.  */
  34.  
  35. NOBTStack
  36. _nobt_search(rel, keysz, scankey, bufP)
  37.     Relation rel;
  38.     int keysz;
  39.     ScanKey scankey;
  40.     Buffer *bufP;
  41. {
  42.     *bufP = _nobt_getroot(rel, NOBT_READ);
  43.     return (_nobt_searchr(rel, keysz, scankey, bufP, (NOBTStack) NULL));
  44. }
  45.  
  46. /*
  47.  *  _nobt_searchr() -- Search the tree recursively for a particular scankey.
  48.  *
  49.  *    This routine takes a pointer to a sequence number.  As we descend
  50.  *    the tree, any time we see a key exactly equal to the one being
  51.  *    inserted, we save its sequence number plus one.  If we are inserting
  52.  *    a tuple, then we can use this number to distinguish among duplicates
  53.  *    and to guarantee that we never store the same sequence number for the
  54.  *    same key at any level in the tree.  This is an important guarantee,
  55.  *    since the Lehman and Yao algorithm relies on being able to find
  56.  *    its place in parent pages by looking at keys in the parent.
  57.  */
  58.  
  59. NOBTStack
  60. _nobt_searchr(rel, keysz, scankey, bufP, stack_in)
  61.     Relation rel;
  62.     int keysz;
  63.     ScanKey scankey;
  64.     Buffer *bufP;
  65.     NOBTStack stack_in;
  66. {
  67.     NOBTStack stack;
  68.     OffsetIndex offind;
  69.     Page page;
  70.     NOBTPageOpaque opaque;
  71.     PageNumber newpg;
  72.     BlockNumber par_blkno;
  73.     BlockNumber blkno;
  74.     ItemId itemid;
  75.     NOBTIItem btitem;
  76.     NOBTIItem item_save;
  77.     int item_nbytes;
  78.  
  79.     page = BufferGetPage(*bufP, 0);
  80.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  81.  
  82.     /* if this is a leaf page, we're done */
  83.     if (opaque->nobtpo_flags & NOBTP_LEAF)
  84.     return (stack_in);
  85.  
  86.     /*
  87.      *  Find the appropriate item on the internal page, and get the child
  88.      *  page that it points to.
  89.      */
  90.  
  91.     par_blkno = BufferGetBlockNumber(*bufP);
  92.     offind = _nobt_binsrch(rel, *bufP, keysz, scankey, NOBT_DESCENT);
  93.     itemid = PageGetItemId(page, offind);
  94.     btitem = (NOBTIItem) PageGetItem(page, itemid);
  95.     blkno = btitem->nobtii_child;
  96.  
  97.     /* save current, next key on stack */
  98.     item_nbytes = ItemIdGetLength(itemid);
  99.     item_save = (NOBTIItem) palloc(item_nbytes);
  100.     bcopy((char *) btitem, (char *) item_save, item_nbytes);
  101.     stack = (NOBTStack) palloc(sizeof(NOBTStackData));
  102.     stack->nobts_offset = offind;
  103.     stack->nobts_blkno = par_blkno;
  104.     stack->nobts_btitem = item_save;
  105.  
  106. #ifndef    NORMAL
  107.     /* if there's no "next" key on this page, use the high key */
  108.     if (offind++ >= PageGetMaxOffsetIndex(page)) {
  109.     if (opaque->nobtpo_next == P_NONE) {
  110.         stack->nobts_nxtitem = (NOBTIItem) NULL;
  111.     } else {
  112.         itemid = PageGetItemId(page, 0);
  113.         item_nbytes = ItemIdGetLength(itemid);
  114.         item_save = (NOBTIItem) palloc(item_nbytes);
  115.         btitem = (NOBTIItem) PageGetItem(page, itemid);
  116.         bcopy((char *) btitem, (char *) item_save, item_nbytes);
  117.         stack->nobts_nxtitem = item_save;
  118.     }
  119.     } else {
  120.     itemid = PageGetItemId(page, offind);
  121.     item_nbytes = ItemIdGetLength(itemid);
  122.     item_save = (NOBTIItem) palloc(item_nbytes);
  123.     btitem = (NOBTIItem) PageGetItem(page, itemid);
  124.     bcopy((char *) btitem, (char *) item_save, item_nbytes);
  125.     stack->nobts_nxtitem = item_save;
  126.     }
  127. #endif    /* ndef NORMAL */
  128.  
  129.     stack->nobts_parent = stack_in;
  130.  
  131.     /* drop the read lock on the parent page and acquire one on the child */
  132.     _nobt_relbuf(rel, *bufP, NOBT_READ);
  133.     *bufP = _nobt_getbuf(rel, blkno, NOBT_READ);
  134.  
  135.     /*
  136.      *  Race -- the page we just grabbed may have split since we read its
  137.      *  pointer in the parent.  If it has, we may need to move right to its
  138.      *  new sibling.  Do that.
  139.      */
  140.  
  141.     *bufP = _nobt_moveright(rel, *bufP, keysz, scankey, NOBT_READ, stack);
  142.  
  143.     /* okay, all set to move down a level */
  144.     return (_nobt_searchr(rel, keysz, scankey, bufP, stack));
  145. }
  146.  
  147. /*
  148.  *  _nobt_moveright() -- move right in the btree if necessary.
  149.  *
  150.  *    When we drop and reacquire a pointer to a page, it is possible that
  151.  *    the page has changed in the meanwhile.  If this happens, we're
  152.  *    guaranteed that the page has "split right" -- that is, that any
  153.  *    data that appeared on the page originally is either on the page
  154.  *    or strictly to the right of it.
  155.  *
  156.  *    This routine decides whether or not we need to move right in the
  157.  *    tree by examining the high key entry on the page.  If that entry
  158.  *    is strictly less than one we expect to be on the page, then our
  159.  *    picture of the page is incorrect and we need to move right.
  160.  *
  161.  *    On entry, we have the buffer pinned and a lock of the proper type.
  162.  *    If we move right, we release the buffer and lock and acquire the
  163.  *    same on the right sibling.
  164.  */
  165.  
  166. Buffer
  167. _nobt_moveright(rel, buf, keysz, scankey, access, stack)
  168.     Relation rel;
  169.     Buffer buf;
  170.     int keysz;
  171.     ScanKey scankey;
  172.     int access;
  173.     NOBTStack stack;
  174. {
  175.     Page page;
  176.     PageNumber right;
  177.     PageNumber newpg;
  178.     NOBTPageOpaque opaque;
  179.     ItemId hikey;
  180.     ItemId itemid;
  181.     BlockNumber rblkno;
  182.     IndexTuple stktupa, chktupa;
  183.     IndexTuple stktupb, chktupb;
  184.     NOBTIItem chkiitem;
  185.     NOBTLItem chklitem;
  186.     char *stkstra, *chkstra;
  187.     char *stkstrb, *chkstrb;
  188.     int cmpsiza, cmpsizb;
  189.     bool inconsistent;
  190.     bool isleaf;
  191.  
  192.     page = BufferGetPage(buf, 0);
  193.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  194.     inconsistent = false;
  195.  
  196.     if (opaque->nobtpo_flags & NOBTP_LEAF)
  197.     isleaf = true;
  198.     else
  199.     isleaf = false;
  200.  
  201.     /*
  202.      *  For the no-overwrite implementation, here are the things that can
  203.      *  cause us to have to move around in the tree:
  204.      *
  205.      *    +  The page we have just come to has just split, but the parent
  206.      *         now contains the correct entries for the new children, so the
  207.      *       tree is consistent.  In this case, we adjust the parent pointer
  208.      *       stack, move right as far as necessary, and continue.
  209.      *
  210.      *    +  The tree is genuinely inconsistent -- there was a failure during
  211.      *       the write of the modified tree to disk, and some set of pages
  212.      *       failed to make it out.  In this case, we must immediately repair
  213.      *       the damage.  We're on one of the modified pages.
  214.      *
  215.      *  Short-term Lehman and Yao locking guarantees that if no failure has
  216.      *  occurred, then case one must have happened.  The no-overwrite algorithm
  217.      *    by Sullivan guarantees that we can recover from two in the face of
  218.      *  any failure.
  219.      */
  220.  
  221. #ifdef    SHADOW
  222.     /* first, if this page has been replaced, then move to the new page */
  223.     while ((newpg = opaque->nobtpo_replaced) != P_NONE) {
  224.     _nobt_relbuf(rel, buf, access);
  225.     buf = _nobt_getbuf(rel, newpg, access);
  226.     page = BufferGetPage(buf, 0);
  227.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  228.     inconsistent = true;
  229.     }
  230. #endif    /* SHADOW */
  231.  
  232.     /*
  233.      *  If the key range on the parent doesn't match the key range on the
  234.      *  child at which we've arrived (after possibly following "replaced"
  235.      *  links above), then the tree may be inconsistent.  The hairy code
  236.      *  structure (next page, prev page == P_NONE -- four cases) is due to
  237.      *  the fact that the high and low key in the index are handled specially
  238.      *  to avoid expensive updates.
  239.      */
  240.  
  241. #ifndef    NORMAL
  242.     if (opaque->nobtpo_next == P_NONE) {
  243.     if (opaque->nobtpo_prev == P_NONE) {
  244.         if (stack != (NOBTStack) NULL)
  245.         inconsistent = true;
  246.     } else if (stack == NULL) {
  247.         inconsistent = true;
  248.     } else {
  249.         stkstra = ((char *) stack->nobts_btitem) + sizeof(NOBTIItemData);
  250.         if (isleaf) {
  251.         chklitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, 0));
  252.         chktupa = &(chklitem->nobtli_itup);
  253.         chkstra = ((char *) chktupa) + sizeof(IndexTupleData);
  254.         cmpsiza = IndexTupleSize(chktupa) - sizeof(IndexTupleData);
  255.         } else {
  256.         chkiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, 0));
  257.         chkstra = ((char *) chkiitem) + sizeof(NOBTIItemData);
  258.         cmpsiza = NOBTIItemGetSize(chkiitem) - sizeof(NOBTIItemData);
  259.         }
  260.         if (bcmp(stkstra, chkstra, cmpsiza) != 0)
  261.         inconsistent = true;
  262.     }
  263.     } else {
  264.     if (stack == NULL || stack->nobts_nxtitem == (NOBTIItem) NULL) {
  265.         inconsistent = true;
  266.     } else if (opaque->nobtpo_prev == P_NONE) {
  267.         if (stack->nobts_offset != 0)
  268.         inconsistent = true;
  269.     } else {
  270.         /* stack tuple a, check tuple a are the low key on the page */
  271.         stkstra = ((char *) stack->nobts_btitem) + sizeof(NOBTIItemData);
  272.         if (isleaf) {
  273.         chklitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, 1));
  274.         chktupa = &(chklitem->nobtli_itup);
  275.         chkstra = ((char *) chktupa) + sizeof(IndexTupleData);
  276.         cmpsiza = IndexTupleSize(chktupa) - sizeof(IndexTupleData);
  277.         } else {
  278.         chkiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, 1));
  279.         chkstra = ((char *) chkiitem) + sizeof(NOBTIItemData);
  280.         cmpsiza = NOBTIItemGetSize(chkiitem) - sizeof(NOBTIItemData);
  281.         }
  282.  
  283.         /* stack tuple b, check tuple b are the high key */
  284.         stkstrb = ((char *) stack->nobts_nxtitem) + sizeof(NOBTIItemData);
  285.         if (isleaf) {
  286.         chklitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, 0));
  287.         chktupb = &(chklitem->nobtli_itup);
  288.         chkstrb = ((char *) chktupb) + sizeof(IndexTupleData);
  289.         cmpsizb = IndexTupleSize(chktupb) - sizeof(IndexTupleData);
  290.         } else {
  291.         chkiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, 0));
  292.         chkstrb = ((char *) chkiitem) + sizeof(NOBTIItemData);
  293.         cmpsizb = NOBTIItemGetSize(chkiitem) - sizeof(NOBTIItemData);
  294.         }
  295.  
  296.         if (bcmp(stkstra, chkstra, cmpsiza) != 0
  297.         || bcmp(stkstrb, chkstrb, cmpsizb) != 0)
  298.         inconsistent = true;
  299.     }
  300.     }
  301. #endif    /* ndef NORMAL */
  302.  
  303.     /* XXX XXX XXX peer pointer check? */
  304.  
  305.     /* if the tree may be inconsistent, it has to be fixed... */
  306.     if (inconsistent) {
  307.     elog(NOTICE, "inconsistency detected, should do something here...");
  308.     }
  309.  
  310.     return (buf);
  311. }
  312.  
  313. /*
  314.  *  _nobt_skeycmp() -- compare a scan key to a particular item on a page using
  315.  *             a requested strategy (<, <=, =, >=, >).
  316.  *
  317.  *    We ignore the sequence numbers stored in the btree item here.  Those
  318.  *    numbers are intended for use internally only, in repositioning a
  319.  *    scan after a page split.  They do not impose any meaningful ordering.
  320.  *
  321.  *    The comparison is A <op> B, where A is the scan key and B is the
  322.  *    tuple pointed at by itemid on page.
  323.  */
  324.  
  325. bool
  326. _nobt_skeycmp(rel, keysz, scankey, page, itemid, strat)
  327.     Relation rel;
  328.     Size keysz;
  329.     ScanKey scankey;
  330.     Page page;
  331.     ItemId itemid;
  332.     StrategyNumber strat;
  333. {
  334.     NOBTIItem iitem;
  335.     NOBTLItem litem;
  336.     IndexTuple indexTuple;
  337.     TupleDescriptor tupDes;
  338.     ScanKeyEntry entry;
  339.     int i;
  340.     Datum attrDatum;
  341.     Datum keyDatum;
  342.     NOBTPageOpaque opaque;
  343.     bool compare;
  344.     bool isNull;
  345.     char *tempd;
  346.  
  347.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  348.  
  349.     tupDes = RelationGetTupleDescriptor(rel);
  350.     if (opaque->nobtpo_flags & NOBTP_LEAF) {
  351.     litem = (NOBTLItem) PageGetItem(page, itemid);
  352.     indexTuple = &(litem->nobtli_itup);
  353.  
  354.     /* see if the comparison is true for all of the key attributes */
  355.     for (i=1; i <= keysz; i++) {
  356.  
  357.         entry = &scankey->data[i-1];
  358.         attrDatum = IndexTupleGetAttributeValue(indexTuple,
  359.                             entry->attributeNumber,
  360.                             tupDes, &isNull);
  361.         keyDatum  = entry->argument;
  362.  
  363.         compare = _nobt_invokestrat(rel, i, strat, keyDatum, attrDatum);
  364.         if (!compare)
  365.         return (false);
  366.     }
  367.     } else {
  368.     iitem = (NOBTIItem) PageGetItem(page, itemid);
  369.     tempd = ((char *) iitem) + sizeof(NOBTIItemData);
  370.  
  371.     entry = &scankey->data[0];
  372.     keyDatum  = entry->argument;
  373.     attrDatum = (Datum) fetchatt(((struct attribute **) tupDes), tempd);
  374.  
  375.     compare = _nobt_invokestrat(rel, 1, strat, keyDatum, attrDatum);
  376.     if (!compare)
  377.         return (false);
  378.     }
  379.     return (true);
  380. }
  381.  
  382. /*
  383.  *  _nobt_binsrch() -- Do a binary search for a key on a particular page.
  384.  *
  385.  *    The scankey we get has the compare function stored in the procedure
  386.  *    entry of each data struct.  We invoke this regproc to do the
  387.  *    comparison for every key in the scankey.  _nobt_binsrch() returns
  388.  *    the OffsetIndex of the first matching key on the page, or the
  389.  *    OffsetIndex at which the matching key would appear if it were
  390.  *    on this page.
  391.  *
  392.  *    By the time this procedure is called, we're sure we're looking
  393.  *    at the right page -- don't need to walk right.  _nobt_binsrch() has
  394.  *    no lock or refcount side effects on the buffer.
  395.  */
  396.  
  397. OffsetIndex
  398. _nobt_binsrch(rel, buf, keysz, scankey, srchtype)
  399.     Relation rel;
  400.     Buffer buf;
  401.     int keysz;
  402.     ScanKey scankey;
  403.     int srchtype;
  404. {
  405.     TupleDescriptor itupdesc;
  406.     Page page;
  407.     NOBTPageOpaque opaque;
  408.     OffsetIndex low, mid, high;
  409.     bool match;
  410.     int result;
  411.  
  412.     page = BufferGetPage(buf, 0);
  413.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  414.  
  415.     /* by convention, item 0 on any non-leftmost page is the high key */
  416.     if (opaque->nobtpo_next != P_NONE)
  417.     low = 1;
  418.     else
  419.     low = 0;
  420.  
  421.     high = PageGetMaxOffsetIndex(page);
  422.  
  423.     /*
  424.      *  Since for non-leftmost pages, the zeroeth item on the page is the
  425.      *  high key, there are two notions of emptiness.  One is if nothing
  426.      *  appears on the page.  The other is if nothing but the high key does.
  427.      */
  428.  
  429.     if (PageIsEmpty(page) || (opaque->nobtpo_next != P_NONE && high == low))
  430.     return (low);
  431.  
  432.     itupdesc = RelationGetTupleDescriptor(rel);
  433.     match = false;
  434.  
  435.     while ((high - low) > 1) {
  436.     mid = low + ((high - low) / 2);
  437.     result = _nobt_compare(rel, itupdesc, page, keysz, scankey, mid);
  438.  
  439.     if (result > 0)
  440.         low = mid;
  441.     else if (result < 0)
  442.         high = mid - 1;
  443.     else {
  444.         match = true;
  445.         break;
  446.     }
  447.     }
  448.  
  449.     /* if we found a match, we want to find the first one on the page */
  450.     if (match) {
  451.     return (_nobt_firsteq(rel, itupdesc, page, keysz, scankey, mid));
  452.     } else {
  453.  
  454.     /*
  455.      *  We terminated because the endpoints got too close together.  There
  456.      *  are two cases to take care of.
  457.      *
  458.      *  For non-insertion searches on internal pages, we want to point at
  459.      *  the last key <, or first key =, the scankey on the page.  This
  460.      *  guarantees that we'll descend the tree correctly.
  461.      *
  462.      *  For all other cases, we want to point at the first key >=
  463.      *  the scankey on the page.  This guarantees that scans and
  464.      *  insertions will happen correctly.
  465.      */
  466.  
  467.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  468.     if (!(opaque->nobtpo_flags & NOBTP_LEAF) && srchtype == NOBT_DESCENT) {
  469.  
  470.         /*
  471.          *  We want the last key <, or first key ==, the scan key.
  472.          */
  473.  
  474.         result = _nobt_compare(rel, itupdesc, page, keysz, scankey, high);
  475.  
  476.         if (result == 0) {
  477.         return (_nobt_firsteq(rel, itupdesc, page, keysz, scankey, high));
  478.         } else if (result > 0) {
  479.         return (high);
  480.         } else {
  481.         return (low);
  482.         }
  483.         /* NOTREACHED */
  484.     } else {
  485.  
  486.         /* we want the first key >= the scan key */
  487.         result = _nobt_compare(rel, itupdesc, page, keysz, scankey, low);
  488.         if (result <= 0) {
  489.         return (low);
  490.         } else {
  491.         if (low == high)
  492.             return (low + 1);
  493.  
  494.         result = _nobt_compare(rel, itupdesc, page, keysz, scankey, high);
  495.         if (result <= 0)
  496.             return (high);
  497.         else
  498.             return (high + 1);
  499.         }
  500.     }
  501.     /* NOTREACHED */
  502.     }
  503. }
  504.  
  505. OffsetIndex
  506. _nobt_firsteq(rel, itupdesc, page, keysz, scankey, offind)
  507.     Relation rel;
  508.     TupleDescriptor itupdesc;
  509.     Page page;
  510.     Size keysz;
  511.     ScanKey scankey;
  512.     OffsetIndex offind;
  513. {
  514.     NOBTPageOpaque opaque;
  515.     OffsetIndex limit;
  516.  
  517.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  518.  
  519.     /* skip the high key, if any */
  520.     if (opaque->nobtpo_next != P_NONE)
  521.     limit = 1;
  522.     else
  523.     limit = 0;
  524.  
  525.     /* walk backwards looking for the first key in the chain of duplicates */
  526.     while (offind > limit
  527.        && _nobt_compare(rel, itupdesc, page,
  528.               keysz, scankey, offind - 1) == 0) {
  529.     offind--;
  530.     }
  531.  
  532.     return (offind);
  533. }
  534.  
  535. /*
  536.  *  _nobt_compare() -- Compare scankey to a particular tuple on the page.
  537.  *
  538.  *    This routine returns:
  539.  *        -1 if scankey < tuple at offind;
  540.  *         0 if scankey == tuple at offind;
  541.  *        +1 if scankey > tuple at offind.
  542.  *
  543.  *    In order to avoid having to propogate changes up the tree any time
  544.  *    a new minimal key is inserted, the leftmost entry on the leftmost
  545.  *    page is less than all possible keys, by definition.
  546.  */
  547.  
  548. int
  549. _nobt_compare(rel, itupdesc, page, keysz, scankey, offind)
  550.     Relation rel;
  551.     TupleDescriptor itupdesc;
  552.     Page page;
  553.     int keysz;
  554.     ScanKey scankey;
  555.     OffsetIndex offind;
  556. {
  557.     Datum datum;
  558.     NOBTLItem btlitem;
  559.     NOBTIItem btiitem;
  560.     ItemId itemid;
  561.     IndexTuple itup;
  562.     NOBTPageOpaque opaque;
  563.     ScanKeyEntry entry;
  564.     AttributeNumber attno;
  565.     int result;
  566.     int i;
  567.     char *tempd;
  568.     Boolean null;
  569.     bool isleaf;
  570.  
  571.     /*
  572.      *  If this is a leftmost internal page, and if our comparison is
  573.      *  with the first key on the page, then the item at that position is
  574.      *  by definition less than the scan key.
  575.      */
  576.  
  577.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  578.  
  579.     if (opaque->nobtpo_flags & NOBTP_LEAF)
  580.     isleaf = true;
  581.     else
  582.     isleaf = false;
  583.  
  584.     if (!isleaf && opaque->nobtpo_prev == P_NONE && offind == 0) {
  585.         itemid = PageGetItemId(page, offind);
  586.  
  587.         /*
  588.          *  If the item on the page is equal to the scankey, that's
  589.          *  okay to admit.  We just can't claim that the first key on
  590.          *  the page is greater than anything.
  591.          */
  592.  
  593.         if (_nobt_skeycmp(rel, keysz, scankey, page, itemid,
  594.                 NOBTEqualStrategyNumber)) {
  595.         return (0);
  596.         }
  597.         return (1);
  598.     }
  599.  
  600.     /*
  601.      *  The scan key is set up with the attribute number associated with each
  602.      *  term in the key.  It is important that, if the index is multi-key,
  603.      *  the scan contain the first k key attributes, and that they be in
  604.      *  order.  If you think about how multi-key ordering works, you'll
  605.      *  understand why this is.
  606.      *
  607.      *  We don't test for violation of this condition here.
  608.      */
  609.  
  610.     entry = &(scankey->data[0]);
  611.  
  612.     if (isleaf) {
  613.     btlitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, offind));
  614.     itup = &(btlitem->nobtli_itup);
  615.     attno = entry->attributeNumber;
  616.     datum = (Datum) IndexTupleGetAttributeValue(itup, attno,
  617.                             itupdesc, &null);
  618.     } else {
  619.     btiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, offind));
  620.     tempd = ((char *) btiitem) + sizeof(NOBTIItemData);
  621.     datum = (Datum) fetchatt(((struct attribute **) itupdesc), tempd);
  622.     }
  623.  
  624.     result = (int) (*(entry->func))(entry->argument, datum);
  625.     return (result);
  626. }
  627.  
  628. /*
  629.  *  _nobt_next() -- Get the next item in a scan.
  630.  *
  631.  *    On entry, we have a valid currentItemData in the scan, and a
  632.  *    read lock on the page that contains that item.  We do not have
  633.  *    the page pinned.  We return the next item in the scan.  On
  634.  *    exit, we have the page containing the next item locked but not
  635.  *    pinned.
  636.  */
  637.  
  638. RetrieveIndexResult
  639. _nobt_next(scan, dir)
  640.     IndexScanDesc scan;
  641.     ScanDirection dir;
  642. {
  643.     Relation rel;
  644.     NOBTPageOpaque opaque;
  645.     Buffer buf;
  646.     Page page;
  647.     OffsetIndex offind, maxoff;
  648.     RetrieveIndexResult res;
  649.     BlockNumber blkno;
  650.     ItemPointer current;
  651.     ItemPointer iptr;
  652.     NOBTLItem btitem;
  653.     IndexTuple itup;
  654.     NOBTScanOpaque so;
  655.  
  656.     rel = scan->relation;
  657.     so = (NOBTScanOpaque) scan->opaque;
  658.     current = &(scan->currentItemData);
  659.  
  660.     if (!BufferIsValid(so->nobtso_curbuf))
  661.     so->nobtso_curbuf = _nobt_getbuf(rel, ItemPointerGetBlockNumber(current),
  662.                      NOBT_READ);
  663.  
  664.     /* we still have the buffer pinned and locked */
  665.     buf = so->nobtso_curbuf;
  666.     blkno = BufferGetBlockNumber(buf);
  667.  
  668.     /* step one tuple in the appropriate direction */
  669.     if (!_nobt_step(scan, &buf, dir))
  670.     return ((RetrieveIndexResult) NULL);
  671.  
  672.     /* by here, current is the tuple we want to return */
  673.     offind = ItemPointerGetOffsetNumber(current, 0) - 1;
  674.     page = BufferGetPage(buf, 0);
  675.     btitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, offind));
  676.     itup = &btitem->nobtli_itup;
  677.  
  678.     if (_nobt_checkqual(scan, itup)) {
  679.     iptr = (ItemPointer) palloc(sizeof(ItemPointerData));
  680.     bcopy((char *) &(itup->t_tid), (char *) iptr, sizeof(ItemPointerData));
  681.     res = ItemPointerFormRetrieveIndexResult(current, iptr);
  682.  
  683.     /* remember which buffer we have pinned and locked */
  684.     so->nobtso_curbuf = buf;
  685.     } else {
  686.     ItemPointerSetInvalid(current);
  687.     so->nobtso_curbuf = InvalidBuffer;
  688.     _nobt_relbuf(rel, buf, NOBT_READ);
  689.     res = (RetrieveIndexResult) NULL;
  690.     }
  691.  
  692.     return (res);
  693. }
  694.  
  695. /*
  696.  *  _nobt_first() -- Find the first item in a scan.
  697.  *
  698.  *    We need to be clever about the type of scan, the operation it's
  699.  *    performing, and the tree ordering.  We return the RetrieveIndexResult
  700.  *    of the first item in the tree that satisfies the qualification
  701.  *    associated with the scan descriptor.  On exit, the page containing
  702.  *    the current index tuple is read locked and pinned, and the scan's
  703.  *    opaque data entry is updated to include the buffer.
  704.  */
  705.  
  706. RetrieveIndexResult
  707. _nobt_first(scan, dir)
  708.     IndexScanDesc scan;
  709.     ScanDirection dir;
  710. {
  711.     Relation rel;
  712.     TupleDescriptor itupdesc;
  713.     Buffer buf;
  714.     Page page;
  715.     NOBTStack stack;
  716.     OffsetIndex offind, maxoff;
  717.     NOBTLItem btitem;
  718.     IndexTuple itup;
  719.     ItemPointer current;
  720.     ItemPointer iptr;
  721.     BlockNumber blkno;
  722.     StrategyNumber strat;
  723.     RetrieveIndexResult res;
  724.     RegProcedure proc;
  725.     int result;
  726.     NOBTScanOpaque so;
  727.     ScanKeyData skdata;
  728.  
  729.     /* if we just need to walk down one edge of the tree, do that */
  730.     if (scan->scanFromEnd)
  731.     return (_nobt_endpoint(scan, dir));
  732.  
  733.     rel = scan->relation;
  734.     itupdesc = RelationGetTupleDescriptor(scan->relation);
  735.     current = &(scan->currentItemData);
  736.     so = (NOBTScanOpaque) scan->opaque;
  737.  
  738.     /*
  739.      *  Okay, we want something more complicated.  What we'll do is use
  740.      *  the first item in the scan key passed in (which has been correctly
  741.      *  ordered to take advantage of index ordering) to position ourselves
  742.      *  at the right place in the scan.
  743.      */
  744.  
  745.     /*
  746.      *  XXX -- The attribute number stored in the scan key is the attno
  747.      *           in the heap relation.  We need to transmogrify this into
  748.      *         the index relation attno here.  For the moment, we have
  749.      *           hardwired attno == 1.
  750.      */
  751.     proc = index_getprocid(rel, 1, NOBTORDER_PROC);
  752.     ScanKeyEntryInitialize(&skdata, 0x0, 1, proc,
  753.                scan->keyData.data[0].argument);
  754.  
  755.     stack = _nobt_search(rel, 1, &skdata, &buf);
  756.     _nobt_freestack(stack);
  757.  
  758.     /* find the nearest match to the manufactured scan key on the page */
  759.     offind = _nobt_binsrch(rel, buf, 1, &skdata, NOBT_DESCENT);
  760.     page = BufferGetPage(buf, 0);
  761.     maxoff = PageGetMaxOffsetIndex(page);
  762.  
  763.     if (offind > maxoff)
  764.     offind = maxoff;
  765.  
  766.     blkno = BufferGetBlockNumber(buf);
  767.     ItemPointerSet(current, 0, blkno, 0, offind + 1);
  768.  
  769.     /* now find the right place to start the scan */
  770.     result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
  771.     strat = _nobt_getstrat(rel, 1, scan->keyData.data[0].procedure);
  772.  
  773.     switch (strat) {
  774.       case NOBTLessStrategyNumber:
  775.     if (result <= 0) {
  776.         do {
  777.         if (!_nobt_twostep(scan, &buf, BackwardScanDirection))
  778.             break;
  779.  
  780.         offind = ItemPointerGetOffsetNumber(current, 0) - 1;
  781.         page = BufferGetPage(buf, 0);
  782.         result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
  783.         } while (result <= 0);
  784.  
  785.         /* if this is true, the key we just looked at is gone */
  786.         if (result > 0)
  787.         (void) _nobt_twostep(scan, &buf, ForwardScanDirection);
  788.     }
  789.     break;
  790.  
  791.       case NOBTLessEqualStrategyNumber:
  792.     if (result >= 0) {
  793.         do {
  794.         if (!_nobt_twostep(scan, &buf, ForwardScanDirection))
  795.             break;
  796.  
  797.         offind = ItemPointerGetOffsetNumber(current, 0) - 1;
  798.         page = BufferGetPage(buf, 0);
  799.         result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
  800.         } while (result >= 0);
  801.  
  802.         if (result < 0)
  803.         (void) _nobt_twostep(scan, &buf, BackwardScanDirection);
  804.     }
  805.     break;
  806.  
  807.       case NOBTEqualStrategyNumber:
  808.     if (result != 0) {
  809.       _nobt_relbuf(scan->relation, buf, NOBT_READ);
  810.       so->nobtso_curbuf = InvalidBuffer;
  811.       ItemPointerSetInvalid(&(scan->currentItemData));
  812.       return ((RetrieveIndexResult) NULL);
  813.     }
  814.     break;
  815.  
  816.       case NOBTGreaterEqualStrategyNumber:
  817.     if (result < 0) {
  818.         do {
  819.         if (!_nobt_twostep(scan, &buf, BackwardScanDirection))
  820.             break;
  821.  
  822.         offind = ItemPointerGetOffsetNumber(current, 0) - 1;
  823.         page = BufferGetPage(buf, 0);
  824.         result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
  825.         } while (result < 0);
  826.     }
  827.     break;
  828.  
  829.       case NOBTGreaterStrategyNumber:
  830.     if (result >= 0) {
  831.         do {
  832.         if (!_nobt_twostep(scan, &buf, ForwardScanDirection))
  833.             break;
  834.  
  835.         offind = ItemPointerGetOffsetNumber(current, 0) - 1;
  836.         page = BufferGetPage(buf, 0);
  837.         result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
  838.         } while (result >= 0);
  839.     }
  840.     break;
  841.     }
  842.  
  843.     /* okay, current item pointer for the scan is right */
  844.     offind = ItemPointerGetOffsetNumber(current, 0) - 1;
  845.     page = BufferGetPage(buf, 0);
  846.     btitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, offind));
  847.     itup = &btitem->nobtli_itup;
  848.  
  849.     if (_nobt_checkqual(scan, itup)) {
  850.     iptr = (ItemPointer) palloc(sizeof(ItemPointerData));
  851.     bcopy((char *) &(itup->t_tid), (char *) iptr, sizeof(ItemPointerData));
  852.     res = ItemPointerFormRetrieveIndexResult(current, iptr);
  853.  
  854.     /* remember which buffer we have pinned */
  855.     so->nobtso_curbuf = buf;
  856.     } else {
  857.     ItemPointerSetInvalid(current);
  858.     so->nobtso_curbuf = InvalidBuffer;
  859.     _nobt_relbuf(rel, buf, NOBT_READ);
  860.     res = (RetrieveIndexResult) NULL;
  861.     }
  862.  
  863.     return (res);
  864. }
  865.  
  866. /*
  867.  *  _nobt_step() -- Step one item in the requested direction in a scan on
  868.  *          the tree.
  869.  *
  870.  *    If no adjacent record exists in the requested direction, return
  871.  *    false.  Else, return true and set the currentItemData for the
  872.  *    scan to the right thing.
  873.  */
  874.  
  875. bool
  876. _nobt_step(scan, bufP, dir)
  877.     IndexScanDesc scan;
  878.     Buffer *bufP;
  879.     ScanDirection dir;
  880. {
  881.     Page page;
  882.     NOBTPageOpaque opaque;
  883.     OffsetIndex offind, maxoff;
  884.     OffsetIndex start;
  885.     BlockNumber blkno;
  886.     BlockNumber obknum;
  887.     NOBTScanOpaque so;
  888.     ItemPointer current;
  889.     Relation rel;
  890.  
  891.     rel = scan->relation;
  892.     current = &(scan->currentItemData);
  893.     offind = ItemPointerGetOffsetNumber(current, 0) - 1;
  894.     page = BufferGetPage(*bufP, 0);
  895.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  896.     so = (NOBTScanOpaque) scan->opaque;
  897.     maxoff = PageGetMaxOffsetIndex(page);
  898.  
  899.     /* get the next tuple */
  900.     if (ScanDirectionIsForward(dir)) {
  901.     if (offind < maxoff) {
  902.         offind++;
  903.     } else {
  904.  
  905.         /* if we're at end of scan, release the buffer and return */
  906.         if ((blkno = opaque->nobtpo_next) == P_NONE) {
  907.         _nobt_relbuf(rel, *bufP, NOBT_READ);
  908.         ItemPointerSetInvalid(current);
  909.         *bufP = so->nobtso_curbuf = InvalidBuffer;
  910.         return (false);
  911.         } else {
  912.  
  913.         /* walk right to the next page with data */
  914.         _nobt_relbuf(rel, *bufP, NOBT_READ);
  915.         for (;;) {
  916.             *bufP = _nobt_getbuf(rel, blkno, NOBT_READ);
  917.             page = BufferGetPage(*bufP, 0);
  918.             opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  919.             maxoff = PageGetMaxOffsetIndex(page);
  920.             if (opaque->nobtpo_next != P_NONE)
  921.             start = 1;
  922.             else
  923.             start = 0;
  924.  
  925.             if (!PageIsEmpty(page) && start <= maxoff) {
  926.             break;
  927.             } else {
  928.             blkno = opaque->nobtpo_next;
  929.             _nobt_relbuf(rel, *bufP, NOBT_READ);
  930.             if (blkno == P_NONE) {
  931.                 *bufP = so->nobtso_curbuf = InvalidBuffer;
  932.                 ItemPointerSetInvalid(current);
  933.                 return (false);
  934.             }
  935.             }
  936.         }
  937.         offind = start;
  938.         }
  939.     }
  940.     } else if (ScanDirectionIsBackward(dir)) {
  941.  
  942.     /* remember that high key is item zero on non-rightmost pages */
  943.     if (opaque->nobtpo_next == P_NONE)
  944.         start = 0;
  945.     else
  946.         start = 1;
  947.  
  948.     if (offind > start) {
  949.         offind--;
  950.     } else {
  951.  
  952.         /* if we're at end of scan, release the buffer and return */
  953.         if ((blkno = opaque->nobtpo_prev) == P_NONE) {
  954.         _nobt_relbuf(rel, *bufP, NOBT_READ);
  955.         *bufP = so->nobtso_curbuf = InvalidBuffer;
  956.         ItemPointerSetInvalid(current);
  957.         return (false);
  958.         } else {
  959.  
  960.         obknum = BufferGetBlockNumber(*bufP);
  961.  
  962.         /* walk right to the next page with data */
  963.         _nobt_relbuf(rel, *bufP, NOBT_READ);
  964.         for (;;) {
  965.             *bufP = _nobt_getbuf(rel, blkno, NOBT_READ);
  966.             page = BufferGetPage(*bufP, 0);
  967.             opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  968.             maxoff = PageGetMaxOffsetIndex(page);
  969.  
  970.             /*
  971.              *  If the adjacent page just split, then we may have the
  972.              *  wrong block.  Handle this case.  Because pages only
  973.              *  split right, we don't have to worry about this failing
  974.              *  to terminate.
  975.              */
  976.  
  977.             while (opaque->nobtpo_next != obknum) {
  978.             blkno = opaque->nobtpo_next;
  979.             _nobt_relbuf(rel, *bufP, NOBT_READ);
  980.             *bufP = _nobt_getbuf(rel, blkno, NOBT_READ);
  981.             page = BufferGetPage(*bufP, 0);
  982.             opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  983.             maxoff = PageGetMaxOffsetIndex(page);
  984.             }
  985.  
  986.             /* don't consider the high key */
  987.             if (opaque->nobtpo_next == P_NONE)
  988.             start = 0;
  989.             else
  990.             start = 1;
  991.  
  992.             /* anything to look at here? */
  993.             if (!PageIsEmpty(page) && maxoff >= start) {
  994.             break;
  995.             } else {
  996.             blkno = opaque->nobtpo_prev;
  997.             obknum = BufferGetBlockNumber(*bufP);
  998.             _nobt_relbuf(rel, *bufP, NOBT_READ);
  999.             if (blkno == P_NONE) {
  1000.                 *bufP = so->nobtso_curbuf = InvalidBuffer;
  1001.                 ItemPointerSetInvalid(current);
  1002.                 return (false);
  1003.             }
  1004.             }
  1005.         }
  1006.         offind = maxoff;
  1007.         }
  1008.     }
  1009.     }
  1010.     blkno = BufferGetBlockNumber(*bufP);
  1011.     so->nobtso_curbuf = *bufP;
  1012.     ItemPointerSet(current, 0, blkno, 0, offind + 1);
  1013.  
  1014.     return (true);
  1015. }
  1016.  
  1017. /*
  1018.  *  _nobt_twostep() -- Move to an adjacent record in a scan on the tree,
  1019.  *             if an adjacent record exists.
  1020.  *
  1021.  *    This is like _nobt_step, except that if no adjacent record exists
  1022.  *    it restores us to where we were before trying the step.  This is
  1023.  *    only hairy when you cross page boundaries, since the page you cross
  1024.  *    from could have records inserted or deleted, or could even split.
  1025.  *    This is unlikely, but we try to handle it correctly here anyway.
  1026.  *
  1027.  *    This routine contains the only case in which our changes to Lehman
  1028.  *    and Yao's algorithm.
  1029.  *
  1030.  *    Like step, this routine leaves the scan's currentItemData in the
  1031.  *    proper state and acquires a lock and pin on *bufP.  If the twostep
  1032.  *    succeeded, we return true; otherwise, we return false.
  1033.  */
  1034.  
  1035. bool
  1036. _nobt_twostep(scan, bufP, dir)
  1037.     IndexScanDesc scan;
  1038.     Buffer *bufP;
  1039.     ScanDirection dir;
  1040. {
  1041.     Page page;
  1042.     NOBTPageOpaque opaque;
  1043.     OffsetIndex offind, maxoff;
  1044.     OffsetIndex start;
  1045.     ItemPointer current;
  1046.     ItemId itemid;
  1047.     int itemsz;
  1048.     NOBTLItem btitem;
  1049.     NOBTLItem svitem;
  1050.     BlockNumber blkno;
  1051.     int nbytes;
  1052.  
  1053.     blkno = BufferGetBlockNumber(*bufP);
  1054.     page = BufferGetPage(*bufP, 0);
  1055.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  1056.     maxoff = PageGetMaxOffsetIndex(page);
  1057.     current = &(scan->currentItemData);
  1058.     offind = ItemPointerGetOffsetNumber(current, 0) - 1;
  1059.  
  1060.     if (opaque->nobtpo_next != P_NONE)
  1061.     start = 1;
  1062.     else
  1063.     start = 0;
  1064.  
  1065.     /* if we're safe, just do it */
  1066.     if (ScanDirectionIsForward(dir) && offind < maxoff) {
  1067.     ItemPointerSet(current, 0, blkno, 0, offind + 2);
  1068.     return (true);
  1069.     } else if (ScanDirectionIsBackward(dir) && offind < start) {
  1070.     ItemPointerSet(current, 0, blkno, 0, offind);
  1071.     return (true);
  1072.     }
  1073.  
  1074.     /* if we've hit end of scan we don't have to do any work */
  1075.     if (ScanDirectionIsForward(dir) && opaque->nobtpo_next == P_NONE)
  1076.     return (false);
  1077.     else if (ScanDirectionIsBackward(dir) && opaque->nobtpo_prev == P_NONE)
  1078.     return (false);
  1079.  
  1080.     /*
  1081.      *  Okay, it's off the page; let _nobt_step() do the hard work, and we'll
  1082.      *  try to remember where we were.  This is not guaranteed to work; this
  1083.      *  is the only place in the code where concurrency can screw us up,
  1084.      *  and it's because we want to be able to move in two directions in
  1085.      *  the scan.
  1086.      */
  1087.  
  1088.     itemid = PageGetItemId(page, offind);
  1089.     itemsz = ItemIdGetLength(itemid);
  1090.     btitem = (NOBTLItem) PageGetItem(page, itemid);
  1091.     svitem = (NOBTLItem) palloc(itemsz);
  1092.     bcopy((char *) btitem, (char *) svitem, itemsz);
  1093.  
  1094.     if (_nobt_step(scan, bufP, dir)) {
  1095.     pfree(svitem);
  1096.     return (true);
  1097.     }
  1098.  
  1099.     /* try to find our place again */
  1100.     *bufP = _nobt_getbuf(scan->relation, blkno, NOBT_READ);
  1101.     page = BufferGetPage(*bufP, 0);
  1102.     maxoff = PageGetMaxOffsetIndex(page);
  1103.     nbytes = sizeof(NOBTLItemData) - sizeof(IndexTupleData);
  1104.  
  1105.     while (offind <= maxoff) {
  1106.     itemid = PageGetItemId(page, offind);
  1107.     btitem = (NOBTLItem) PageGetItem(page, itemid);
  1108.     if (bcmp((char *) btitem, (char *) svitem, nbytes) == 0) {
  1109.         pfree (svitem);
  1110.         ItemPointerSet(current, 0, blkno, 0, offind + 1);
  1111.         return (false);
  1112.     }
  1113.     }
  1114.  
  1115.     /*
  1116.      *  XXX crash and burn -- can't find our place.  We can be a little
  1117.      *  smarter -- walk to the next page to the right, for example, since
  1118.      *  that's the only direction that splits happen in.  Deletions screw
  1119.      *  us up less often since they're only done by the vacuum daemon.
  1120.      */
  1121.  
  1122.     elog(WARN, "btree synchronization error:  concurrent update botched scan");
  1123.  
  1124.     /* NOTREACHED */
  1125. }
  1126.  
  1127. /*
  1128.  *  _nobt_endpoint() -- Find the first or last key in the index.
  1129.  */
  1130.  
  1131. RetrieveIndexResult
  1132. _nobt_endpoint(scan, dir)
  1133.     IndexScanDesc scan;
  1134.     ScanDirection dir;
  1135. {
  1136.     Relation rel;
  1137.     Buffer buf;
  1138.     Page page;
  1139.     NOBTPageOpaque opaque;
  1140.     ItemPointer current;
  1141.     ItemPointer iptr;
  1142.     OffsetIndex offind, maxoff;
  1143.     OffsetIndex start;
  1144.     BlockNumber blkno;
  1145.     NOBTLItem btitem;
  1146.     NOBTIItem btiitem;
  1147.     IndexTuple itup;
  1148.     NOBTScanOpaque so;
  1149.     RetrieveIndexResult res;
  1150.  
  1151.     rel = scan->relation;
  1152.     current = &(scan->currentItemData);
  1153.  
  1154.     buf = _nobt_getroot(rel, NOBT_READ);
  1155.     blkno = BufferGetBlockNumber(buf);
  1156.     page = BufferGetPage(buf, 0);
  1157.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  1158.  
  1159.     for (;;) {
  1160.     if (opaque->nobtpo_flags & NOBTP_LEAF)
  1161.         break;
  1162.  
  1163.     if (ScanDirectionIsForward(dir)) {
  1164.         if (opaque->nobtpo_next == P_NONE)
  1165.         offind = 0;
  1166.         else
  1167.         offind = 1;
  1168.     } else
  1169.         offind = PageGetMaxOffsetIndex(page);
  1170.  
  1171.     btiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, offind));
  1172.     blkno = btiitem->nobtii_child;
  1173.  
  1174.     _nobt_relbuf(rel, buf, NOBT_READ);
  1175.     buf = _nobt_getbuf(rel, blkno, NOBT_READ);
  1176.     page = BufferGetPage(buf, 0);
  1177.     opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  1178.  
  1179.     /*
  1180.      *  Race condition:  If the child page we just stepped onto is in
  1181.      *  the process of being split, we need to make sure we're all the
  1182.      *  way at the edge of the tree.  See the paper by Lehman and Yao.
  1183.      */
  1184.  
  1185.     if (ScanDirectionIsBackward(dir) && opaque->nobtpo_next != P_NONE) {
  1186.         do {
  1187.         blkno = opaque->nobtpo_next;
  1188.         _nobt_relbuf(rel, buf, NOBT_READ);
  1189.         buf = _nobt_getbuf(rel, blkno, NOBT_READ);
  1190.         page = BufferGetPage(buf, 0);
  1191.         opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
  1192.         } while (opaque->nobtpo_next != P_NONE);
  1193.     }
  1194.     }
  1195.  
  1196.     /* okay, we've got the {left,right}-most page in the tree */
  1197.     maxoff = PageGetMaxOffsetIndex(page);
  1198.  
  1199.     if (ScanDirectionIsForward(dir)) {
  1200.     maxoff = PageGetMaxOffsetIndex(page);
  1201.     if (opaque->nobtpo_next == P_NONE)
  1202.         start = 0;
  1203.     else
  1204.         start = 1;
  1205.  
  1206.     if (PageIsEmpty(page) || start > maxoff) {
  1207.         ItemPointerSet(current, 0, blkno, 0, maxoff + 1);
  1208.         if (!_nobt_step(scan, &buf, BackwardScanDirection))
  1209.         return ((RetrieveIndexResult) NULL);
  1210.  
  1211.         start = ItemPointerGetOffsetNumber(current, 0);
  1212.         page = BufferGetPage(buf);
  1213.     } else {
  1214.         ItemPointerSet(current, 0, blkno, 0, start + 1);
  1215.     }
  1216.     } else if (ScanDirectionIsBackward(dir)) {
  1217.     start = PageGetMaxOffsetIndex(page);
  1218.     if (PageIsEmpty(page)) {
  1219.         ItemPointerSet(current, 0, blkno, 0, start + 1);
  1220.         if (!_nobt_step(scan, &buf, ForwardScanDirection))
  1221.         return ((RetrieveIndexResult) NULL);
  1222.  
  1223.         start = ItemPointerGetOffsetNumber(current, 0);
  1224.         page = BufferGetPage(buf);
  1225.     } else {
  1226.         ItemPointerSet(current, 0, blkno, 0, start + 1);
  1227.     }
  1228.     } else {
  1229.     elog(WARN, "Illegal scan direction %d", dir);
  1230.     }
  1231.  
  1232.     btitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, start));
  1233.     itup = &(btitem->nobtli_itup);
  1234.  
  1235.     /* see if we picked a winner */
  1236.     if (_nobt_checkqual(scan, itup)) {
  1237.     iptr = (ItemPointer) palloc(sizeof(ItemPointerData));
  1238.     bcopy((char *) &(itup->t_tid), (char *) iptr, sizeof(ItemPointerData));
  1239.     res = ItemPointerFormRetrieveIndexResult(current, iptr);
  1240.  
  1241.     /* remember which buffer we have pinned */
  1242.     so = (NOBTScanOpaque) scan->opaque;
  1243.     so->nobtso_curbuf = buf;
  1244.     } else {
  1245.     _nobt_relbuf(rel, buf, NOBT_READ);
  1246.     res = (RetrieveIndexResult) NULL;
  1247.     }
  1248.  
  1249.     return (res);
  1250. }
  1251.  
  1252. #endif /* NOBTREE */
  1253.