home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / storage / smgr / sj.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  56.5 KB  |  2,167 lines

  1. /*
  2.  *  sj.c -- sony jukebox storage manager.
  3.  *
  4.  *    This code manages relations that reside on the sony write-once
  5.  *    optical disk jukebox.
  6.  */
  7.  
  8. #include "tmp/c.h"
  9. #include "tmp/postgres.h"
  10.  
  11. #ifdef SONY_JUKEBOX
  12.  
  13. #include <sys/file.h>
  14. #include <math.h>
  15. #include "machine.h"
  16.  
  17. #include "tmp/miscadmin.h"
  18.  
  19. #include "storage/ipc.h"
  20. #include "storage/ipci.h"
  21. #include "storage/smgr.h"
  22. #include "storage/shmem.h"
  23. #include "storage/spin.h"
  24.  
  25. #include "utils/hsearch.h"
  26. #include "utils/rel.h"
  27. #include "utils/log.h"
  28.  
  29. #include "access/htup.h"
  30. #include "access/relscan.h"
  31. #include "access/heapam.h"
  32.  
  33. #include "catalog/pg_platter.h"
  34. #include "catalog/pg_plmap.h"
  35. #include "catalog/pg_proc.h"
  36.  
  37. #include "storage/sj.h"
  38.  
  39. RcsId("$Header: /private/postgres/src/storage/smgr/RCS/sj.c,v 1.35 1992/06/30 22:31:43 mao Exp $");
  40.  
  41. /* globals used in this file */
  42. SPINLOCK        SJCacheLock;    /* lock for cache metadata */
  43. extern ObjectId        MyDatabaseId;    /* OID of database we have open */
  44. extern Name        MyDatabaseName;    /* name of database we have open */
  45.  
  46. static File        SJCacheVfd;    /* vfd for cache data file */
  47. static File        SJMetaVfd;    /* vfd for cache metadata file */
  48. static File        SJBlockVfd;    /* vfd for nblocks file */
  49. static SJCacheHeader    *SJHeader;    /* pointer to cache header in shmem */
  50. static HTAB        *SJCacheHT;    /* pointer to hash table in shmem */
  51. static SJCacheItem    *SJCache;    /* pointer to cache metadata in shmem */
  52. static SJCacheTag    *SJNBlockCache;    /* pointer to nblock cache */
  53.  
  54. #ifndef    HAS_TEST_AND_SET
  55.  
  56. /*
  57.  *  If we don't have test-and-set locks, then we need a semaphore for
  58.  *  concurrency control.  This semaphore is in addition to the metadata
  59.  *  lock, SJCacheLock, that we acquire before touching the cache metadata.
  60.  *
  61.  *  This semaphore is used in two ways.  During cache initialization, we
  62.  *  use it to lock out all other backends that want cache access.  During
  63.  *  normal processing, we control access to groups on which IO is in
  64.  *  progress by holding this lock.  When we're done with initialization or
  65.  *  IO, we do enough V's on the semaphore to satisfy all outstanding P's.
  66.  */
  67.  
  68. static IpcSemaphoreId    SJWaitSemId;    /* wait semaphore */
  69. static long        *SJNWaiting;    /* # procs sleeping on the wait sem */
  70.  
  71. #endif /* ndef HAS_TEST_AND_SET */
  72.  
  73. /* static buffer is for data transfer */
  74. static char    SJCacheBuf[SJBUFSIZE];
  75.  
  76. /*
  77.  *  When we have to do IO on a group, we avoid holding an exclusive lock on
  78.  *  the cache metadata for the duration of the operation.  We do this by
  79.  *  setting a finer-granularity lock on the group itself.  How we do this
  80.  *  depends on whether we have test-and-set locks or not.  If so, it's
  81.  *  easy; we set the TASlock on the item itself.  Otherwise, we use the
  82.  *  'wait' semaphore described above.
  83.  */
  84.  
  85. #ifdef HAS_TEST_AND_SET
  86. #define SET_IO_LOCK(item) \
  87.     item->sjc_gflags |= SJC_IOINPROG; \
  88.     SpinRelease(SJCacheLock); \
  89.     S_LOCK(&(item->sjc_iolock));
  90. #else /* HAS_TEST_AND_SET */
  91. #define SET_IO_LOCK(item) \
  92.     item->sjc_gflags |= SJC_IOINPROG; \
  93.     (*SJNWaiting)++; \
  94.     SpinRelease(SJCacheLock); \
  95.     IpcSemaphoreLock(SJWaitSemId, 0, 1);
  96. #endif /* HAS_TEST_AND_SET */
  97.  
  98. #define GROUPNO(item)    (((char *) item) - ((char *) &(SJCache[0])))/sizeof(SJCacheItem)
  99.  
  100. /* routines declared in this file */
  101. static void        _sjcacheinit();
  102. static void        _sjwait_init();
  103. static void        _sjunwait_init();
  104. static void        _sjwait_io();
  105. static void        _sjunwait_io();
  106. static void        _sjtouch();
  107. static void        _sjunpin();
  108. static void        _sjregister();
  109. static void        _sjregnblocks();
  110. static void        _sjnewextent();
  111. static void        _sjrdextent();
  112. static void        _sjdirtylast();
  113. static int        _sjfindnblocks();
  114. static int        _sjwritegrp();
  115. static int        _sjreadgrp();
  116. static int        _sjgroupvrfy();
  117. static Form_pg_plmap    _sjchoose();
  118. static SJCacheItem    *_sjallocgrp();
  119. static SJCacheItem    *_sjfetchgrp();
  120. static SJHashEntry    *_sjhashop();
  121. static int        _sjgetgrp();
  122. static void        _sjdump();
  123.  
  124. /* routines declared elsewhere */
  125. extern HTAB        *ShmemInitHash();
  126. extern int        *ShmemInitStruct();
  127. extern Relation        RelationIdGetRelation();
  128. extern BlockNumber    pgjb_offset();
  129. extern bool        pgjb_freespc();
  130.  
  131. /*
  132.  *  sjinit() -- initialize the Sony jukebox storage manager.
  133.  *
  134.  *    We need to find (or establish) the mag-disk buffer cache metadata
  135.  *    in shared memory and open the cache on mag disk.  The first backend
  136.  *    to run that touches the cache initializes it.  All other backends
  137.  *    running simultaneously will only wait for this initialization to
  138.  *    complete if they need to get data out of the cache.  Otherwise,
  139.  *    they'll return successfully immediately after attaching the cache
  140.  *    memory, and will let their older sibling do all the work.
  141.  */
  142.  
  143. int
  144. sjinit()
  145. {
  146.     unsigned int metasize;
  147.     bool metafound;
  148.     HASHCTL info;
  149.     bool initcache;
  150.     char *cacheblk, *cachesave;
  151.     int status;
  152.     char *pghome;
  153.     char path[SJPATHLEN];
  154.  
  155.     /*
  156.      *  First attach the shared memory block that contains the disk
  157.      *  cache metadata.  At the end of this block in shared memory is
  158.      *  the hash table we use to do fast lookup on groups in the cache.
  159.      */
  160.  
  161.     SpinAcquire(SJCacheLock);
  162.  
  163. #ifdef HAS_TEST_AND_SET
  164.     metasize = (SJCACHESIZE * sizeof(SJCacheItem)) + sizeof(SJCacheHeader)
  165.         + (SJNBLKSIZE * sizeof(SJCacheTag));
  166. #else /* HAS_TEST_AND_SET */
  167.     metasize = (SJCACHESIZE * sizeof(SJCacheItem)) + sizeof(SJCacheHeader)
  168.         + (SJNBLKSIZE * sizeof(SJCacheTag)) + sizeof(*SJNWaiting);
  169. #endif /* HAS_TEST_AND_SET */
  170.     cachesave = cacheblk = (char *) ShmemInitStruct("Jukebox cache metadata",
  171.                             metasize, &metafound);
  172.  
  173.     if (cacheblk == (char *) NULL) {
  174.     SpinRelease(SJCacheLock);
  175.     return (SM_FAIL);
  176.     }
  177.  
  178.     /*
  179.      *  Order of items in shared memory is metadata header, number of
  180.      *  processes sleeping on the wait semaphore (if no test-and-set locks),
  181.      *  nblock cache, and jukebox cache entries.
  182.      */
  183.  
  184.     SJHeader = (SJCacheHeader *) cacheblk;
  185.     cacheblk += sizeof(SJCacheHeader);
  186.  
  187. #ifndef HAS_TEST_AND_SET
  188.     SJNWaiting = (long *) cacheblk;
  189.     cacheblk += sizeof(long);
  190. #endif /* ndef HAS_TEST_AND_SET */
  191.  
  192.     SJNBlockCache = (SJCacheTag *) cacheblk;
  193.     cacheblk += SJNBLKSIZE * sizeof(SJCacheTag);
  194.  
  195.     SJCache = (SJCacheItem *) cacheblk;
  196.  
  197.     /*
  198.      *  Now initialize the pointer to the shared memory hash table.
  199.      */
  200.  
  201.     info.keysize = sizeof(SJCacheTag);
  202.     info.datasize = sizeof(int);
  203.     info.hash = tag_hash;
  204.  
  205.     SJCacheHT = ShmemInitHash("Jukebox cache hash table",
  206.                   SJCACHESIZE, SJCACHESIZE,
  207.                   &info, (HASH_ELEM|HASH_FUNCTION));
  208.  
  209.     if (SJCacheHT == (HTAB *) NULL) {
  210.     SpinRelease(SJCacheLock);
  211.     return (SM_FAIL);
  212.     }
  213.  
  214.     /*
  215.      *  Okay, all our shared memory pointers are set up.  If we did not
  216.      *  find the cache metadata entries in shared memory, or if the cache
  217.      *  has not been initialized from disk, initialize it in this backend.
  218.      */
  219.  
  220.     if (!metafound || !(SJHeader->sjh_flags & (SJH_INITING|SJH_INITED))) {
  221.     initcache = true;
  222.     bzero((char *) cachesave, metasize);
  223.     SJHeader->sjh_flags = SJH_INITING;
  224. #ifdef HAS_TEST_AND_SET
  225.     S_LOCK(&(SJHeader->sjh_initlock));
  226. #else /* HAS_TEST_AND_SET */
  227.     IpcSemaphoreLock(SJWaitSemId, 0, 1);
  228.     *SJNWaiting = 1;
  229. #endif /* HAS_TEST_AND_SET */
  230.     } else {
  231.     initcache = false;
  232.     }
  233.  
  234.     /* don't need exclusive access anymore */
  235.     SpinRelease(SJCacheLock);
  236.  
  237.     pghome = GetPGHome();
  238.     sprintf(path, "%s/data/%s", pghome, SJCACHENAME);
  239.  
  240.     SJCacheVfd = PathNameOpenFile(path, O_RDWR|O_CREAT|O_EXCL, 0600);
  241.     if (SJCacheVfd < 0) {
  242.     SJCacheVfd = PathNameOpenFile(path, O_RDWR, 0600);
  243.     if (SJCacheVfd < 0) {
  244.  
  245.         /* if we were initializing the metadata, note our surrender */
  246.         if (!metafound) {
  247.         SJHeader->sjh_flags &= ~SJH_INITING;
  248.         _sjunwait_init();
  249.         }
  250.  
  251.         return (SM_FAIL);
  252.     }
  253.     }
  254.  
  255.     sprintf(path, "%s/data/%s", pghome, SJMETANAME);
  256.     SJMetaVfd = PathNameOpenFile(path, O_RDWR|O_CREAT|O_EXCL, 0600);
  257.     if (SJMetaVfd < 0) {
  258.     SJMetaVfd = PathNameOpenFile(path, O_RDWR, 0600);
  259.     if (SJMetaVfd < 0) {
  260.  
  261.         /* if we were initializing the metadata, note our surrender */
  262.         if (!metafound) {
  263.         SJHeader->sjh_flags &= ~SJH_INITING;
  264.         _sjunwait_init();
  265.         }
  266.  
  267.         return (SM_FAIL);
  268.     }
  269.     }
  270.  
  271.     sprintf(path, "%s/data/%s", pghome, SJBLOCKNAME);
  272.     SJBlockVfd = PathNameOpenFile(path, O_RDWR|O_CREAT|O_EXCL, 0600);
  273.     if (SJBlockVfd < 0) {
  274.     SJBlockVfd = PathNameOpenFile(path, O_RDWR, 0600);
  275.     if (SJBlockVfd < 0) {
  276.  
  277.         /* if we were initializing the metadata, note our surrender */
  278.         if (!metafound) {
  279.         SJHeader->sjh_flags &= ~SJH_INITING;
  280.         _sjunwait_init();
  281.         }
  282.  
  283.         return (SM_FAIL);
  284.     }
  285.     }
  286.  
  287.     /*
  288.      *  If it's our responsibility to initialize the shared-memory cache
  289.      *  metadata, then go do that.  Sjcacheinit() will elog(FATAL, ...) if
  290.      *  it can't initialize the cache, so we don't need to worry about a
  291.      *  return value here.
  292.      */
  293.  
  294.     if (initcache) {
  295.     _sjcacheinit();
  296.     }
  297.  
  298.     /*
  299.      *  Finally, we need to initialize the data structures we use for
  300.      *  communicating with the jukebox.
  301.      */
  302.  
  303.     if (pgjb_init() == SM_FAIL)
  304.     return (SM_FAIL);
  305.  
  306.     return (SM_SUCCESS);
  307. }
  308.  
  309. static void
  310. _sjcacheinit()
  311. {
  312.     int nbytes, nread;
  313.     int nentries;
  314.     int nblocks;
  315.     int i;
  316.     SJCacheItem *cur;
  317.     SJHashEntry *result;
  318.     bool found;
  319.  
  320.     /* sanity check */
  321.     if ((SJHeader->sjh_flags & SJH_INITED)
  322.     || !(SJHeader->sjh_flags & SJH_INITING)) {
  323.     elog(FATAL, "sj cache header metadata corrupted.");
  324.     }
  325.  
  326.     /* suck in the metadata */
  327.     nbytes = SJCACHESIZE * sizeof(SJCacheItem);
  328.     nread = FileRead(SJMetaVfd, (char *) SJCache, nbytes);
  329.  
  330.     /* be sure we got an integral number of entries */
  331.     nentries = nread / sizeof(SJCacheItem);
  332.     if ((nentries * sizeof(SJCacheItem)) != nread) {
  333.     SJHeader->sjh_flags &= ~SJH_INITING;
  334.     _sjunwait_init();
  335.     elog(FATAL, "sj cache metadata file corrupted.");
  336.     }
  337.  
  338.     /*
  339.      *  Clear out the nblock cache
  340.      */
  341.     bzero((char *) SJNBlockCache, SJNBLKSIZE * sizeof(SJCacheTag));
  342.  
  343.     /* add every group that appears in the cache to the hash table */
  344.     for (i = 0; i < nentries; i++) {
  345.     cur = &(SJCache[i]);
  346.     result = _sjhashop(&(cur->sjc_tag), HASH_ENTER, &found);
  347.  
  348.     /* store the group number for this key in the hash table */
  349.     result->sjhe_groupno = i;
  350.  
  351.     /* no io in progress */
  352.     cur->sjc_gflags &= ~SJC_IOINPROG;
  353.     cur->sjc_refcount = 0;
  354.  
  355. #ifdef HAS_TEST_AND_SET
  356.     S_UNLOCK(&(cur->sjc_iolock));
  357. #endif HAS_TEST_AND_SET
  358.     }
  359.  
  360.     /*
  361.      *  Now construct the LRU list (free list).  Extents will be nominated
  362.      *  for reuse in this order.  Since we have no usage information, we
  363.      *  adopt the following policy:  any extents not yet allocated in the
  364.      *  cache are come first in the list, in order.  These are followed by
  365.      *  the allocated extents, in order.  The free list head is the first
  366.      *  unallocated extent, and its tail is the last allocated one.  This
  367.      *  list is doubly-linked and is not circular.
  368.      */
  369.  
  370.     if (nentries == SJCACHESIZE || nentries == 0) {
  371.     cur = &(SJCache[i]);
  372.     cur->sjc_freeprev = i - 1;
  373.  
  374.     if (i == SJCACHESIZE - 1) {
  375.         cur->sjc_freenext = -1;
  376.     } else {
  377.         cur->sjc_freenext = i + 1;
  378.     }
  379.  
  380.     /* list head, tail pointers */
  381.     SJHeader->sjh_freehead = 0;
  382.     SJHeader->sjh_freetail = SJCACHESIZE - 1;
  383.     } else {
  384.     for (i = 0; i < nentries; i++) {
  385.         cur = &(SJCache[i]);
  386.  
  387.         if (i == 0)
  388.         cur->sjc_freeprev = SJCACHESIZE - 1;
  389.         else
  390.         cur->sjc_freeprev = i - 1;
  391.  
  392.         if (i == nentries - 1)
  393.         cur->sjc_freenext = -1;
  394.         else
  395.         cur->sjc_freenext = i + 1;
  396.     }
  397.  
  398.     for (i = nentries; i < SJCACHESIZE; i++) {
  399.         cur = &(SJCache[i]);
  400.  
  401.         /* mark this as unused by setting oid to invalid object id */
  402.         cur->sjc_oid = InvalidObjectId;
  403.  
  404.         if (i == nentries)
  405.         cur->sjc_freeprev = -1;
  406.         else
  407.         cur->sjc_freeprev = i - 1;
  408.  
  409.         if (i == SJCACHESIZE - 1)
  410.         cur->sjc_freenext = 0;
  411.         else
  412.         cur->sjc_freenext = i + 1;
  413.     }
  414.  
  415.     /* list head, tail pointers */
  416.     SJHeader->sjh_freehead = nentries;
  417.     SJHeader->sjh_freetail = nentries - 1;
  418.     }
  419.  
  420.     /* set up cache metadata header struct */
  421.     SJHeader->sjh_nentries = 0;
  422.     SJHeader->sjh_flags = SJH_INITED;
  423. }
  424.  
  425. /*
  426.  *  _sjunwait_init() -- Release initialization lock on the jukebox cache.
  427.  *
  428.  *    When we initialize the cache, we don't keep the cache semaphore
  429.  *    locked.  Instead, we set a flag in the metadata to let other
  430.  *    backends know that we're doing the initialization.  This lets
  431.  *    others start running queries immediately, even if the cache is
  432.  *    not yet populated.  If they want to look something up in the
  433.  *    cache, they'll block on the flag we set, and wait for us to finish.
  434.  *    If they don't need the jukebox, they can run unimpeded.  When we
  435.  *    finish, we call _sjunwait_init() to release the initialization lock
  436.  *    that we hold during initialization.
  437.  *
  438.  *    When we do this, either the cache is properly initialized, or
  439.  *    we detected some error we couldn't deal with.  In either case,
  440.  *    we no longer need exclusive access to the cache metadata.
  441.  */
  442.  
  443. static void
  444. _sjunwait_init()
  445. {
  446. #ifdef HAS_TEST_AND_SET
  447.  
  448.     S_UNLOCK(&(SJHeader->sjh_initlock));
  449.  
  450. #else /* HAS_TEST_AND_SET */
  451.  
  452.     /* atomically V the semaphore once for every waiting process */
  453.     SpinAcquire(SJCacheLock);
  454.     IpcSemaphoreUnlock(SJWaitSemId, 0, *SJNWaiting);
  455.     *SJNWaiting = 0;
  456.     SpinRelease(SJCacheLock);
  457.  
  458. #endif /* HAS_TEST_AND_SET */
  459. }
  460.  
  461. /*
  462.  *  _sjunwait_io() -- Release IO lock on the jukebox cache.
  463.  *
  464.  *    While we're doing IO on a particular group in the cache, any other
  465.  *    process that wants to touch that group needs to wait until we're
  466.  *    finished.  If we have TASlocks, then a wait lock appears on the
  467.  *    group entry in the cache metadata.  Otherwise, we use the wait
  468.  *    semaphore in the same way as for initialization, above.
  469.  */
  470.  
  471. static void
  472. _sjunwait_io(item)
  473.     SJCacheItem *item;
  474. {
  475.     item->sjc_gflags &= ~SJC_IOINPROG;
  476.  
  477. #ifdef HAS_TEST_AND_SET
  478.     S_UNLOCK(&(item->sjc_iolock));
  479. #else /* HAS_TEST_AND_SET */
  480.  
  481.     /* atomically V the wait semaphore once for each sleeping process */
  482.     SpinAcquire(SJCacheLock);
  483.  
  484.     if (*SJNWaiting > 0) {
  485.     IpcSemaphoreUnlock(SJWaitSemId, 0, *SJNWaiting);
  486.     *SJNWaiting = 0;
  487.     }
  488.  
  489.     SpinRelease(SJCacheLock);
  490. #endif /* HAS_TEST_AND_SET */
  491. }
  492.  
  493. /*
  494.  *  _sjwait_init() -- Wait for cache initialization to complete.
  495.  *
  496.  *    This routine is called when we want to access jukebox cache metadata,
  497.  *    but someone else is initializing it.  When we return, the init lock
  498.  *    has been released and we can retry our access.  On entry, we must be
  499.  *    holding the cache metadata lock.
  500.  */
  501.  
  502. static void
  503. _sjwait_init()
  504. {
  505. #ifdef HAS_TEST_AND_SET
  506.     SpinRelease(SJCacheLock);
  507.     S_LOCK(&(SJHeader->sjh_initlock));
  508.     S_UNLOCK(&(SJHeader->sjh_initlock));
  509. #else /* HAS_TEST_AND_SET */
  510.     (*SJNWaiting)++;
  511.     SpinRelease(SJCacheLock);
  512.     IpcSemaphoreLock(SJWaitSemId, 0, 1);
  513. #endif /* HAS_TEST_AND_SET */
  514. }
  515.  
  516. /*
  517.  *  _sjwait_io() -- Wait for group IO to complete.
  518.  *
  519.  *    This routine is called when we discover that some other process is
  520.  *    doing IO on a group in the cache that we want to use.  We need to
  521.  *    wait for that IO to complete before we can use the group.  On entry,
  522.  *    we must hold the cache metadata lock.  On return, we don't hold that
  523.  *    lock, and the IO completed.  We can retry our access.
  524.  */
  525.  
  526. static void
  527. _sjwait_io(item)
  528.     SJCacheItem *item;
  529. {
  530. #ifdef HAS_TEST_AND_SET
  531.     SpinRelease(SJCacheLock);
  532.     S_LOCK(&(item->sjc_iolock));
  533.     S_UNLOCK(&(item->sjc_iolock));
  534. #else /* HAS_TEST_AND_SET */
  535.     (*SJNWaiting)++;
  536.     SpinRelease(SJCacheLock);
  537.     IpcSemaphoreLock(SJWaitSemId, 0, 1);
  538. #endif /* HAS_TEST_AND_SET */
  539. }
  540.  
  541. /*
  542.  *  sjshutdown() -- shut down the jukebox storage manager.
  543.  *
  544.  *    We want to close the cache and metadata files, release all our open
  545.  *    jukebox connections, and let the caller know we're done.
  546.  */
  547.  
  548. int
  549. sjshutdown()
  550. {
  551.     FileClose(SJCacheVfd);
  552.     FileClose(SJMetaVfd);
  553.  
  554.     return (SM_SUCCESS);
  555. }
  556.  
  557. /*
  558.  *  sjcreate() -- Create the requested relation on the jukebox.
  559.  *
  560.  *    Creating a new relation requires us to make a new cache group,
  561.  *    fill in the descriptor page, make sure everything is on disk,
  562.  *    and create the new relation file to store the last page of data
  563.  *    on magnetic disk.
  564.  */
  565.  
  566. int
  567. sjcreate(reln)
  568.     Relation reln;
  569. {
  570.     SJCacheItem *item;
  571.     SJGroupDesc *group;
  572.     SJCacheTag tag;
  573.     ObjectId dbid;
  574.     ObjectId relid;
  575.     File vfd;
  576.     int grpno;
  577.     int i;
  578.     char path[SJPATHLEN];
  579.  
  580.     /*
  581.      *  If the cache is in the process of being initialized, then we need
  582.      *  to wait for initialization to complete.  If the cache is not yet
  583.      *  initialized, and no one else is doing it, then we need to initialize
  584.      *  it ourselves.  Sjwait_init() or sj_init() will release the cache
  585.      *  lock for us.
  586.      */
  587.  
  588.     SpinAcquire(SJCacheLock);
  589.     if (!(SJHeader->sjh_flags & SJH_INITED)) {
  590.     if (SJHeader->sjh_flags & SJH_INITING) {
  591.         _sjwait_init();
  592.     } else {
  593.         sjinit();
  594.     }
  595.     return (sjcreate(reln));
  596.     }
  597.     SpinRelease(SJCacheLock);
  598.  
  599.     /*
  600.      *  By here, cache is initialized.  We are aggressively lazy, and
  601.      *  will not allocate an initial extent for this relation until it's
  602.      *  actually used.  We just register an initial block count of zero.
  603.      */
  604.  
  605.     if (reln->rd_rel->relisshared)
  606.     tag.sjct_dbid = (ObjectId) 0;
  607.     else
  608.     tag.sjct_dbid = MyDatabaseId;
  609.  
  610.     tag.sjct_relid = reln->rd_id;
  611.     tag.sjct_base = (BlockNumber) 0;
  612.  
  613.     _sjregnblocks(&tag);
  614.  
  615.     /* last thing to do is to create the mag-disk file to hold last page */
  616.     if (reln->rd_rel->relisshared)
  617.     strcpy(path, "../");
  618.     else
  619.     path[0] = '\0';
  620.  
  621.     strncpy(path, &(reln->rd_rel->relname.data[0]), sizeof(NameData));
  622.  
  623.     vfd = FileNameOpenFile(path, O_CREAT|O_RDWR|O_EXCL, 0600);
  624.  
  625.     return (vfd);
  626. }
  627.  
  628. /*
  629.  *  _sjregister() -- Make catalog entry for a new extent
  630.  *
  631.  *    When we create a new jukebox relation, or when we add a new extent
  632.  *    to an existing relation, we need to make the appropriate entry in
  633.  *    pg_plmap().  This routine does that.
  634.  *
  635.  *    On entry, we have item pinned; on exit, it's still pinned, and the
  636.  *    system catalogs have been updated to reflect the presence of the
  637.  *    new extent.
  638.  */
  639.  
  640. static void
  641. _sjregister(item, group)
  642.     SJCacheItem *item;
  643.     SJGroupDesc *group;
  644. {
  645.     Relation plmap;
  646.     ObjectId plid;
  647.     Form_pg_plmap plmdata;
  648.     HeapTuple plmtup;
  649.  
  650.     /*
  651.      *  Choose a platter to put the new extent on.  This returns a filled-in
  652.      *  pg_plmap tuple data part to insert into the system catalogs.  The
  653.      *  choose routine also figures out where to place the extent on the
  654.      *  platter.
  655.      *
  656.      *    Sjchoose() palloc's and fills in plmdata; we free it later in this
  657.      *  routine.
  658.      */
  659.  
  660.     plmdata = _sjchoose(item);
  661.  
  662.     /* record plid, offset, extent size for caller */
  663.     group->sjgd_plid = plmdata->plid;
  664.     group->sjgd_jboffset = plmdata->ploffset;
  665.     group->sjgd_extentsz = plmdata->plextentsz;
  666.  
  667.     plmtup = (HeapTuple) heap_addheader(Natts_pg_plmap,
  668.                     sizeof(FormData_pg_plmap),
  669.                     (char *) plmdata);
  670.  
  671.     /* clean up the memory that heap_addheader() palloc()'ed for us */
  672.     plmtup->t_oid = InvalidObjectId;
  673.     bzero((char *) &(plmtup->t_chain), sizeof(plmtup->t_chain));
  674.  
  675.     /* open the relation and lock it */
  676.     plmap = heap_openr(Name_pg_plmap);
  677.     RelationSetLockForWrite(plmap);
  678.  
  679.     /* insert the new catalog tuple */
  680.     heap_insert(plmap, plmtup, (double *) NULL);
  681.  
  682.     /* done */
  683.     heap_close(plmap);
  684.  
  685.     /* be tidy */
  686.     pfree((char *) plmtup);
  687.     pfree((char *) plmdata);
  688. }
  689.  
  690. /*
  691.  *  _sjchoose() -- Choose a platter to receive a new extent.
  692.  *
  693.  *    Allocation strategy is:
  694.  *
  695.  *      + For the first extent of a new relation, put it on the first
  696.  *        with room for a new relation.  The policy for allocating new
  697.  *        relations to a platter is implemented by pgjb_freespc().
  698.  *
  699.  *      + For second and subsequent extents of an existing relation:
  700.  *
  701.  *        -  If there's a platter holding another extent for this
  702.  *           relation, and that platter has room for this extent,
  703.  *           allocate it there.  NOTE:  this is true in the current
  704.  *           implementation, but it's a side effect of the way in which
  705.  *           we scan for free space on platters (we consider platters
  706.  *           in the same order every time we look).
  707.  *
  708.  *        -  Otherwise, allocate the extent on the first platter with
  709.  *           space for a new extent.
  710.  */
  711.  
  712. static Form_pg_plmap
  713. _sjchoose(item)
  714.     SJCacheItem *item;
  715. {
  716.     Relation plat;
  717.     TupleDescriptor platdesc;
  718.     HeapScanDesc platscan;
  719.     HeapTuple plattup;
  720.     Buffer buf;
  721.     Form_pg_plmap plmdata;
  722.     ObjectId plid;
  723.     Datum d;
  724.     Name platname;
  725.     char *plname;
  726.     bool isnull;
  727.     bool done;
  728.     int alloctype;
  729.  
  730.     /* allocate the tuple form */
  731.     plmdata = (Form_pg_plmap) palloc(sizeof(FormData_pg_plmap));
  732.     plname = (char *) palloc(sizeof(NameData) + 1);
  733.  
  734.     plat = heap_openr(Name_pg_platter);
  735.  
  736.     /*
  737.      *  We do short-term (non-two-phase) locking on the platter relation
  738.      *  in order to guarantee serial allocations.
  739.      */
  740.  
  741.     RelationSetLockForWrite(plat);
  742.  
  743.     platdesc = RelationGetTupleDescriptor(plat);
  744.     platscan = heap_beginscan(plat, false, NowTimeQual, 0, NULL);
  745.  
  746.     /* figure out if this is a new or an old relation allocation */
  747.     alloctype = (item->sjc_tag.sjct_base > 0 ? SJOLDRELN : SJNEWRELN);
  748.  
  749.     /* find a qualifying tuple in pg_platter */
  750.     plattup = heap_getnext(platscan, false, &buf);
  751.     if (!HeapTupleIsValid(plattup))
  752.     elog(WARN, "_sjchoose: no platters in pg_plmap");
  753.  
  754.     done = false;
  755.     do {
  756.     /* get platter OID, name */
  757.     plid = plmdata->plid = plattup->t_oid;
  758.     d = (Datum) heap_getattr(plattup, buf, Anum_pg_platter_plname,
  759.                  platdesc, &isnull);
  760.     platname = DatumGetName(d);
  761.     strncpy(plname, &(platname->data[0]), sizeof(NameData));
  762.     plname[sizeof(NameData)] = '\0';
  763.  
  764.     done = pgjb_freespc(plname, plid, alloctype);
  765.  
  766.     /* done with this tuple */
  767.     ReleaseBuffer(buf);
  768.  
  769.     /* next tuple */
  770.     if (!done) {
  771.         plattup = heap_getnext(platscan, false, &buf);
  772.         if (!HeapTupleIsValid(plattup))
  773.         elog(WARN, "_sjchoose: no space on platters in pg_plmap");
  774.     }
  775.     } while (!done);
  776.  
  777.     /* init the rest of the fields */
  778.     plmdata->pldbid = item->sjc_tag.sjct_dbid;
  779.     plmdata->plrelid = item->sjc_tag.sjct_relid;
  780.     plmdata->plblkno = item->sjc_tag.sjct_base;
  781.     plmdata->plextentsz = SJEXTENTSZ;
  782.     plmdata->ploffset = pgjb_offset(plname, plmdata->plid, plmdata->plextentsz);
  783.  
  784.     /* no longer need an exclusive lock for the allocation */
  785.     RelationUnsetLockForWrite(plat);
  786.  
  787.     heap_endscan(platscan);
  788.     heap_close(plat);
  789.  
  790.     /* save platter name, id, offset in item */
  791.     bcopy(plname, &(item->sjc_plname.data[0]), sizeof(NameData));
  792.     item->sjc_plid = plmdata->plid;
  793.     item->sjc_jboffset = plmdata->ploffset;
  794.  
  795.     return (plmdata);
  796. }
  797.  
  798. /*
  799.  *  _sjallocgrp() -- Allocate a new group in the cache for use by some
  800.  *            relation.
  801.  *
  802.  *    If there are any unused slots in the cache, we just return one
  803.  *    of those.  Otherwise, we need to kick out the least-recently-used
  804.  *    group and make room for another.
  805.  *
  806.  *    On entry, we hold the cache metadata lock.  On exit, we still hold
  807.  *    it.  In between, we may release it in order to do I/O on the cache
  808.  *    group we're kicking out, if we have to do that.
  809.  */
  810.  
  811. static SJCacheItem *
  812. _sjallocgrp(grpno)
  813.     int *grpno;
  814. {
  815.     SJCacheItem *item;
  816.  
  817.     /* free list had better not be empty */
  818.     if (SJHeader->sjh_nentries == SJCACHESIZE)
  819.     elog(FATAL, "_sjallocgrp:  no groups on free list!");
  820.  
  821.     /*
  822.      *  Get a new group off the free list.  As a side effect, _sjgetgrp()
  823.      *  bumps the ref count on the group for us.
  824.      */
  825.  
  826.     *grpno = _sjgetgrp();
  827.  
  828.     item = &SJCache[*grpno];
  829.  
  830.     return (item);
  831. }
  832.  
  833. /*
  834.  *  _sjgetgrp() -- Get a group off the free list.
  835.  *
  836.  *    This routine returns the least-recently used group on the free list
  837.  *    to the caller.  If necessary, the (old) contents of the group are
  838.  *    forced to the platter.  On entry, we hold the cache metadata lock.
  839.  *    We release it and mark IOINPROG on the group if we need to do any
  840.  *    io.  We reacquire the lock before returning.
  841.  *
  842.  *    We know that there's something on the free list when we call this
  843.  *    routine.
  844.  *
  845.  *    There's an interesting problem with write-once media that we have
  846.  *    to deal with here.  It is possible in postgres for a half-full
  847.  *    buffer to be flushed to stable storage, then to be reloaded into
  848.  *    the buffer cache, filled completely, and for a new page to be
  849.  *    allocated before the old page is flushed again.  If this happens
  850.  *    to us, it's possible for the half-full page to get flushed all the
  851.  *    way through to an optical disk platter, where it can never be
  852.  *    overwritten.
  853.  *
  854.  *    In order to deal with this, we probe the buffer manager for all
  855.  *    dirty blocks it has that live on an extent before we flush the
  856.  *    extent to permanent storage.
  857.  */
  858.  
  859. static int
  860. _sjgetgrp()
  861. {
  862.     SJCacheItem *item;
  863.     int grpno;
  864.     int where;
  865.     long loc;
  866.     bool found;
  867.     int grpoffset;
  868.     BlockNumber nblocks;
  869.     Relation reln;
  870.     bool dirty;
  871.     int i;
  872.     int offset;
  873.     ObjectId dbid;
  874.     ObjectId relid;
  875.     BlockNumber base;
  876.  
  877.     /* pull the least-recently-used group off the free list */
  878.     grpno = SJHeader->sjh_freehead;
  879.     item = &(SJCache[grpno]);
  880.     _sjtouch(item);
  881.  
  882.     /* if it was previously a valid group, remove it from the hash table */
  883.     if (item->sjc_oid != InvalidObjectId)
  884.     _sjhashop(&(item->sjc_tag), HASH_REMOVE, &found);
  885.  
  886.     /*
  887.      *  See if we need to flush the group to the jukebox.  If we're working
  888.      *  with an entirely new item (the corresponding cache slot is empty),
  889.      *  dbid == relid == base == 0, so we can ignore the flags.  Otherwise,
  890.      *  we check every flags entry in the group descriptor to see if anyone
  891.      *  wants to get flushed.
  892.      */
  893.  
  894.     dirty = false;
  895.     if (item->sjc_tag.sjct_dbid != 0 || item->sjc_tag.sjct_relid != 0) {
  896.     if (MUST_FLUSH(item->sjc_gflags)) {
  897.         dirty = true;
  898.     } else {
  899.         for (i = 0; i < SJGRPSIZE; i++) {
  900.         if (MUST_FLUSH(item->sjc_flags[i])) {
  901.             dirty = true;
  902.             break;
  903.         }
  904.         }
  905.     }
  906.     }
  907.  
  908.     if (!dirty)
  909.     return (grpno);
  910.  
  911.     /*
  912.      *  By here, we need to force the group to stable storage outside the
  913.      *  cache.  Mark IOINPROG on the group (in fact, this shouldn't matter,
  914.      *  since no one should be able to get at it -- we just got it off the
  915.      *  free list and removed its hash table entry), release our exclusive
  916.      *  lock, and write it out.
  917.      */
  918.  
  919.     SET_IO_LOCK(item);
  920.  
  921.     if (_sjreadgrp(item, grpno) == SM_FAIL) {
  922.     _sjunwait_io(item);
  923.     elog(FATAL, "_sjgetgrp:  cannot read group %d", grpno);
  924.     }
  925.  
  926.     /*
  927.      *  Probe the buffer manager for dirty blocks that belong in this
  928.      *  extent.  The buffer manager will copy them into the space we
  929.      *  pass in, and will mark them clean in the buffer cache.
  930.      */
  931.  
  932.     dbid = item->sjc_tag.sjct_dbid;
  933.     relid = item->sjc_tag.sjct_relid;
  934.     base = item->sjc_tag.sjct_base;
  935.  
  936.     for (i = 0; i < SJGRPSIZE; i++) {
  937.     if (MUST_FLUSH(item->sjc_flags[i])) {
  938.         offset = (i * BLCKSZ) + JBBLOCKSZ;
  939.         DirtyBufferCopy(dbid, relid, base + i, &(SJCacheBuf[offset]));
  940.     }
  941.     }
  942.  
  943.     nblocks = _sjfindnblocks(&(item->sjc_tag));
  944.  
  945.     if (pgjb_wrtextent(item, nblocks, &(SJCacheBuf[0])) == SM_FAIL) {
  946.     _sjunwait_io(item);
  947.     elog(FATAL, "_sjfree:  cannot free group.");
  948.     }
  949.  
  950.     _sjunwait_io(item);
  951.  
  952.     /* give us back our exclusive lock */
  953.     SpinAcquire(SJCacheLock);
  954.  
  955.     return (grpno);
  956. }
  957.  
  958. static SJCacheItem *
  959. _sjfetchgrp(dbid, relid, blkno, grpno)
  960.     ObjectId dbid;
  961.     ObjectId relid;
  962.     int blkno;
  963.     int *grpno;
  964. {
  965.     SJCacheItem *item;
  966.     SJHashEntry *entry;
  967.     bool found;
  968.     SJCacheTag tag;
  969.  
  970.     SpinAcquire(SJCacheLock);
  971.  
  972.     tag.sjct_dbid = dbid;
  973.     tag.sjct_relid = relid;
  974.     tag.sjct_base = blkno;
  975.  
  976.     entry = _sjhashop(&tag, HASH_FIND, &found);
  977.  
  978.     if (found) {
  979.     *grpno = entry->sjhe_groupno;
  980.     item = &(SJCache[*grpno]);
  981.  
  982.     if (item->sjc_gflags & SJC_IOINPROG) {
  983.         _sjwait_io(item);
  984.         return (_sjfetchgrp(dbid, relid, blkno, grpno));
  985.     }
  986.  
  987.     _sjtouch(item);
  988.  
  989.     SpinRelease(SJCacheLock);
  990.     } else {
  991.     item = _sjallocgrp(grpno);
  992.  
  993.     /*
  994.      *  Possible race condition:  someone else instantiated the extent
  995.      *  we want while we were off allocating a group for it.  If that
  996.      *  happened, we want to put our just-allocated group back on the
  997.      *  free list for someone else to use.
  998.      */
  999.  
  1000.     entry = _sjhashop(&tag, HASH_FIND, &found);
  1001.     if (found) {
  1002.         /*
  1003.          *  Put the just-allocated group back on the free list.  This
  1004.          *  requires us to reenter it into the hash table if it refers
  1005.          *  to actual data.  We only want to do this if we got a different
  1006.          *  free group from the other process.
  1007.          */
  1008.  
  1009.         if (entry->sjhe_groupno != *grpno) {
  1010.         if (item->sjc_oid != InvalidObjectId)
  1011.             (void) _sjhashop(&(item->sjc_tag), HASH_ENTER, &found);
  1012.         _sjunpin(item);
  1013.         }
  1014.  
  1015.         item = &(SJCache[entry->sjhe_groupno]);
  1016.  
  1017.         /* if io in progress, wait for it to complete and try again */
  1018.         if (item->sjc_gflags & SJC_IOINPROG) {
  1019.         _sjunpin(item);
  1020.         _sjwait_io(item);
  1021.         return (_sjfetchgrp(dbid, relid, blkno));
  1022.         }
  1023.  
  1024.         SpinRelease(SJCacheLock);
  1025.     } else {
  1026.  
  1027.         /* okay, we need to read the extent from a platter */
  1028.         bcopy((char *) &tag, (char *) &(item->sjc_tag), sizeof(tag));
  1029.         entry = _sjhashop(&tag, HASH_ENTER, &found);
  1030.         entry->sjhe_groupno = *grpno;
  1031.  
  1032.         SET_IO_LOCK(item);
  1033.  
  1034.         /* read the extent off the optical platter */
  1035.         _sjrdextent(item);
  1036.  
  1037.         /* update the magnetic disk cache */
  1038.         _sjwritegrp(item, *grpno);
  1039.  
  1040.         /* done, release IO lock */
  1041.         _sjunwait_io(item);
  1042.     }
  1043.     }
  1044.  
  1045.     return (item);
  1046. }
  1047.  
  1048. /*
  1049.  *  _sjrdextent() -- Read an extent from an optical platter.
  1050.  *
  1051.  *    This routine prepares the SJCacheItem group for the pgjb_rdextent()
  1052.  *    routine to work with, and passes it along.  We don't have exclusive
  1053.  *    access to the cache metadata on entry, but we do have the IOINPROGRESS
  1054.  *    bit set on the item we're working with, so on one else will screw
  1055.  *    around with it.
  1056.  */
  1057.  
  1058. static void
  1059. _sjrdextent(item)
  1060.     SJCacheItem *item;
  1061. {
  1062.     Relation reln;
  1063.     HeapScanDesc hscan;
  1064.     HeapTuple htup;
  1065.     TupleDescriptor tupdesc;
  1066.     Datum d;
  1067.     Boolean n;
  1068.     Name plname;
  1069.     ScanKeyEntryData skey[3];
  1070.  
  1071.     /* first get platter id and offset from pg_plmap */
  1072.     reln = heap_openr(Name_pg_plmap);
  1073.     tupdesc = RelationGetTupleDescriptor(reln);
  1074.     ScanKeyEntryInitialize(&skey[0], 0x0, Anum_pg_plmap_pldbid,
  1075.                ObjectIdEqualRegProcedure,
  1076.                ObjectIdGetDatum(item->sjc_tag.sjct_dbid));
  1077.     ScanKeyEntryInitialize(&skey[1], 0x0, Anum_pg_plmap_plrelid,
  1078.                ObjectIdEqualRegProcedure,
  1079.                ObjectIdGetDatum(item->sjc_tag.sjct_relid));
  1080.     ScanKeyEntryInitialize(&skey[2], 0x0, Anum_pg_plmap_plblkno,
  1081.                Integer32EqualRegProcedure,
  1082.                Int32GetDatum(item->sjc_tag.sjct_base));
  1083.     hscan = heap_beginscan(reln, false, NowTimeQual, 3, &skey[0]);
  1084.  
  1085.     /*
  1086.      *  if there is no matching entry in the platter map, then we're
  1087.      *  asking for an extent that has not yet been allocated.  in this
  1088.      *  case, we return a zero-filled extent.  this happens, for example,
  1089.      *  when we try to read the initial block of a relation before one
  1090.      *  has been written.
  1091.      */
  1092.  
  1093.     if (!HeapTupleIsValid(htup = heap_getnext(hscan, false, (Buffer *) NULL))) {
  1094.     heap_endscan(hscan);
  1095.     heap_close(reln);
  1096.     bzero(&(SJCacheBuf[0]), SJBUFSIZE);
  1097.     return;
  1098.     }
  1099.  
  1100.     d = (Datum) heap_getattr(htup, InvalidBuffer, Anum_pg_plmap_plid,
  1101.                  tupdesc, &n);
  1102.     item->sjc_plid = DatumGetObjectId(d);
  1103.     d = (Datum) heap_getattr(htup, InvalidBuffer, Anum_pg_plmap_ploffset,
  1104.                  tupdesc, &n);
  1105.     item->sjc_jboffset = DatumGetInt32(d);
  1106.  
  1107.     heap_endscan(hscan);
  1108.     heap_close(reln);
  1109.  
  1110.     /* now figure out the platter's name from pg_platter */
  1111.     reln = heap_openr(Name_pg_platter);
  1112.     tupdesc = RelationGetTupleDescriptor(reln);
  1113.     ScanKeyEntryInitialize(&skey[0], 0x0, ObjectIdAttributeNumber,
  1114.                ObjectIdEqualRegProcedure,
  1115.                ObjectIdGetDatum(item->sjc_plid));
  1116.     hscan = heap_beginscan(reln, false, NowTimeQual, 1, &skey[0]);
  1117.  
  1118.     if (!HeapTupleIsValid(htup = heap_getnext(hscan, false, (Buffer *) NULL))) {
  1119.     _sjunwait_io(item);
  1120.     elog(WARN, "_sjrdextent: cannot find platter oid %d", item->sjc_plid);
  1121.     }
  1122.  
  1123.     d = (Datum) heap_getattr(htup, InvalidBuffer, Anum_pg_platter_plname,
  1124.                  tupdesc, &n);
  1125.     plname = DatumGetName(d);
  1126.     bcopy(&(plname->data[0]), &(item->sjc_plname.data[0]), sizeof(NameData));
  1127.  
  1128.     heap_endscan(hscan);
  1129.     heap_close(reln);
  1130.  
  1131.     /*
  1132.      *  Okay, by here, we have all the fields in item filled in except for
  1133.      *  sjc_oid, sjc_gflags, and sjc_flags[].  Those are all filled in by
  1134.      *  pgjb_rdextent(), so we call that routine to do the work.
  1135.      */
  1136.  
  1137.     if (pgjb_rdextent(item, &SJCacheBuf[0]) == SM_FAIL) {
  1138.     _sjunwait_io(item);
  1139.     elog(WARN, "read of extent <%d,%d,%d> from platter %d failed",
  1140.            item->sjc_tag.sjct_dbid, item->sjc_tag.sjct_relid,
  1141.            item->sjc_tag.sjct_base, item->sjc_plid);
  1142.     }
  1143. }
  1144.  
  1145. /*
  1146.  *  _sjtouch() -- Increment reference count on the supplied item.
  1147.  *
  1148.  *    If this is the first reference to the item, we remove it from the
  1149.  *    free list.  On entry and exit, we hold SJCacheLock.  If we pulled
  1150.  *    the item off the free list, we adjust SJHeader->sjh_nentries.
  1151.  */
  1152.  
  1153. static void
  1154. _sjtouch(item)
  1155.     SJCacheItem *item;
  1156. {
  1157.     /*
  1158.      *  Bump the reference count to this group.  If it's the first
  1159.      *  reference, pull the group off the free list.
  1160.      */
  1161.  
  1162.     if (++(item->sjc_refcount) == 1) {
  1163.  
  1164.     /* if at the start of the free list, adjust 'head' pointer */
  1165.     if (item->sjc_freeprev != -1)
  1166.         SJCache[item->sjc_freeprev].sjc_freenext = item->sjc_freenext;
  1167.     else
  1168.         SJHeader->sjh_freehead = item->sjc_freenext;
  1169.  
  1170.     /* if at the end of the free list, adjust 'tail' pointer */
  1171.     if (item->sjc_freenext != -1)
  1172.         SJCache[item->sjc_freenext].sjc_freeprev = item->sjc_freeprev;
  1173.     else
  1174.         SJHeader->sjh_freetail = item->sjc_freeprev;
  1175.  
  1176.     /* disconnect from free list */
  1177.     item->sjc_freeprev = item->sjc_freenext = -1;
  1178.  
  1179.     /* keep track of number of groups allocated */
  1180.     (SJHeader->sjh_nentries)++;
  1181.     }
  1182. }
  1183.  
  1184. /*
  1185.  *  _sjunpin() -- Decrement reference count on the supplied item.
  1186.  *
  1187.  *    If we are releasing the last reference to the supplied item, we put
  1188.  *    it back on the free list.  On entry and exit, we do not hold the
  1189.  *    cache lock.  We must acquire it in order to carry out the requested
  1190.  *    release.
  1191.  */
  1192.  
  1193. static void
  1194. _sjunpin(item)
  1195.     SJCacheItem *item;
  1196. {
  1197.     int grpno;
  1198.  
  1199.     /* exclusive access */
  1200.     SpinAcquire(SJCacheLock);
  1201.  
  1202.     /* item had better be pinned */
  1203.     if (item->sjc_refcount <= 0)
  1204.     elog(FATAL, "_sjunpin: illegal reference count");
  1205.  
  1206.     /*
  1207.      *  Unpin the item.  If this is the last reference, put the item at the
  1208.      *  end of the free list.  Implemenation note:  if SJHeader->sjh_freehead
  1209.      *  is -1, then the list is empty, and SJHeader->sjh_freetail is also -1.
  1210.      */
  1211.  
  1212.     if (--(item->sjc_refcount) == 0) {
  1213.  
  1214.     grpno = GROUPNO(item);
  1215.  
  1216.     if (SJHeader->sjh_freehead == -1) {
  1217.         SJHeader->sjh_freehead = grpno;
  1218.     } else {
  1219.         item->sjc_freeprev = SJHeader->sjh_freetail;
  1220.         SJCache[SJHeader->sjh_freetail].sjc_freenext = grpno;
  1221.     }
  1222.  
  1223.     /* put item at end of free list */
  1224.     SJHeader->sjh_freetail = grpno;
  1225.     (SJHeader->sjh_nentries)--;
  1226.     }
  1227.  
  1228.     SpinRelease(SJCacheLock);
  1229. }
  1230.  
  1231. static int
  1232. _sjwritegrp(item, grpno)
  1233.     SJCacheItem *item;
  1234.     int grpno;
  1235. {
  1236.     long seekpos;
  1237.     long loc;
  1238.     int nbytes, i;
  1239.     char *buf;
  1240.  
  1241.     /* first update the metadata file */
  1242.     seekpos = grpno * sizeof(*item);
  1243.  
  1244.     if ((loc = FileSeek(SJMetaVfd, seekpos, L_SET)) != seekpos)
  1245.     return (SM_FAIL);
  1246.  
  1247.     nbytes = sizeof(*item);
  1248.     buf = (char *) item;
  1249.     while (nbytes > 0) {
  1250.     i = FileWrite(SJMetaVfd, buf, nbytes);
  1251.     if (i < 0)
  1252.         return (SM_FAIL);
  1253.     nbytes -= i;
  1254.     buf += i;
  1255.     }
  1256.  
  1257.     FileSync(SJMetaVfd);
  1258.  
  1259.     /* now update the cache file */
  1260.     seekpos = grpno * SJBUFSIZE;
  1261.     if ((loc = FileSeek(SJCacheVfd, seekpos, L_SET)) != seekpos)
  1262.     return (SM_FAIL);
  1263.  
  1264.     nbytes = SJBUFSIZE;
  1265.     buf = &(SJCacheBuf[0]);
  1266.     while (nbytes > 0) {
  1267.     i = FileWrite(SJCacheVfd, buf, nbytes);
  1268.     if (i < 0)
  1269.         return (SM_FAIL);
  1270.     nbytes -= i;
  1271.     buf += i;
  1272.     }
  1273.  
  1274.     FileSync(SJCacheVfd);
  1275.  
  1276.     return (SM_SUCCESS);
  1277. }
  1278.  
  1279. /*
  1280.  *  sjextend() -- extend a relation by one block.
  1281.  */
  1282.  
  1283. int
  1284. sjextend(reln, buffer)
  1285.     Relation reln;
  1286.     char *buffer;
  1287. {
  1288.     SJCacheItem *item;
  1289.     SJHashEntry *entry;
  1290.     SJCacheTag tag;
  1291.     int grpno;
  1292.     int nblocks;
  1293.     int base;
  1294.     int offset;
  1295.     bool found;
  1296.     int grpoffset;
  1297.     long seekpos;
  1298.  
  1299.     RelationSetLockForExtend(reln);
  1300.     nblocks = sjnblocks(reln);
  1301.     base = (nblocks / SJGRPSIZE) * SJGRPSIZE;
  1302.  
  1303.     SpinAcquire(SJCacheLock);
  1304.  
  1305.     /*
  1306.      *  If the highest extent is full, we need to allocate a new group in
  1307.      *  the cache.  As a side effect, _sjnewextent will release SJCacheLock.
  1308.      *  We need to reacquire it immediately afterwards.
  1309.      */
  1310.  
  1311.     if ((nblocks % SJGRPSIZE) == 0) {
  1312.     _sjnewextent(reln, base);
  1313.     SpinAcquire(SJCacheLock);
  1314.     }
  1315.  
  1316.     if (reln->rd_rel->relisshared)
  1317.     tag.sjct_dbid = (ObjectId) 0;
  1318.     else
  1319.     tag.sjct_dbid = MyDatabaseId;
  1320.  
  1321.     tag.sjct_relid = reln->rd_id;
  1322.     tag.sjct_base = base;
  1323.  
  1324.     entry = _sjhashop(&tag, HASH_FIND, &found);
  1325.  
  1326.     if (!found) {
  1327.     SpinRelease(SJCacheLock);
  1328.     elog(WARN, "sjextend:  hey mao:  your group is missing.");
  1329.     }
  1330.  
  1331.     /* find the item and block in the item to write */
  1332.     grpno = entry->sjhe_groupno;
  1333.     item = &SJCache[grpno];
  1334.     grpoffset = nblocks % SJGRPSIZE;
  1335.  
  1336.     /*
  1337.      *  Okay, allocate the next block in this extent by marking it 'not
  1338.      *  missing'.  Once we've done this, we must hold the extend lock
  1339.      *  until end of transaction, since the number of allocated blocks no
  1340.      *  longer matches the block count visible to other backends.
  1341.      */
  1342.  
  1343.     if (!(item->sjc_flags[grpoffset] & SJC_MISSING)) {
  1344.     SpinRelease(SJCacheLock);
  1345.     elog(WARN, "sjextend: cache botch: next block in group present");
  1346.     } else {
  1347.     item->sjc_flags[grpoffset] &= ~SJC_MISSING;
  1348.     }
  1349.  
  1350.     _sjtouch(item);
  1351.  
  1352.     SET_IO_LOCK(item);
  1353.  
  1354.     /* page is allocated */
  1355.     item->sjc_flags[grpoffset] = SJC_CLEAR;
  1356.  
  1357.     /* verify group descriptor data in the cache file */
  1358.     if (_sjgroupvrfy(item, grpno) == SM_FAIL) {
  1359.     _sjunpin(item);
  1360.     _sjunwait_io(item);
  1361.     return (SM_FAIL);
  1362.     }
  1363.  
  1364.     /* write the page */
  1365.     seekpos = (grpno * SJBUFSIZE) + ((nblocks % SJGRPSIZE) * BLCKSZ)
  1366.           + JBBLOCKSZ;
  1367.     if (FileSeek(SJCacheVfd, seekpos, L_SET) != seekpos) {
  1368.     elog(NOTICE, "sjextend: failed to seek to buffer lock (%d)", seekpos);
  1369.     _sjunpin(item);
  1370.     _sjunwait_io(item);
  1371.     return (SM_FAIL);
  1372.     }
  1373.  
  1374.     if (FileWrite(SJCacheVfd, buffer, BLCKSZ) != BLCKSZ) {
  1375.     elog(NOTICE, "sjextend: can't write page %d", nblocks);
  1376.     _sjunwait_io(item);
  1377.     _sjunpin(item);
  1378.     return (SM_FAIL);
  1379.     }
  1380.  
  1381.     /* write the updated cache metadata entry */
  1382.     seekpos = grpno * sizeof(*item);
  1383.  
  1384.     if (FileSeek(SJMetaVfd, seekpos, L_SET) != seekpos) {
  1385.     elog(NOTICE, "sjextend: seek to %d on metadata file failed", seekpos);
  1386.     _sjunwait_io(item);
  1387.     _sjunpin(item);
  1388.     return (SM_FAIL);
  1389.     }
  1390.  
  1391.     if (FileWrite(SJMetaVfd, (char *) item, sizeof(*item)) < 0) {
  1392.     elog(NOTICE, "sjextend: write of metadata file failed");
  1393.     _sjunwait_io(item);
  1394.     _sjunpin(item);
  1395.     return (SM_FAIL);
  1396.     }
  1397.  
  1398.     /* success */
  1399.     _sjunwait_io(item);
  1400.     _sjunpin(item);
  1401.  
  1402.     tag.sjct_base = ++nblocks;
  1403.     _sjregnblocks(&tag);
  1404.  
  1405.     return (SM_SUCCESS);
  1406. }
  1407.  
  1408. static int
  1409. _sjreadgrp(item, grpno)
  1410.     SJCacheItem *item;
  1411.     int grpno;
  1412. {
  1413.     long seekpos;
  1414.     long loc;
  1415.     int nbytes, i;
  1416.     char *buf;
  1417.     SJGroupDesc *gdesc;
  1418.  
  1419.     /* get the group from the cache file */
  1420.     seekpos = grpno * SJBUFSIZE;
  1421.     if ((loc = FileSeek(SJCacheVfd, seekpos, L_SET)) != seekpos) {
  1422.     elog(NOTICE, "_sjreadgrp: cannot seek");
  1423.     return (SM_FAIL);
  1424.     }
  1425.  
  1426.     nbytes = SJBUFSIZE;
  1427.     buf = &(SJCacheBuf[0]);
  1428.     while (nbytes > 0) {
  1429.     i = FileRead(SJCacheVfd, buf, nbytes);
  1430.     if (i < 0) {
  1431.         elog(NOTICE, "_sjreadgrp: read failed");
  1432.         return (SM_FAIL);
  1433.     }
  1434.     nbytes -= i;
  1435.     buf += i;
  1436.     }
  1437.  
  1438.     gdesc = (SJGroupDesc *) &(SJCacheBuf[0]);
  1439.  
  1440.     if (gdesc->sjgd_magic != SJGDMAGIC
  1441.     || gdesc->sjgd_version != SJGDVERSION
  1442.     || gdesc->sjgd_groupoid != item->sjc_oid) {
  1443.  
  1444.     elog(NOTICE, "_sjreadgrp: trashed cache");
  1445.     return (SM_FAIL);
  1446.     }
  1447.  
  1448.     return (SM_SUCCESS);
  1449. }
  1450.  
  1451. int
  1452. sjunlink(reln)
  1453.     Relation reln;
  1454. {
  1455.     return (SM_FAIL);
  1456. }
  1457.  
  1458. /*
  1459.  *  _sjnewextent() -- Add a new extent to a relation in the jukebox cache.
  1460.  */
  1461.  
  1462. static void
  1463. _sjnewextent(reln, base)
  1464.     Relation reln;
  1465.     BlockNumber base;
  1466. {
  1467.     SJHashEntry *entry;
  1468.     SJGroupDesc *group;
  1469.     SJCacheItem *item;
  1470.     bool found;
  1471.     int grpno;
  1472.     int i;
  1473.  
  1474.     item = _sjallocgrp(&grpno);
  1475.  
  1476.     if (reln->rd_rel->relisshared)
  1477.     item->sjc_tag.sjct_dbid = (ObjectId) 0;
  1478.     else
  1479.     item->sjc_tag.sjct_dbid = MyDatabaseId;
  1480.  
  1481.     item->sjc_tag.sjct_relid = (ObjectId) reln->rd_id;
  1482.     item->sjc_tag.sjct_base = base;
  1483.  
  1484.     entry = _sjhashop(&(item->sjc_tag), HASH_ENTER, &found);
  1485.  
  1486.     entry->sjhe_groupno = grpno;
  1487.  
  1488.     SET_IO_LOCK(item);
  1489.  
  1490.     /* set flags on item, initialize group descriptor block */
  1491.     item->sjc_gflags = SJC_CLEAR;
  1492.     for (i = 0; i < SJGRPSIZE; i++)
  1493.     item->sjc_flags[i] = SJC_MISSING;
  1494.  
  1495.     /* should be smarter and only bzero what we need to */
  1496.     bzero(SJCacheBuf, SJBUFSIZE);
  1497.  
  1498.     group = (SJGroupDesc *) (&SJCacheBuf[0]);
  1499.     group->sjgd_magic = SJGDMAGIC;
  1500.     group->sjgd_version = SJGDVERSION;
  1501.  
  1502.     if (reln->rd_rel->relisshared) {
  1503.     group->sjgd_dbid = (ObjectId) 0;
  1504.     } else {
  1505.     strncpy(&(group->sjgd_dbname.data[0]),
  1506.         &(MyDatabaseName->data[0]),
  1507.         sizeof(NameData));
  1508.     group->sjgd_dbid = (ObjectId) MyDatabaseId;
  1509.     }
  1510.  
  1511.     strncpy(&(group->sjgd_relname.data[0]),
  1512.         &(reln->rd_rel->relname.data[0]),
  1513.         sizeof(NameData));
  1514.     group->sjgd_relid = reln->rd_id;
  1515.     group->sjgd_relblkno = base;
  1516.     item->sjc_oid = group->sjgd_groupoid = newoid();
  1517.  
  1518.     /*
  1519.      *  Record the presence of the new extent in the system catalogs.  The
  1520.      *  plid, jboffset, and extentsz fields are filled in by _sjregister()
  1521.      *  or the routines that it calls.  Note that we do not force the new
  1522.      *  group descriptor block all the way to the optical platter here.
  1523.      *  We do decide where to place it, however, and must go to a fair amount
  1524.      *  of trouble elsewhere in the code to avoid allocating the same extent
  1525.      *  to a different relation, or block within the same relation.
  1526.      */
  1527.  
  1528.     _sjregister(item, group);
  1529.  
  1530.     /*
  1531.      *  Write the new group cache entry to disk.  Sjwritegrp() knows where
  1532.      *  the cache buffer begins, and forces out the group descriptor we
  1533.      *  just set up.
  1534.      */
  1535.  
  1536.     if (_sjwritegrp(item, grpno) == SM_FAIL) {
  1537.     _sjunwait_io(item);
  1538.     elog(FATAL, "_sjnewextent: cannot write new extent to disk");
  1539.     }
  1540.  
  1541.     _sjregnblocks(&(item->sjc_tag));
  1542.  
  1543.     /* can now release i/o lock on the item we just added */
  1544.     _sjunwait_io(item);
  1545.  
  1546.     /* no longer need the reference */
  1547.     _sjunpin(item);
  1548. }
  1549.  
  1550. /*
  1551.  *  _sjhashop() -- Do lookup, insertion, or deletion on the metadata hash
  1552.  *           table in shared memory.
  1553.  *
  1554.  *    We don't worry about the number of entries in the hash table here;
  1555.  *    that's handled at a higher level (_sjallocgrp and _sjgetgrp).  We
  1556.  *    hold SJCacheLock on entry.
  1557.  */
  1558.  
  1559. static SJHashEntry *
  1560. _sjhashop(tagP, op, foundP)
  1561.     SJCacheTag *tagP;
  1562.     HASHACTION op;
  1563.     bool *foundP;
  1564. {
  1565.     SJHashEntry *entry;
  1566.  
  1567.     entry = (SJHashEntry *) hash_search(SJCacheHT, (char *) tagP, op, foundP);
  1568.  
  1569.     if (entry == (SJHashEntry *) NULL) {
  1570.     SpinRelease(SJCacheLock);
  1571.     elog(FATAL, "_sjhashop: hash table corrupt.");
  1572.     }
  1573.  
  1574.     if (*foundP) {
  1575.     if (op == HASH_ENTER) {
  1576.         SpinRelease(SJCacheLock);
  1577.         elog(WARN, "_sjhashop: cannot enter <%d,%d,%d>: already exists",
  1578.          tagP->sjct_dbid, tagP->sjct_relid, tagP->sjct_base);
  1579.     }
  1580.     } else {
  1581.     if (op == HASH_REMOVE) {
  1582.         SpinRelease(SJCacheLock);
  1583.         elog(WARN, "_sjhashop: cannot delete <%d,%d,%d>: missing",
  1584.          tagP->sjct_dbid, tagP->sjct_relid, tagP->sjct_base);
  1585.     }
  1586.     }
  1587.  
  1588.     return (entry);
  1589. }
  1590.  
  1591. int
  1592. sjopen(reln)
  1593.     Relation reln;
  1594. {
  1595.     char *path;
  1596.     int fd;
  1597.     extern char *relpath();
  1598.  
  1599.     path = relpath(&(reln->rd_rel->relname.data[0]));
  1600.  
  1601.     fd = FileNameOpenFile(path, O_RDWR, 0600);
  1602.  
  1603.     return (fd);
  1604. }
  1605.  
  1606. int
  1607. sjclose(reln)
  1608.     Relation reln;
  1609. {
  1610.     FileClose(reln->rd_fd);
  1611.  
  1612.     return (SM_SUCCESS);
  1613. }
  1614.  
  1615. int
  1616. sjread(reln, blocknum, buffer)
  1617.     Relation reln;
  1618.     BlockNumber blocknum;
  1619.     char *buffer;
  1620. {
  1621.     SJCacheItem *item;
  1622.     ObjectId reldbid;
  1623.     BlockNumber base;
  1624.     int offset;
  1625.     int grpno;
  1626.     long seekpos;
  1627.  
  1628.     /* fake successful read on non-existent data */
  1629.     if (sjnblocks(reln) <= blocknum) {
  1630.     bzero(buffer, BLCKSZ);
  1631.     return (SM_SUCCESS);
  1632.     }
  1633.  
  1634.     if (reln->rd_rel->relisshared)
  1635.     reldbid = (ObjectId) 0;
  1636.     else
  1637.     reldbid = MyDatabaseId;
  1638.  
  1639.     base = (blocknum / SJGRPSIZE) * SJGRPSIZE;
  1640.  
  1641.     item = _sjfetchgrp(reldbid, reln->rd_id, base, &grpno);
  1642.  
  1643.     /* shd expand _sjfetchgrp() inline to avoid extra semop()s */
  1644.     SpinAcquire(SJCacheLock);
  1645.  
  1646.     SET_IO_LOCK(item);
  1647.  
  1648.     /* First read and verify the group descriptor metadata */
  1649.     if (_sjgroupvrfy(item, grpno) == SM_FAIL) {
  1650.     _sjunpin(item);
  1651.     _sjunwait_io(item);
  1652.     return (SM_FAIL);
  1653.     }
  1654.  
  1655.     /* By here, group descriptor metadata is okay */
  1656.     seekpos = (grpno * SJBUFSIZE) + ((blocknum % SJGRPSIZE) * BLCKSZ)
  1657.           + JBBLOCKSZ;
  1658.     if (FileSeek(SJCacheVfd, seekpos, L_SET) != seekpos) {
  1659.     elog(NOTICE, "sjread: failed to seek to buffer lock (%d)", seekpos);
  1660.     _sjunpin(item);
  1661.     _sjunwait_io(item);
  1662.     return (SM_FAIL);
  1663.     }
  1664.  
  1665.     /* read the requested page */
  1666.     if (FileRead(SJCacheVfd, buffer, BLCKSZ) != BLCKSZ) {
  1667.     elog(NOTICE, "sjread: can't read page %d", blocknum);
  1668.     _sjunwait_io(item);
  1669.     return (SM_FAIL);
  1670.     }
  1671.  
  1672.     _sjunwait_io(item);
  1673.     _sjunpin(item);
  1674.  
  1675.     return (SM_SUCCESS);
  1676. }
  1677.  
  1678. static int
  1679. _sjgroupvrfy(item, grpno)
  1680.     SJCacheItem *item;
  1681.     int grpno;
  1682. {
  1683.     long seekpos;
  1684.     SJGroupDesc gdesc;
  1685.  
  1686.     seekpos = SJBUFSIZE * grpno;
  1687.     if (FileSeek(SJCacheVfd, seekpos, L_SET) != seekpos) {
  1688.     elog(NOTICE, "sjgroupvrfy: Cannot seek to %d on sj cache file",
  1689.              seekpos);
  1690.     return (SM_FAIL);
  1691.     }
  1692.  
  1693.     if (FileRead(SJCacheVfd, (char *) &gdesc, sizeof(gdesc)) < 0) {
  1694.     elog(NOTICE, "sjgroupvrfy: Cannot read group desc from sj cache file");
  1695.     return (SM_FAIL);
  1696.     }
  1697.  
  1698.     if (gdesc.sjgd_magic != SJGDMAGIC
  1699.     || gdesc.sjgd_version != SJGDVERSION
  1700.     || gdesc.sjgd_groupoid != item->sjc_oid) {
  1701.  
  1702.     elog(NOTICE, "sjgroupvrfy: trashed cache");
  1703.     return (SM_FAIL);
  1704.     }
  1705.  
  1706.     return (SM_SUCCESS);
  1707. }
  1708.  
  1709. int
  1710. sjwrite(reln, blocknum, buffer)
  1711.     Relation reln;
  1712.     BlockNumber blocknum;
  1713.     char *buffer;
  1714. {
  1715.     SJCacheItem *item;
  1716.     ObjectId reldbid;
  1717.     BlockNumber base;
  1718.     int offset;
  1719.     int grpno;
  1720.     int which;
  1721.     long seekpos;
  1722.  
  1723.     if (reln->rd_rel->relisshared)
  1724.     reldbid = (ObjectId) 0;
  1725.     else
  1726.     reldbid = MyDatabaseId;
  1727.  
  1728.     base = (blocknum / SJGRPSIZE) * SJGRPSIZE;
  1729.  
  1730.     item = _sjfetchgrp(reldbid, reln->rd_id, base, &grpno);
  1731.  
  1732.     /* shd expand _sjfetchgrp() inline to avoid extra semop()s */
  1733.     SpinAcquire(SJCacheLock);
  1734.  
  1735.     which = blocknum % SJGRPSIZE;
  1736.  
  1737.     if (item->sjc_flags[which] & SJC_ONPLATTER) {
  1738.     SpinRelease(SJCacheLock);
  1739.     _sjunpin(item);
  1740.     elog(WARN, "sjwrite: optical platters are write-once, cannot rewrite");
  1741.     }
  1742.  
  1743.     SET_IO_LOCK(item);
  1744.  
  1745.     item->sjc_flags[which] &= ~SJC_MISSING;
  1746.  
  1747.     /* verify group descriptor data in the cache file */
  1748.     if (_sjgroupvrfy(item, grpno) == SM_FAIL) {
  1749.     _sjunpin(item);
  1750.     _sjunwait_io(item);
  1751.     return (SM_FAIL);
  1752.     }
  1753.  
  1754.     /* write the page */
  1755.     seekpos = (grpno * SJBUFSIZE) + ((blocknum % SJGRPSIZE) * BLCKSZ)
  1756.           + JBBLOCKSZ;
  1757.     if (FileSeek(SJCacheVfd, seekpos, L_SET) != seekpos) {
  1758.     elog(NOTICE, "sjwrite: failed to seek to buffer lock (%d)", seekpos);
  1759.     _sjunpin(item);
  1760.     _sjunwait_io(item);
  1761.     return (SM_FAIL);
  1762.     }
  1763.  
  1764.     if (FileWrite(SJCacheVfd, buffer, BLCKSZ) != BLCKSZ) {
  1765.     elog(NOTICE, "sjwrite: can't read page %d", blocknum);
  1766.     _sjunwait_io(item);
  1767.     _sjunpin(item);
  1768.     return (SM_FAIL);
  1769.     }
  1770.  
  1771.     /* write the updated cache metadata entry */
  1772.     seekpos = grpno * sizeof(*item);
  1773.  
  1774.     if (FileSeek(SJMetaVfd, seekpos, L_SET) != seekpos) {
  1775.     elog(NOTICE, "sjwrite: seek to %d on metadata file failed", seekpos);
  1776.     _sjunwait_io(item);
  1777.     _sjunpin(item);
  1778.     return (SM_FAIL);
  1779.     }
  1780.  
  1781.     if (FileWrite(SJMetaVfd, (char *) item, sizeof(*item)) < 0) {
  1782.     elog(NOTICE, "sjwrite: write of metadata file failed");
  1783.     _sjunwait_io(item);
  1784.     _sjunpin(item);
  1785.     return (SM_FAIL);
  1786.     }
  1787.  
  1788.     _sjunwait_io(item);
  1789.     _sjunpin(item);
  1790.  
  1791.     return (SM_SUCCESS);
  1792. }
  1793.  
  1794. int
  1795. sjflush(reln, blocknum, buffer)
  1796.     Relation reln;
  1797.     BlockNumber blocknum;
  1798.     char *buffer;
  1799. {
  1800.     return (sjwrite(reln, blocknum, buffer));
  1801. }
  1802.  
  1803. int
  1804. sjblindwrt(dbstr, relstr, dbid, relid, blkno, buffer)
  1805.     char *dbstr;
  1806.     char *relstr;
  1807.     OID dbid;
  1808.     OID relid;
  1809.     BlockNumber blkno;
  1810.     char *buffer;
  1811. {
  1812.     return (SM_FAIL);
  1813. }
  1814.  
  1815. /*
  1816.  *  sjnblocks() -- Return the number of blocks that appear in this relation.
  1817.  *
  1818.  *    Rather than compute this by walking through pg_plmap and fetching
  1819.  *    groups off of platters, we store the number of blocks currently
  1820.  *    allocated to a relation in a special Unix file.
  1821.  */
  1822.  
  1823. int
  1824. sjnblocks(reln)
  1825.     Relation reln;
  1826. {
  1827.     SJCacheTag tag;
  1828.     int nblocks;
  1829.  
  1830.     if (reln->rd_rel->relisshared)
  1831.     tag.sjct_dbid = (ObjectId) 0;
  1832.     else
  1833.     tag.sjct_dbid = MyDatabaseId;
  1834.  
  1835.     tag.sjct_relid = reln->rd_id;
  1836.  
  1837.     tag.sjct_base = (BlockNumber) _sjfindnblocks(&tag);
  1838.  
  1839.     return ((int) (tag.sjct_base));
  1840. }
  1841.  
  1842. /*
  1843.  *  _sjfindnblocks() -- Find block count for the (dbid,relid) pair.
  1844.  */
  1845.  
  1846. static int
  1847. _sjfindnblocks(tag)
  1848.     SJCacheTag *tag;
  1849. {
  1850.     int nbytes;
  1851.     int i;
  1852.     SJCacheTag *cachetag;
  1853.     SJCacheTag mytag;
  1854.  
  1855.     cachetag = SJNBlockCache;
  1856.     i = 0;
  1857.     while (i < SJNBLKSIZE && cachetag->sjct_relid != (ObjectId) 0) {
  1858.     if (cachetag->sjct_dbid == tag->sjct_dbid
  1859.         && cachetag->sjct_relid == tag->sjct_relid) {
  1860.         return (cachetag->sjct_base);
  1861.     }
  1862.     i++;
  1863.     cachetag++;
  1864.     }
  1865.  
  1866.     if (FileSeek(SJBlockVfd, 0L, L_SET) != 0) {
  1867.     elog(FATAL, "_sjfindnblocks: cannot seek to zero on block count file");
  1868.     }
  1869.  
  1870.     while ((nbytes = FileRead(SJBlockVfd, (char *)&mytag, sizeof(mytag))) > 0) {
  1871.     if (mytag.sjct_dbid == tag->sjct_dbid
  1872.         && mytag.sjct_relid == tag->sjct_relid) {
  1873.  
  1874.         if (i == SJNBLKSIZE) {
  1875.         /* fast pseudo-random function */
  1876.         i = mytag.sjct_relid % SJNBLKSIZE;
  1877.         cachetag = &(SJNBlockCache[i]);
  1878.         }
  1879.  
  1880.         /* save cache tag */
  1881.         cachetag->sjct_dbid = mytag.sjct_dbid;
  1882.         cachetag->sjct_relid = mytag.sjct_relid;
  1883.         cachetag->sjct_base = mytag.sjct_base;
  1884.  
  1885.         return (mytag.sjct_base);
  1886.     }
  1887.     }
  1888.  
  1889.     elog(FATAL, "_sjfindnblocks: cannot get block count for <%d,%d>",
  1890.         tag->sjct_dbid, tag->sjct_relid);
  1891. }
  1892.  
  1893. /*
  1894.  *  _sjregnblocks() -- Remember the count of blocks for this relid.
  1895.  */
  1896.  
  1897. static void
  1898. _sjregnblocks(tag)
  1899.     SJCacheTag *tag;
  1900. {
  1901.     int loc;
  1902.     int i;
  1903.     SJCacheTag *cachetag;
  1904.     SJCacheTag mytag;
  1905.  
  1906.     cachetag = SJNBlockCache;
  1907.     i = 0;
  1908.     while (i < SJNBLKSIZE && cachetag->sjct_relid != (ObjectId) 0) {
  1909.     if (cachetag->sjct_dbid == tag->sjct_dbid
  1910.         && cachetag->sjct_relid == tag->sjct_relid)
  1911.         break;
  1912.  
  1913.     i++;
  1914.     cachetag++;
  1915.     }
  1916.  
  1917.     if (i == SJNBLKSIZE) {
  1918.     i = tag->sjct_relid % SJNBLKSIZE;
  1919.     cachetag = &(SJNBlockCache[i]);
  1920.     }
  1921.  
  1922.     cachetag->sjct_dbid = tag->sjct_dbid;
  1923.     cachetag->sjct_relid = tag->sjct_relid;
  1924.     cachetag->sjct_base = tag->sjct_base;
  1925.  
  1926.     /* update block count file */
  1927.     if (FileSeek(SJBlockVfd, 0L, L_SET) < 0) {
  1928.     elog(FATAL, "_sjregnblocks: cannot seek to zero on block count file");
  1929.     }
  1930.  
  1931.     loc = 0;
  1932.     mytag.sjct_base = tag->sjct_base;
  1933.  
  1934.     /* overwrite existing entry, if any */
  1935.     while (FileRead(SJBlockVfd, (char *) &mytag, sizeof(mytag)) > 0) {
  1936.     if (mytag.sjct_dbid == tag->sjct_dbid
  1937.         && mytag.sjct_relid == tag->sjct_relid) {
  1938.         if (FileSeek(SJBlockVfd, (loc * sizeof(SJCacheTag)), L_SET) < 0)
  1939.         elog(FATAL, "_sjregnblocks: cannot seek to loc");
  1940.         if (FileWrite(SJBlockVfd, (char *) tag, sizeof(*tag)) < 0)
  1941.         elog(FATAL, "_sjregnblocks: cannot write nblocks");
  1942.         return;
  1943.     }
  1944.     loc++;
  1945.     }
  1946.  
  1947.     /* new relation -- write at end of file */
  1948.     if (FileWrite(SJBlockVfd, (char *) tag, sizeof(*tag)) < 0)
  1949.     elog(FATAL, "_sjregnblocks: cannot write nblocks for new reln");
  1950. }
  1951. int
  1952. sjcommit()
  1953. {
  1954.     FileSync(SJMetaVfd);
  1955.     FileSync(SJCacheVfd);
  1956.     FileSync(SJBlockVfd);
  1957.  
  1958.     return (SM_SUCCESS);
  1959. }
  1960.  
  1961. int
  1962. sjabort()
  1963. {
  1964.     return (SM_SUCCESS);
  1965. }
  1966.  
  1967. /*
  1968.  *  SJShmemSize() -- Declare amount of shared memory we require.
  1969.  *
  1970.  *    The shared memory initialization code creates a block of shared
  1971.  *    memory exactly big enough to hold all the structures it needs to.
  1972.  *    This routine declares how much space the Sony jukebox cache will
  1973.  *    use.
  1974.  */
  1975.  
  1976. int
  1977. SJShmemSize()
  1978. {
  1979.     int size;
  1980.     int nbuckets;
  1981.     int nsegs;
  1982.     int tmp;
  1983.  
  1984.     /* size of cache metadata */
  1985.     size = ((SJCACHESIZE + 1) * sizeof(SJCacheItem)) + sizeof(SJCacheHeader);
  1986. #ifndef HAS_TEST_AND_SET
  1987.     size += sizeof(*SJNWaiting);
  1988. #endif /* ndef HAS_TEST_AND_SET */
  1989.  
  1990.     /* size of hash table */
  1991.     nbuckets = 1 << (int)my_log2((SJCACHESIZE - 1) / DEF_FFACTOR + 1);
  1992.     nsegs = 1 << (int)my_log2((nbuckets - 1) / DEF_SEGSIZE + 1);
  1993.     size += my_log2(SJCACHESIZE) + sizeof(HHDR);
  1994.     size += nsegs * DEF_SEGSIZE * sizeof(SEGMENT);
  1995.     tmp = (int)ceil((double)SJCACHESIZE/BUCKET_ALLOC_INCR);
  1996.     size += tmp * BUCKET_ALLOC_INCR *
  1997.             (sizeof(BUCKET_INDEX) + sizeof(SJHashEntry));
  1998.  
  1999.     /* nblock cache */
  2000.     size += SJNBLKSIZE * sizeof(SJCacheTag);
  2001.  
  2002.     /* count shared memory required for jukebox state */
  2003.     size += JBShmemSize();
  2004.  
  2005.     return (size);
  2006. }
  2007.  
  2008. /*
  2009.  *  sjmaxseg() -- Find highest segment number occupied by platter id plid
  2010.  *          in the on-disk cache.
  2011.  *
  2012.  *    This routine is called from _pgjb_findoffset().  On entry here,
  2013.  *    we hold JBSpinLock, but not SJCacheLock.  We do something a little
  2014.  *    dangerous here; we trust the group descriptor metadata that is in
  2015.  *    shared memory to reflect accurately the state of the actual cache
  2016.  *    file.  This isn't so bad; if there's an inconsistency, there are
  2017.  *    exactly two possibilities:
  2018.  *
  2019.  *        +  There was a crash between metadata and cache update,
  2020.  *           and we'll figure that out later;
  2021.  *
  2022.  *        +  Some other backend has IO_IN_PROG set on the group we
  2023.  *           are examining, and we need to look at the group desc
  2024.  *           on disk in order to find out if the group is on plid.
  2025.  *
  2026.  *    The second case basically means that we wind up holding SJCacheLock
  2027.  *    during a disk io, but that's a sufficiently rare event that we don't
  2028.  *    care.  I can't think of any cleaner way to do this, anyway.
  2029.  *
  2030.  *    We return the address of the first block of the highest-numbered
  2031.  *    extent that we have cached for plid.  If we have none cached, we
  2032.  *    return InvalidBlockNumber.
  2033.  */
  2034.  
  2035. BlockNumber
  2036. sjmaxseg(plid)
  2037.     ObjectId plid;
  2038. {
  2039.     int i;
  2040.     long seekpos, loc;
  2041.     int nbytes;
  2042.     BlockNumber last;
  2043.     SJGroupDesc *group;
  2044.  
  2045.     /* XXX hold the lock for a walk of the entire cache */
  2046.     SpinAcquire(SJCacheLock);
  2047.  
  2048.     last = InvalidBlockNumber;
  2049.     group = (SJGroupDesc *) &(SJCacheBuf[0]);
  2050.  
  2051.     /*
  2052.      *  Walk backwards along the free list.  If we ever hit an unallocated
  2053.      *  block, we can stop searching.  Otherwise, we'll hit the head of the
  2054.      *  list when freeprev == -1.
  2055.      */
  2056.  
  2057.     for (i = SJHeader->sjh_freetail;
  2058.      i != -1 && SJCache[i].sjc_oid != InvalidObjectId;
  2059.      i = SJCache[i].sjc_freeprev) {
  2060.  
  2061.     /* if IO_IN_PROG is set, we need to look at the group desc on disk */
  2062.     if (SJCache[i].sjc_gflags & SJC_IOINPROG) {
  2063.         seekpos = i * SJBUFSIZE;
  2064.         if ((loc = FileSeek(SJCacheVfd, seekpos, L_SET)) != seekpos) {
  2065.         SpinRelease(SJCacheLock);
  2066.         elog(NOTICE, "sjmaxseg: cannot seek");
  2067.         return (-1);
  2068.         }
  2069.  
  2070.         nbytes = FileRead(SJCacheVfd, (char *) group, sizeof(SJGroupDesc));
  2071.         if (nbytes != sizeof(SJGroupDesc)) {
  2072.         SpinRelease(SJCacheLock);
  2073.         elog(NOTICE, "sjmaxseg: read of group desc %d failed", i);
  2074.         return (-1);
  2075.         }
  2076.  
  2077.         /* sanity checks */
  2078.         if (group->sjgd_magic != SJGDMAGIC
  2079.         || group->sjgd_version != SJGDVERSION) {
  2080.         elog(FATAL, "sjmaxseg: cache file corrupt.");
  2081.         }
  2082.  
  2083.         if (group->sjgd_plid == plid) {
  2084.         if (group->sjgd_jboffset > last || last == InvalidBlockNumber)
  2085.             last = group->sjgd_jboffset;
  2086.         }
  2087.     } else {
  2088.         if (SJCache[i].sjc_plid == plid) {
  2089.         if (SJCache[i].sjc_jboffset > last
  2090.             || last == InvalidBlockNumber) {
  2091.  
  2092.             last = SJCache[i].sjc_jboffset;
  2093.         }
  2094.         }
  2095.     }
  2096.     }
  2097.  
  2098.     SpinRelease(SJCacheLock);
  2099.  
  2100.     return (last);
  2101. }
  2102.  
  2103. static void
  2104. _sjdump()
  2105. {
  2106.     int i, j;
  2107.     int nentries;
  2108.     SJCacheItem *item;
  2109.  
  2110.     SpinAcquire(SJCacheLock);
  2111.  
  2112.     nentries = SJHeader->sjh_nentries;
  2113.  
  2114.     printf("jukebox cache metdata: size %d, %d entries, free head %d tail %d",
  2115.        SJCACHESIZE, nentries, SJHeader->sjh_freehead,
  2116.        SJHeader->sjh_freetail);
  2117.     if (SJHeader->sjh_flags & SJH_INITING)
  2118.     printf(", INITING");
  2119.     if (SJHeader->sjh_flags & SJH_INITED)
  2120.     printf(", INITED");
  2121.     printf("\n");
  2122.  
  2123.     for (i = 0; i < SJCACHESIZE; i++) {
  2124.     item = &SJCache[i];
  2125.     printf("    [%2d] <%ld,%ld,%ld> %d@%d next %d prev %d flags %s oid %ld\n",
  2126.            i, item->sjc_tag.sjct_dbid, item->sjc_tag.sjct_relid,
  2127.            item->sjc_tag.sjct_base, item->sjc_plid, item->sjc_jboffset,
  2128.            item->sjc_freenext, item->sjc_freeprev,
  2129.            (item->sjc_gflags & SJC_IOINPROG ? "IO_IN_PROG" : "CLEAR"),
  2130.            item->sjc_oid);
  2131.     printf("         ");
  2132.     for (j = 0; j < SJGRPSIZE; j++) {
  2133.         printf("[%d %c%c]", j,
  2134.                (item->sjc_flags[j] & SJC_MISSING ? 'm' : '-'),
  2135.                (item->sjc_flags[j] & SJC_ONPLATTER ? 'o' : '-'));
  2136.     }
  2137.     printf("\n");
  2138.     }
  2139.  
  2140.     SpinRelease(SJCacheLock);
  2141. }
  2142.  
  2143. /*
  2144.  *  SJInitSemaphore() -- Initialize the 'wait' semaphore for jukebox cache
  2145.  *             pages.
  2146.  *
  2147.  *    We only do this if we don't have test-and-set locks.
  2148.  */
  2149.  
  2150. SJInitSemaphore(key)
  2151.     IPCKey key;
  2152. {
  2153. #ifndef HAS_TEST_AND_SET
  2154.     int status;
  2155.  
  2156.     SJWaitSemId = IpcSemaphoreCreate(IPCKeyGetSJWaitSemaphoreKey(key),
  2157.                      1, IPCProtection, 0, &status);
  2158.     if (SJWaitSemId < 0) {
  2159.     elog(FATAL, "cannot create/attach jukebox semaphore");
  2160.     }
  2161. #else /* ndef HAS_TEST_AND_SET */
  2162.     return;
  2163. #endif /* ndef HAS_TEST_AND_SET */
  2164. }
  2165.  
  2166. #endif /* SONY_JUKEBOX */
  2167.