home *** CD-ROM | disk | FTP | other *** search
/ Programming Languages Suite / JBuilder8.iso / Solaris / resource / jre / demo / jni / Poller / PollingServer.java < prev    next >
Encoding:
Java Source  |  2002-09-06  |  6.6 KB  |  233 lines

  1. /*
  2.  * @(#)PollingServer.java    1.6 01/12/03
  3.  *
  4.  * Copyright 2002 Sun Microsystems, Inc. All rights reserved.
  5.  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6.  */
  7.  
  8. import java.io.*;
  9. import java.net.*;
  10. import java.lang.Byte;
  11.  
  12. /**
  13.  * Simple Java "server" using the Poller class
  14.  * to multiplex on incoming connections.  Note
  15.  * that handoff of events, via linked Q is not
  16.  * actually be a performance booster here, since
  17.  * the processing of events is cheaper than
  18.  * the overhead in scheduling/executing them.
  19.  * Although this demo does allow for concurrency
  20.  * in handling connections, it uses a rather
  21.  * primitive "gang scheduling" policy to keep
  22.  * the code simpler.
  23.  */
  24.  
  25. public class PollingServer
  26. {
  27.   public final static int MAXCONN    = 10000;
  28.   public final static int PORTNUM    = 4444;
  29.   public final static int BYTESPEROP = 10;
  30.  
  31.   /**
  32.    * This synchronization object protects access to certain
  33.    * data (bytesRead,eventsToProcess) by concurrent Consumer threads.
  34.    */
  35.   private final static Object eventSync = new Object();
  36.  
  37.   private static InputStream[] instr = new InputStream[MAXCONN];
  38.   private static int[] mapping = new int[65535];
  39.   private static LinkedQueue linkedQ = new LinkedQueue();
  40.   private static int bytesRead = 0;
  41.   private static int bytesToRead;
  42.   private static int eventsToProcess=0;
  43.  
  44.   public PollingServer(int concurrency) {
  45.     Socket[] sockArr = new Socket[MAXCONN];
  46.     long timestart, timestop;
  47.     short[] revents = new short[MAXCONN];
  48.     int[] fds = new int[MAXCONN];
  49.     int bytes;
  50.     Poller Mux;
  51.     int serverFd;
  52.     int totalConn=0;
  53.     int connects=0;
  54.  
  55.     System.out.println ("Serv: Initializing port " + PORTNUM);
  56.     try {
  57.  
  58.       ServerSocket skMain = new ServerSocket (PORTNUM);
  59.       /*
  60.        * Create the Poller object Mux, allow for up to MAXCONN
  61.        * sockets/filedescriptors to be polled.
  62.        */
  63.       Mux = new Poller(MAXCONN);
  64.       serverFd = Mux.add(skMain, Poller.POLLIN);
  65.  
  66.       Socket ctrlSock = skMain.accept();
  67.  
  68.       BufferedReader ctrlReader =
  69.     new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));
  70.       String ctrlString = ctrlReader.readLine();
  71.       bytesToRead = Integer.valueOf(ctrlString).intValue();
  72.       ctrlString = ctrlReader.readLine();
  73.       totalConn = Integer.valueOf(ctrlString).intValue();
  74.  
  75.       System.out.println("Receiving " + bytesToRead + " bytes from " +
  76.              totalConn + " client connections");
  77.       
  78.       timestart = System.currentTimeMillis();
  79.  
  80.       /*
  81.        * Start the consumer threads to read data.
  82.        */
  83.       for (int consumerThread = 0;
  84.        consumerThread < concurrency; consumerThread++ ) {
  85.     new Consumer(consumerThread).start();
  86.       }
  87.       
  88.       /*
  89.        * Take connections, read Data
  90.        */
  91.       int numEvents=0;
  92.  
  93.       while ( bytesRead < bytesToRead ) {
  94.  
  95.     int loopWaits=0;
  96.     while (eventsToProcess > 0) {
  97.       synchronized (eventSync) {
  98.         loopWaits++;
  99.         if (eventsToProcess <= 0) break;
  100.         try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};
  101.       }
  102.     }
  103.     if (loopWaits > 1)
  104.       System.out.println("Done waiting...loops = " + loopWaits +
  105.                  " events " + numEvents +
  106.                  " bytes read : " + bytesRead );
  107.  
  108.     if (bytesRead >= bytesToRead) break; // may be done!
  109.  
  110.     /*
  111.      * Wait for events
  112.      */
  113.     numEvents = Mux.waitMultiple(100, fds, revents);
  114.     synchronized (eventSync) {
  115.       eventsToProcess = numEvents;
  116.     }
  117.     /*
  118.      * Process all the events we got from Mux.waitMultiple
  119.      */
  120.     int cnt = 0;
  121.     while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {
  122.       int fd = fds[cnt];
  123.       
  124.       if (revents[cnt] == Poller.POLLIN) {
  125.         if (fd == serverFd) {
  126.           /*
  127.            * New connection coming in on the ServerSocket
  128.            * Add the socket to the Mux, keep track of mapping
  129.            * the fdval returned by Mux.add to the connection.
  130.            */
  131.           sockArr[connects] = skMain.accept();
  132.           instr[connects] = sockArr[connects].getInputStream();
  133.           int fdval = Mux.add(sockArr[connects], Poller.POLLIN);
  134.           mapping[fdval] = connects;
  135.           synchronized(eventSync) {
  136.         eventsToProcess--; // just processed this one!
  137.           }
  138.           connects++;
  139.         } else {
  140.           /*
  141.            * We've got data from this client connection.
  142.            * Put it on the queue for the consumer threads to process.
  143.            */
  144.           linkedQ.put(new Integer(fd));
  145.         }
  146.       } else {
  147.         System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);
  148.       }
  149.       cnt++;
  150.     }
  151.       }
  152.       timestop = System.currentTimeMillis();
  153.       System.out.println("Time for all reads (" + totalConn +
  154.              " sockets) : " + (timestop-timestart));
  155.  
  156.       // Tell the client it can now go away
  157.       byte[] buff = new byte[BYTESPEROP];
  158.       ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);
  159.       
  160.       // Tell the cunsumer threads they can exit.
  161.       for (int cThread = 0; cThread < concurrency; cThread++ ) {
  162.     linkedQ.put(new Integer(-1));
  163.       }
  164.     } catch (Exception exc) { exc.printStackTrace(); }
  165.   }
  166.  
  167.   /*
  168.    * main ... just check if a concurrency was specified
  169.    */
  170.   public static void main (String args[])
  171.   {
  172.     int concurrency;
  173.  
  174.     if (args.length == 1)
  175.       concurrency = java.lang.Integer.valueOf(args[0]).intValue();
  176.     else
  177.       concurrency = Poller.getNumCPUs() + 1;
  178.     PollingServer server = new PollingServer(concurrency);
  179.   }
  180.  
  181.   /*
  182.    * This class is for handling the Client data.
  183.    * The PollingServer spawns off a number of these based upon
  184.    * the number of CPUs (or concurrency argument).
  185.    * Each just loops grabbing events off the queue and
  186.    * processing them.
  187.    */
  188.   class Consumer extends Thread {
  189.     private int threadNumber;
  190.     public Consumer(int i) { threadNumber = i; }
  191.  
  192.     public void run() {
  193.       byte[] buff = new byte[BYTESPEROP];
  194.       int bytes = 0;
  195.  
  196.       InputStream instream;
  197.       while (bytesRead < bytesToRead) {
  198.     try {
  199.       Integer Fd = (Integer) linkedQ.take();
  200.       int fd = Fd.intValue();
  201.       if (fd == -1) break; /* got told we could exit */
  202.  
  203.       /*
  204.        * We have to map the fd value returned from waitMultiple
  205.        * to the actual input stream associated with that fd.
  206.        * Take a look at how the Mux.add() was done to see how
  207.        * we stored that.
  208.        */
  209.       int map = mapping[fd];
  210.       instream = instr[map];
  211.       bytes = instream.read(buff,0,BYTESPEROP);
  212.     } catch (Exception e) { System.out.println(e.toString()); }
  213.  
  214.     if (bytes > 0) {
  215.       /*
  216.        * Any real server would do some synchronized and some
  217.        * unsynchronized work on behalf of the client, and
  218.        * most likely send some data back...but this is a
  219.        * gross oversimplification.
  220.        */
  221.       synchronized(eventSync) {
  222.         bytesRead += bytes;
  223.         eventsToProcess--;
  224.         if (eventsToProcess <= 0) {
  225.           eventSync.notify();
  226.         }
  227.       }
  228.     }
  229.       }
  230.     }
  231.   }
  232. }
  233.