home *** CD-ROM | disk | FTP | other *** search
/ OS/2 Shareware BBS: 35 Internet / 35-Internet.zip / rsync221.zip / io.c < prev    next >
C/C++ Source or Header  |  1999-03-04  |  12KB  |  592 lines

  1. /* 
  2.    Copyright (C) Andrew Tridgell 1996
  3.    Copyright (C) Paul Mackerras 1996
  4.    
  5.    This program is free software; you can redistribute it and/or modify
  6.    it under the terms of the GNU General Public License as published by
  7.    the Free Software Foundation; either version 2 of the License, or
  8.    (at your option) any later version.
  9.    
  10.    This program is distributed in the hope that it will be useful,
  11.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  12.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13.    GNU General Public License for more details.
  14.    
  15.    You should have received a copy of the GNU General Public License
  16.    along with this program; if not, write to the Free Software
  17.    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  18. */
  19.  
  20. /*
  21.   Utilities used in rsync 
  22.  
  23.   tridge, June 1996
  24.   */
  25. #include "rsync.h"
  26.  
  27. /* if no timeout is specified then use a 60 second select timeout */
  28. #define SELECT_TIMEOUT 60
  29.  
  30. static int io_multiplexing_out;
  31. static int io_multiplexing_in;
  32. static int multiplex_in_fd;
  33. static int multiplex_out_fd;
  34. static time_t last_io;
  35. static int eof_error=1;
  36. extern int verbose;
  37. extern int io_timeout;
  38. extern struct stats stats;
  39.  
  40. static int buffer_f_in = -1;
  41.  
  42. void setup_readbuffer(int f_in)
  43. {
  44.     buffer_f_in = f_in;
  45. }
  46.  
  47. static void check_timeout(void)
  48. {
  49.     time_t t;
  50.     
  51.     if (!io_timeout) return;
  52.  
  53.     if (!last_io) {
  54.         last_io = time(NULL);
  55.         return;
  56.     }
  57.  
  58.     t = time(NULL);
  59.  
  60.     if (last_io && io_timeout && (t-last_io) >= io_timeout) {
  61.         rprintf(FERROR,"io timeout after %d second - exiting\n", 
  62.             (int)(t-last_io));
  63.         exit_cleanup(RERR_TIMEOUT);
  64.     }
  65. }
  66.  
  67.  
  68. static char *read_buffer;
  69. static char *read_buffer_p;
  70. static int read_buffer_len;
  71. static int read_buffer_size;
  72. static int no_flush;
  73. static int no_flush_read;
  74.  
  75. /* read from a socket with IO timeout. return the number of
  76.    bytes read. If no bytes can be read then exit, never return
  77.    a number <= 0 */
  78. static int read_timeout(int fd, char *buf, int len)
  79. {
  80.     int n, ret=0;
  81.  
  82.     no_flush_read++;
  83.     io_flush();
  84.     no_flush_read--;
  85.  
  86.     while (ret == 0) {
  87.         fd_set fds;
  88.         struct timeval tv;
  89.  
  90.         FD_ZERO(&fds);
  91.         FD_SET(fd, &fds);
  92.         tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
  93.         tv.tv_usec = 0;
  94.  
  95.         if (select(fd+1, &fds, NULL, NULL, &tv) != 1) {
  96.             check_timeout();
  97.             continue;
  98.         }
  99.  
  100.         n = read(fd, buf, len);
  101.  
  102.         if (n > 0) {
  103.             buf += n;
  104.             len -= n;
  105.             ret += n;
  106.             if (io_timeout)
  107.                 last_io = time(NULL);
  108.             continue;
  109.         }
  110.  
  111.         if (n == -1 && errno == EINTR) {
  112.             continue;
  113.         }
  114.  
  115.         if (n == -1 && 
  116.             (errno == EAGAIN || errno == EWOULDBLOCK)) {
  117.             /* this shouldn't happen, if it does then
  118.                sleep for a short time to prevent us
  119.                chewing too much CPU */
  120.             u_sleep(100);
  121.             continue;
  122.         }
  123.  
  124.         if (n == 0) {
  125.             if (eof_error) {
  126.                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
  127.             }
  128.             exit_cleanup(RERR_STREAMIO);
  129.         }
  130.  
  131.         rprintf(FERROR,"read error: %s\n", strerror(errno));
  132.         exit_cleanup(RERR_STREAMIO);
  133.     }
  134.  
  135.     return ret;
  136. }
  137.  
  138. /* continue trying to read len bytes - don't return until len
  139.    has been read */
  140. static void read_loop(int fd, char *buf, int len)
  141. {
  142.     while (len) {
  143.         int n = read_timeout(fd, buf, len);
  144.  
  145.         buf += n;
  146.         len -= n;
  147.     }
  148. }
  149.  
  150. /* read from the file descriptor handling multiplexing - 
  151.    return number of bytes read
  152.    never return <= 0 */
  153. static int read_unbuffered(int fd, char *buf, int len)
  154. {
  155.     static int remaining;
  156.     char ibuf[4];
  157.     int tag, ret=0;
  158.     char line[1024];
  159.  
  160.     if (!io_multiplexing_in || fd != multiplex_in_fd) 
  161.         return read_timeout(fd, buf, len);
  162.  
  163.     while (ret == 0) {
  164.         if (remaining) {
  165.             len = MIN(len, remaining);
  166.             read_loop(fd, buf, len);
  167.             remaining -= len;
  168.             ret = len;
  169.             continue;
  170.         }
  171.  
  172.         read_loop(fd, ibuf, 4);
  173.         tag = IVAL(ibuf, 0);
  174.  
  175.         remaining = tag & 0xFFFFFF;
  176.         tag = tag >> 24;
  177.  
  178.         if (tag == MPLEX_BASE) continue;
  179.  
  180.         tag -= MPLEX_BASE;
  181.  
  182.         if (tag != FERROR && tag != FINFO) {
  183.             rprintf(FERROR,"unexpected tag %d\n", tag);
  184.             exit_cleanup(RERR_STREAMIO);
  185.         }
  186.  
  187.         if (remaining > sizeof(line)-1) {
  188.             rprintf(FERROR,"multiplexing overflow %d\n\n", 
  189.                 remaining);
  190.             exit_cleanup(RERR_STREAMIO);
  191.         }
  192.  
  193.         read_loop(fd, line, remaining);
  194.         line[remaining] = 0;
  195.  
  196.         rprintf(tag,"%s", line);
  197.         remaining = 0;
  198.     }
  199.  
  200.     return ret;
  201. }
  202.  
  203.  
  204.  
  205. /* This function was added to overcome a deadlock problem when using
  206.  * ssh.  It looks like we can't allow our receive queue to get full or
  207.  * ssh will clag up. Uggh.  */
  208. static void read_check(int f)
  209. {
  210.     int n = 8192;
  211.  
  212.     if (f == -1) return;
  213.  
  214.     if (read_buffer_len == 0) {
  215.         read_buffer_p = read_buffer;
  216.     }
  217.  
  218.     if (n > MAX_READ_BUFFER/4)
  219.         n = MAX_READ_BUFFER/4;
  220.  
  221.     if (read_buffer_p != read_buffer) {
  222.         memmove(read_buffer,read_buffer_p,read_buffer_len);
  223.         read_buffer_p = read_buffer;
  224.     }
  225.  
  226.     if (n > (read_buffer_size - read_buffer_len)) {
  227.         read_buffer_size += n;
  228.         read_buffer = (char *)Realloc(read_buffer,read_buffer_size);
  229.         if (!read_buffer) out_of_memory("read check");      
  230.         read_buffer_p = read_buffer;      
  231.     }
  232.  
  233.     n = read_unbuffered(f,read_buffer+read_buffer_len,n);
  234.     read_buffer_len += n;
  235. }
  236.  
  237.  
  238. /* do a buffered read from fd. don't return until all N bytes
  239.    have been read. If all N can't be read then exit with an error */
  240. static void readfd(int fd,char *buffer,int N)
  241. {
  242.     int  ret;
  243.     int total=0;  
  244.     
  245.     if ((read_buffer_len < N) && (N < 1024)) {
  246.         read_check(buffer_f_in);
  247.     }
  248.     
  249.     while (total < N) {
  250.         if (read_buffer_len > 0 && buffer_f_in == fd) {
  251.             ret = MIN(read_buffer_len,N-total);
  252.             memcpy(buffer+total,read_buffer_p,ret);
  253.             read_buffer_p += ret;
  254.             read_buffer_len -= ret;
  255.             total += ret;
  256.             continue;
  257.         } 
  258.  
  259.         no_flush_read++;
  260.         io_flush();
  261.         no_flush_read--;
  262.  
  263.         ret = read_unbuffered(fd,buffer + total,N-total);
  264.         total += ret;
  265.     }
  266.  
  267.     stats.total_read += total;
  268. }
  269.  
  270.  
  271. int32 read_int(int f)
  272. {
  273.     char b[4];
  274.     int32 ret;
  275.  
  276.     readfd(f,b,4);
  277.     ret = IVAL(b,0);
  278.     if (ret == (int32)0xffffffff) return -1;
  279.     return ret;
  280. }
  281.  
  282. int64 read_longint(int f)
  283. {
  284.     extern int remote_version;
  285.     int64 ret;
  286.     char b[8];
  287.     ret = read_int(f);
  288.  
  289.     if ((int32)ret != (int32)0xffffffff) {
  290.         return ret;
  291.     }
  292.  
  293. #ifdef NO_INT64
  294.     rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
  295.     exit_cleanup(RERR_UNSUPPORTED);
  296. #else
  297.     if (remote_version >= 16) {
  298.         readfd(f,b,8);
  299.         ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
  300.     }
  301. #endif
  302.  
  303.     return ret;
  304. }
  305.  
  306. void read_buf(int f,char *buf,int len)
  307. {
  308.     readfd(f,buf,len);
  309. }
  310.  
  311. void read_sbuf(int f,char *buf,int len)
  312. {
  313.     read_buf(f,buf,len);
  314.     buf[len] = 0;
  315. }
  316.  
  317. unsigned char read_byte(int f)
  318. {
  319.     unsigned char c;
  320.     read_buf(f,(char *)&c,1);
  321.     return c;
  322. }
  323.  
  324.  
  325.  
  326. /* write len bytes to fd, possibly reading from buffer_f_in if set
  327.    in order to unclog the pipe. don't return until all len
  328.    bytes have been written */
  329. static void writefd_unbuffered(int fd,char *buf,int len)
  330. {
  331.     int total = 0;
  332.     fd_set w_fds, r_fds;
  333.     int fd_count, count;
  334.     struct timeval tv;
  335.     int reading=0;
  336.     int blocked=0;
  337.  
  338.     no_flush++;
  339.  
  340.     while (total < len) {
  341.         FD_ZERO(&w_fds);
  342.         FD_ZERO(&r_fds);
  343.         FD_SET(fd,&w_fds);
  344.         fd_count = fd+1;
  345.  
  346.         if (!no_flush_read) {
  347.             reading = (buffer_f_in != -1);
  348.         }
  349.  
  350.         if (reading) {
  351.             FD_SET(buffer_f_in,&r_fds);
  352.             if (buffer_f_in > fd) 
  353.                 fd_count = buffer_f_in+1;
  354.         }
  355.  
  356.         tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
  357.         tv.tv_usec = 0;
  358.  
  359.         count = select(fd_count,
  360.                    reading?&r_fds:NULL,
  361.                    &w_fds,NULL,
  362.                    &tv);
  363.  
  364.         if (count <= 0) {
  365.             check_timeout();
  366.             continue;
  367.         }
  368.  
  369.         if (reading && FD_ISSET(buffer_f_in, &r_fds)) {
  370.             read_check(buffer_f_in);
  371.         }
  372.  
  373.         if (FD_ISSET(fd, &w_fds)) {
  374.             int n = (len-total)>>blocked;
  375.             int ret = write(fd,buf+total,n?n:1);
  376.  
  377.             if (ret == -1 && errno == EINTR) {
  378.                 continue;
  379.             }
  380.  
  381.             if (ret == -1 && 
  382.                 (errno == EAGAIN || errno == EWOULDBLOCK)) {
  383.                 blocked++;
  384.                 continue;
  385.             }
  386.  
  387.             if (ret <= 0) {
  388.                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
  389.                 exit_cleanup(RERR_STREAMIO);
  390.             }
  391.  
  392.             blocked = 0;
  393.             total += ret;
  394.  
  395.             if (io_timeout)
  396.                 last_io = time(NULL);
  397.         }
  398.     }
  399.  
  400.     no_flush--;
  401. }
  402.  
  403.  
  404. static char *io_buffer;
  405. static int io_buffer_count;
  406.  
  407. void io_start_buffering(int fd)
  408. {
  409.     if (io_buffer) return;
  410.     multiplex_out_fd = fd;
  411.     io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
  412.     if (!io_buffer) out_of_memory("writefd");
  413.     io_buffer_count = 0;
  414.  
  415.     /* leave room for the multiplex header in case it's needed */
  416.     io_buffer += 4;
  417. }
  418.  
  419. void io_flush(void)
  420. {
  421.     int fd = multiplex_out_fd;
  422.     if (!io_buffer_count || no_flush) return;
  423.  
  424.     if (io_multiplexing_out) {
  425.         SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
  426.         writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4);
  427.     } else {
  428.         writefd_unbuffered(fd, io_buffer, io_buffer_count);
  429.     }
  430.     io_buffer_count = 0;
  431. }
  432.  
  433. void io_end_buffering(int fd)
  434. {
  435.     io_flush();
  436.     if (!io_multiplexing_out) {
  437.         free(io_buffer-4);
  438.         io_buffer = NULL;
  439.     }
  440. }
  441.  
  442. static void writefd(int fd,char *buf,int len)
  443. {
  444.     stats.total_written += len;
  445.  
  446.     if (!io_buffer) {
  447.         writefd_unbuffered(fd, buf, len);
  448.         return;
  449.     }
  450.  
  451.     while (len) {
  452.         int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
  453.         if (n > 0) {
  454.             memcpy(io_buffer+io_buffer_count, buf, n);
  455.             buf += n;
  456.             len -= n;
  457.             io_buffer_count += n;
  458.         }
  459.         
  460.         if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
  461.     }
  462. }
  463.  
  464.  
  465. void write_int(int f,int32 x)
  466. {
  467.     char b[4];
  468.     SIVAL(b,0,x);
  469.     writefd(f,b,4);
  470. }
  471.  
  472. void write_longint(int f, int64 x)
  473. {
  474.     extern int remote_version;
  475.     char b[8];
  476.  
  477.     if (remote_version < 16 || x <= 0x7FFFFFFF) {
  478.         write_int(f, (int)x);
  479.         return;
  480.     }
  481.  
  482.     write_int(f, (int32)0xFFFFFFFF);
  483.     SIVAL(b,0,(x&0xFFFFFFFF));
  484.     SIVAL(b,4,((x>>32)&0xFFFFFFFF));
  485.  
  486.     writefd(f,b,8);
  487. }
  488.  
  489. void write_buf(int f,char *buf,int len)
  490. {
  491.     writefd(f,buf,len);
  492. }
  493.  
  494. /* write a string to the connection */
  495. static void write_sbuf(int f,char *buf)
  496. {
  497.     write_buf(f, buf, strlen(buf));
  498. }
  499.  
  500.  
  501. void write_byte(int f,unsigned char c)
  502. {
  503.     write_buf(f,(char *)&c,1);
  504. }
  505.  
  506. int read_line(int f, char *buf, int maxlen)
  507. {
  508.     eof_error = 0;
  509.  
  510.     while (maxlen) {
  511.         buf[0] = 0;
  512.         read_buf(f, buf, 1);
  513.         if (buf[0] == 0) return 0;
  514.         if (buf[0] == '\n') {
  515.             buf[0] = 0;
  516.             break;
  517.         }
  518.         if (buf[0] != '\r') {
  519.             buf++;
  520.             maxlen--;
  521.         }
  522.     }
  523.     if (maxlen == 0) {
  524.         *buf = 0;
  525.         return 0;
  526.     }
  527.  
  528.     eof_error = 1;
  529.  
  530.     return 1;
  531. }
  532.  
  533.  
  534. void io_printf(int fd, const char *format, ...)
  535. {
  536.     va_list ap;  
  537.     char buf[1024];
  538.     int len;
  539.     
  540.     va_start(ap, format);
  541.     len = vslprintf(buf, sizeof(buf), format, ap);
  542.     va_end(ap);
  543.  
  544.     if (len < 0) exit_cleanup(RERR_STREAMIO);
  545.  
  546.     write_sbuf(fd, buf);
  547. }
  548.  
  549.  
  550. /* setup for multiplexing an error stream with the data stream */
  551. void io_start_multiplex_out(int fd)
  552. {
  553.     multiplex_out_fd = fd;
  554.     io_flush();
  555.     io_start_buffering(fd);
  556.     io_multiplexing_out = 1;
  557. }
  558.  
  559. /* setup for multiplexing an error stream with the data stream */
  560. void io_start_multiplex_in(int fd)
  561. {
  562.     multiplex_in_fd = fd;
  563.     io_flush();
  564.     if (read_buffer_len) {
  565.         fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
  566.         exit_cleanup(RERR_STREAMIO);
  567.     }
  568.  
  569.     io_multiplexing_in = 1;
  570. }
  571.  
  572. /* write an message to the error stream */
  573. int io_multiplex_write(int f, char *buf, int len)
  574. {
  575.     if (!io_multiplexing_out) return 0;
  576.  
  577.     io_flush();
  578.  
  579.     SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
  580.     memcpy(io_buffer, buf, len);
  581.  
  582.     stats.total_written += (len+4);
  583.  
  584.     writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
  585.     return 1;
  586. }
  587.  
  588. void io_close_input(int fd)
  589. {
  590.     buffer_f_in = -1;
  591. }
  592.