home *** CD-ROM | disk | FTP | other *** search
/ ftp.parl.clemson.edu / 2015-02-07.ftp.parl.clemson.edu.tar / ftp.parl.clemson.edu / pub / pvfs2 / orangefs-2.8.3-20110323.tar.gz / orangefs-2.8.3-20110323.tar / orangefs / src / io / bmi / bmi_tcp / bmi-tcp.c < prev    next >
C/C++ Source or Header  |  2009-08-14  |  119KB  |  4,091 lines

  1. /*
  2.  * (C) 2001 Clemson University and The University of Chicago
  3.  *
  4.  * See COPYING in top-level directory.
  5.  */
  6.  
  7. /* TCP/IP implementation of a BMI method */
  8.  
  9. #include <errno.h>
  10. #include <string.h>
  11. #include <unistd.h>
  12. #include <fcntl.h>
  13. #include <sys/poll.h>
  14. #include <netinet/tcp.h>
  15. #include <assert.h>
  16. #include <sys/uio.h>
  17. #include <time.h>
  18. #include <sys/time.h>
  19. #include <sys/socket.h>
  20. #include <netinet/in.h>
  21. #include <arpa/inet.h>
  22. #include "pint-mem.h"
  23.  
  24. #include "pvfs2-config.h"
  25. #ifdef HAVE_NETDB_H
  26. #include <netdb.h>
  27. #endif
  28.  
  29. #include "bmi-method-support.h"
  30. #include "bmi-method-callback.h"
  31. #include "bmi-tcp-addressing.h"
  32. #ifdef __PVFS2_USE_EPOLL__
  33. #include "socket-collection-epoll.h"
  34. #else
  35. #include "socket-collection.h"
  36. #endif
  37. #include "op-list.h"
  38. #include "gossip.h"
  39. #include "sockio.h"
  40. #include "bmi-byteswap.h"
  41. #include "id-generator.h"
  42. #include "pint-event.h"
  43. #include "pvfs2-debug.h"
  44. #ifdef USE_TRUSTED
  45. #include "server-config.h"
  46. #include "bmi-tcp-addressing.h"
  47. #endif
  48. #include "gen-locks.h"
  49. #include "pint-hint.h"
  50. #include "pint-event.h"
  51.  
  52. static gen_mutex_t interface_mutex = GEN_MUTEX_INITIALIZER;
  53. static gen_cond_t interface_cond = GEN_COND_INITIALIZER;
  54. static int sc_test_busy = 0;
  55.  
  56. /* function prototypes */
  57. int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
  58.                int method_id,
  59.                int init_flags);
  60. int BMI_tcp_finalize(void);
  61. int BMI_tcp_set_info(int option,
  62.              void *inout_parameter);
  63. int BMI_tcp_get_info(int option,
  64.              void *inout_parameter);
  65. void *BMI_tcp_memalloc(bmi_size_t size,
  66.                enum bmi_op_type send_recv);
  67. int BMI_tcp_memfree(void *buffer,
  68.             bmi_size_t size,
  69.             enum bmi_op_type send_recv);
  70. int BMI_tcp_unexpected_free(void *buffer);
  71. int BMI_tcp_post_send(bmi_op_id_t * id,
  72.               bmi_method_addr_p dest,
  73.               const void *buffer,
  74.               bmi_size_t size,
  75.               enum bmi_buffer_type buffer_type,
  76.               bmi_msg_tag_t tag,
  77.               void *user_ptr,
  78.               bmi_context_id context_id,
  79.                       PVFS_hint hints);
  80. int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
  81.                 bmi_method_addr_p dest,
  82.                 const void *buffer,
  83.                 bmi_size_t size,
  84.                 enum bmi_buffer_type buffer_type,
  85.                 bmi_msg_tag_t tag,
  86.                 void *user_ptr,
  87.                 bmi_context_id context_id,
  88.                                 PVFS_hint hints);
  89. int BMI_tcp_post_recv(bmi_op_id_t * id,
  90.               bmi_method_addr_p src,
  91.               void *buffer,
  92.               bmi_size_t expected_size,
  93.               bmi_size_t * actual_size,
  94.               enum bmi_buffer_type buffer_type,
  95.               bmi_msg_tag_t tag,
  96.               void *user_ptr,
  97.               bmi_context_id context_id,
  98.                       PVFS_hint hints);
  99. int BMI_tcp_test(bmi_op_id_t id,
  100.          int *outcount,
  101.          bmi_error_code_t * error_code,
  102.          bmi_size_t * actual_size,
  103.          void **user_ptr,
  104.          int max_idle_time_ms,
  105.          bmi_context_id context_id);
  106. int BMI_tcp_testsome(int incount,
  107.              bmi_op_id_t * id_array,
  108.              int *outcount,
  109.              int *index_array,
  110.              bmi_error_code_t * error_code_array,
  111.              bmi_size_t * actual_size_array,
  112.              void **user_ptr_array,
  113.              int max_idle_time_ms,
  114.              bmi_context_id context_id);
  115. int BMI_tcp_testunexpected(int incount,
  116.                int *outcount,
  117.                struct bmi_method_unexpected_info *info,
  118.                int max_idle_time_ms);
  119. int BMI_tcp_testcontext(int incount,
  120.              bmi_op_id_t * out_id_array,
  121.              int *outcount,
  122.              bmi_error_code_t * error_code_array,
  123.              bmi_size_t * actual_size_array,
  124.              void **user_ptr_array,
  125.              int max_idle_time_ms,
  126.              bmi_context_id context_id);
  127. bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string);
  128. const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map);
  129. int BMI_tcp_query_addr_range(bmi_method_addr_p, const char *, int);
  130. int BMI_tcp_post_send_list(bmi_op_id_t * id,
  131.                bmi_method_addr_p dest,
  132.                const void *const *buffer_list,
  133.                const bmi_size_t *size_list,
  134.                int list_count,
  135.                bmi_size_t total_size,
  136.                enum bmi_buffer_type buffer_type,
  137.                bmi_msg_tag_t tag,
  138.                void *user_ptr,
  139.                bmi_context_id context_id,
  140.                            PVFS_hint hints);
  141. int BMI_tcp_post_recv_list(bmi_op_id_t * id,
  142.                bmi_method_addr_p src,
  143.                void *const *buffer_list,
  144.                const bmi_size_t *size_list,
  145.                int list_count,
  146.                bmi_size_t total_expected_size,
  147.                bmi_size_t * total_actual_size,
  148.                enum bmi_buffer_type buffer_type,
  149.                bmi_msg_tag_t tag,
  150.                void *user_ptr,
  151.                bmi_context_id context_id,
  152.                            PVFS_hint hints);
  153. int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
  154.                      bmi_method_addr_p dest,
  155.                      const void *const *buffer_list,
  156.                      const bmi_size_t *size_list,
  157.                      int list_count,
  158.                      bmi_size_t total_size,
  159.                      enum bmi_buffer_type buffer_type,
  160.                      bmi_msg_tag_t tag,
  161.                      void *user_ptr,
  162.                      bmi_context_id context_id,
  163.                                      PVFS_hint hints);
  164. int BMI_tcp_open_context(bmi_context_id context_id);
  165. void BMI_tcp_close_context(bmi_context_id context_id);
  166. int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id);
  167.  
  168. char BMI_tcp_method_name[] = "bmi_tcp";
  169.  
  170. /* size of encoded message header */
  171. #define TCP_ENC_HDR_SIZE 24
  172.  
  173. /* structure internal to tcp for use as a message header */
  174. struct tcp_msg_header
  175. {
  176.     uint32_t magic_nr;          /* magic number */
  177.     uint32_t mode;        /* eager, rendezvous, etc. */
  178.     bmi_msg_tag_t tag;        /* user specified message tag */
  179.     bmi_size_t size;        /* length of trailing message */
  180.     char enc_hdr[TCP_ENC_HDR_SIZE];  /* encoded version of header info */
  181. };
  182.  
  183. #define BMI_TCP_ENC_HDR(hdr)                        \
  184.     do {                                \
  185.     *((uint32_t*)&((hdr).enc_hdr[0])) = htobmi32((hdr).magic_nr);    \
  186.     *((uint32_t*)&((hdr).enc_hdr[4])) = htobmi32((hdr).mode);    \
  187.     *((uint64_t*)&((hdr).enc_hdr[8])) = htobmi64((hdr).tag);    \
  188.     *((uint64_t*)&((hdr).enc_hdr[16])) = htobmi64((hdr).size);    \
  189.     } while(0)                            
  190.  
  191. #define BMI_TCP_DEC_HDR(hdr)                        \
  192.     do {                                \
  193.     (hdr).magic_nr = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[0])));    \
  194.     (hdr).mode = bmitoh32(*((uint32_t*)&((hdr).enc_hdr[4])));    \
  195.     (hdr).tag = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[8])));    \
  196.     (hdr).size = bmitoh64(*((uint64_t*)&((hdr).enc_hdr[16])));    \
  197.     } while(0)                            
  198.  
  199. /* enumerate states that we care about */
  200. enum bmi_tcp_state
  201. {
  202.     BMI_TCP_INPROGRESS,
  203.     BMI_TCP_BUFFERING,
  204.     BMI_TCP_COMPLETE
  205. };
  206.  
  207. /* tcp private portion of operation structure */
  208. struct tcp_op
  209. {
  210.     struct tcp_msg_header env;    /* envelope for this message */
  211.     enum bmi_tcp_state tcp_op_state;
  212.     /* these two fields are used as place holders for the buffer
  213.      * list and size list when we really don't have lists (regular
  214.      * BMI_send or BMI_recv operations); it allows us to use
  215.      * generic code to handle both cases 
  216.      */
  217.     void *buffer_list_stub;
  218.     bmi_size_t size_list_stub;
  219. };
  220.  
  221. /* static io vector for use with readv and writev; we can only use
  222.  * this because BMI serializes module calls
  223.  */
  224. #define BMI_TCP_IOV_COUNT 10
  225. static struct iovec stat_io_vector[BMI_TCP_IOV_COUNT+1];
  226.  
  227. /* internal utility functions */
  228. static int tcp_server_init(void);
  229. static void dealloc_tcp_method_addr(bmi_method_addr_p map);
  230. static int tcp_sock_init(bmi_method_addr_p my_method_addr);
  231. static int enqueue_operation(op_list_p target_list,
  232.                  enum bmi_op_type send_recv,
  233.                  bmi_method_addr_p map,
  234.                  void *const *buffer_list,
  235.                  const bmi_size_t *size_list,
  236.                  int list_count,
  237.                  bmi_size_t amt_complete,
  238.                  bmi_size_t env_amt_complete,
  239.                  bmi_op_id_t * id,
  240.                  int tcp_op_state,
  241.                  struct tcp_msg_header header,
  242.                  void *user_ptr,
  243.                  bmi_size_t actual_size,
  244.                  bmi_size_t expected_size,
  245.                  bmi_context_id context_id,
  246.                              int32_t event_id);
  247. static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code);
  248. static int tcp_shutdown_addr(bmi_method_addr_p map);
  249. static int tcp_do_work(int max_idle_time);
  250. static int tcp_do_work_error(bmi_method_addr_p map);
  251. static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag);
  252. static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag);
  253. static int work_on_recv_op(method_op_p my_method_op,
  254.                int *stall_flag);
  255. static int work_on_send_op(method_op_p my_method_op,
  256.                int *blocked_flag, int* stall_flag);
  257. static int tcp_accept_init(int *socket, char** peer);
  258. static method_op_p alloc_tcp_method_op(void);
  259. static void dealloc_tcp_method_op(method_op_p old_op);
  260. static int handle_new_connection(bmi_method_addr_p map);
  261. static int tcp_post_send_generic(bmi_op_id_t * id,
  262.                                  bmi_method_addr_p dest,
  263.                                  const void *const *buffer_list,
  264.                                  const bmi_size_t *size_list,
  265.                                  int list_count,
  266.                                  enum bmi_buffer_type buffer_type,
  267.                                  struct tcp_msg_header my_header,
  268.                                  void *user_ptr,
  269.                                  bmi_context_id context_id,
  270.                                  PVFS_hint hints);
  271. static int tcp_post_recv_generic(bmi_op_id_t * id,
  272.                                  bmi_method_addr_p src,
  273.                                  void *const *buffer_list,
  274.                                  const bmi_size_t *size_list,
  275.                                  int list_count,
  276.                                  bmi_size_t expected_size,
  277.                                  bmi_size_t * actual_size,
  278.                                  enum bmi_buffer_type buffer_type,
  279.                                  bmi_msg_tag_t tag,
  280.                                  void *user_ptr,
  281.                                  bmi_context_id context_id,
  282.                                  PVFS_hint hints);
  283. static int payload_progress(int s, void *const *buffer_list, const bmi_size_t* 
  284.     size_list, int list_count, bmi_size_t total_size, int* list_index, 
  285.     bmi_size_t* current_index_complete, enum bmi_op_type send_recv, 
  286.     char* enc_hdr, bmi_size_t* env_amt_complete);
  287.  
  288. #if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
  289. static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data);
  290. #endif
  291. #if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
  292. static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr);
  293. #endif
  294.  
  295. static void bmi_set_sock_buffers(int socket);
  296.  
  297. /* exported method interface */
  298. const struct bmi_method_ops bmi_tcp_ops = {
  299.     .method_name = BMI_tcp_method_name,
  300.     .initialize = BMI_tcp_initialize,
  301.     .finalize = BMI_tcp_finalize,
  302.     .set_info = BMI_tcp_set_info,
  303.     .get_info = BMI_tcp_get_info,
  304.     .memalloc = BMI_tcp_memalloc,
  305.     .memfree  = BMI_tcp_memfree,
  306.     .unexpected_free = BMI_tcp_unexpected_free,
  307.     .post_send = BMI_tcp_post_send,
  308.     .post_sendunexpected = BMI_tcp_post_sendunexpected,
  309.     .post_recv = BMI_tcp_post_recv,
  310.     .test = BMI_tcp_test,
  311.     .testsome = BMI_tcp_testsome,
  312.     .testcontext = BMI_tcp_testcontext,
  313.     .testunexpected = BMI_tcp_testunexpected,
  314.     .method_addr_lookup = BMI_tcp_method_addr_lookup,
  315.     .post_send_list = BMI_tcp_post_send_list,
  316.     .post_recv_list = BMI_tcp_post_recv_list,
  317.     .post_sendunexpected_list = BMI_tcp_post_sendunexpected_list,
  318.     .open_context = BMI_tcp_open_context,
  319.     .close_context = BMI_tcp_close_context,
  320.     .cancel = BMI_tcp_cancel,
  321.     .rev_lookup_unexpected = BMI_tcp_addr_rev_lookup_unexpected,
  322.     .query_addr_range = BMI_tcp_query_addr_range,
  323. };
  324.  
  325. /* module parameters */
  326. static struct
  327. {
  328.     int method_flags;
  329.     int method_id;
  330.     bmi_method_addr_p listen_addr;
  331. } tcp_method_params;
  332.  
  333. #if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
  334. static struct tcp_allowed_connection_s *gtcp_allowed_connection = NULL;
  335. #endif
  336.  
  337. static int check_unexpected = 1;
  338.  
  339. /* op_list_array indices */
  340. enum
  341. {
  342.     NUM_INDICES = 5,
  343.     IND_SEND = 0,
  344.     IND_RECV = 1,
  345.     IND_RECV_INFLIGHT = 2,
  346.     IND_RECV_EAGER_DONE_BUFFERING = 3,
  347.     IND_COMPLETE_RECV_UNEXP = 4,    /* MAKE THIS COMES LAST */
  348. };
  349.  
  350. /* internal operation lists */
  351. static op_list_p op_list_array[6] = { NULL, NULL, NULL, NULL,
  352.     NULL, NULL
  353. };
  354.  
  355. /* internal completion queues */
  356. static op_list_p completion_array[BMI_MAX_CONTEXTS] = { NULL };
  357.  
  358. /* internal socket collection */
  359. static socket_collection_p tcp_socket_collection_p = NULL;
  360.  
  361. /* tunable parameters */
  362. enum
  363. {
  364.     /* amount of pending connections we'll allow */
  365.     TCP_BACKLOG = 256,
  366.     /* amount of work to be done during a test.  This roughly 
  367.      * translates into the number of sockets that we will perform
  368.      * nonblocking operations on during one function call.
  369.      */
  370.     TCP_WORK_METRIC = 128
  371. };
  372.  
  373. /* TCP message modes */
  374. enum
  375. {
  376.     TCP_MODE_IMMED = 1,        /* not used for TCP/IP */
  377.     TCP_MODE_UNEXP = 2,
  378.     TCP_MODE_EAGER = 4,
  379.     TCP_MODE_REND = 8
  380. };
  381.  
  382. /* Allowable sizes for each mode */
  383. enum
  384. {
  385.     TCP_MODE_EAGER_LIMIT = 16384,    /* 16K */
  386.     TCP_MODE_REND_LIMIT = 16777216    /* 16M */
  387. };
  388.  
  389. /* toggles cancel mode; for bmi_tcp this will result in socket being closed
  390.  * in all cancellation cases
  391.  */
  392. static int forceful_cancel_mode = 0;
  393.  
  394. /*
  395.   Socket buffer sizes, currently these default values will be used 
  396.   for the clients... (TODO)
  397.  */
  398. static int tcp_buffer_size_receive = 0;
  399. static int tcp_buffer_size_send = 0;
  400.  
  401. static PINT_event_type bmi_tcp_send_event_id;
  402. static PINT_event_type bmi_tcp_recv_event_id;
  403.  
  404. static PINT_event_group bmi_tcp_event_group;
  405. static pid_t bmi_tcp_pid;
  406.  
  407. /*************************************************************************
  408.  * Visible Interface 
  409.  */
  410.  
  411. /* BMI_tcp_initialize()
  412.  *
  413.  * Initializes the tcp method.  Must be called before any other tcp
  414.  * method functions.
  415.  *
  416.  * returns 0 on success, -errno on failure
  417.  */
  418. int BMI_tcp_initialize(bmi_method_addr_p listen_addr,
  419.                        int method_id,
  420.                        int init_flags)
  421. {
  422.  
  423.     int ret = -1;
  424.     int tmp_errno = bmi_tcp_errno_to_pvfs(-ENOSYS);
  425.     struct tcp_addr *tcp_addr_data = NULL;
  426.     int i = 0;
  427.  
  428.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Initializing TCP/IP module.\n");
  429.  
  430.     /* check args */
  431.     if ((init_flags & BMI_INIT_SERVER) && !listen_addr)
  432.     {
  433.         gossip_lerr("Error: bad parameters given to TCP/IP module.\n");
  434.         return (bmi_tcp_errno_to_pvfs(-EINVAL));
  435.     }
  436.  
  437.     gen_mutex_lock(&interface_mutex);
  438.  
  439.     /* zero out our parameter structure and fill it in */
  440.     memset(&tcp_method_params, 0, sizeof(tcp_method_params));
  441.     tcp_method_params.method_id = method_id;
  442.     tcp_method_params.method_flags = init_flags;
  443.  
  444.     if (init_flags & BMI_INIT_SERVER)
  445.     {
  446.         /* hang on to our local listening address if needed */
  447.         tcp_method_params.listen_addr = listen_addr;
  448.         /* and initialize server functions */
  449.         ret = tcp_server_init();
  450.         if (ret < 0)
  451.         {
  452.             tmp_errno = bmi_tcp_errno_to_pvfs(ret);
  453.             gossip_err("Error: tcp_server_init() failure.\n");
  454.             goto initialize_failure;
  455.         }
  456.     }
  457.  
  458.     /* set up the operation lists */
  459.     for (i = 0; i < NUM_INDICES; i++)
  460.     {
  461.         op_list_array[i] = op_list_new();
  462.         if (!op_list_array[i])
  463.         {
  464.             tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
  465.             goto initialize_failure;
  466.         }
  467.     }
  468.  
  469.     /* set up the socket collection */
  470.     if (tcp_method_params.method_flags & BMI_INIT_SERVER)
  471.     {
  472.         tcp_addr_data = tcp_method_params.listen_addr->method_data;
  473.         tcp_socket_collection_p = BMI_socket_collection_init(tcp_addr_data->socket);
  474.     }
  475.     else
  476.     {
  477.         tcp_socket_collection_p = BMI_socket_collection_init(-1);
  478.     }
  479.  
  480.     if (!tcp_socket_collection_p)
  481.     {
  482.         tmp_errno = bmi_tcp_errno_to_pvfs(-ENOMEM);
  483.         goto initialize_failure;
  484.     }
  485.  
  486.     bmi_tcp_pid = getpid();
  487.     PINT_event_define_group("bmi_tcp", &bmi_tcp_event_group);
  488.  
  489.     /* Define the send event:
  490.      *   START: (client_id, request_id, rank, handle, op_id, send_size)
  491.      *   STOP: (size_sent)
  492.      */
  493.     PINT_event_define_event(
  494.         &bmi_tcp_event_group,
  495. #ifdef __PVFS2_SERVER__
  496.         "bmi_server_send",
  497. #else
  498.         "bmi_client_send",
  499. #endif
  500.         "%d%d%d%llu%d%d",
  501.         "%d", &bmi_tcp_send_event_id);
  502.  
  503.     /* Define the recv event:
  504.      *   START: (client_id, request_id, rank, handle, op_id, recv_size)
  505.      *   STOP: (size_received)
  506.      */
  507.     PINT_event_define_event(
  508.         &bmi_tcp_event_group,
  509. #ifdef __PVFS2_SERVER__
  510.         "bmi_server_recv",
  511. #else
  512.         "bmi_client_recv",
  513. #endif
  514.         "%d%d%d%llu%d%d",
  515.         "%d", &bmi_tcp_recv_event_id);
  516.  
  517.     gen_mutex_unlock(&interface_mutex);
  518.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
  519.                   "TCP/IP module successfully initialized.\n");
  520.     return (0);
  521.  
  522.   initialize_failure:
  523.  
  524.     /* cleanup data structures and bail out */
  525.     for (i = 0; i < NUM_INDICES; i++)
  526.     {
  527.         if (op_list_array[i])
  528.         {
  529.             op_list_cleanup(op_list_array[i]);
  530.         }
  531.     }
  532.     if (tcp_socket_collection_p)
  533.     {
  534.         BMI_socket_collection_finalize(tcp_socket_collection_p);
  535.     }
  536.     gen_mutex_unlock(&interface_mutex);
  537.     return (tmp_errno);
  538. }
  539.  
  540.  
  541. /* BMI_tcp_finalize()
  542.  * 
  543.  * Shuts down the tcp method.
  544.  *
  545.  * returns 0 on success, -errno on failure
  546.  */
  547. int BMI_tcp_finalize(void)
  548. {
  549.     int i = 0;
  550.  
  551.     gen_mutex_lock(&interface_mutex);
  552.  
  553.     /* shut down our listen addr, if we have one */
  554.     if ((tcp_method_params.method_flags & BMI_INIT_SERVER)
  555.         && tcp_method_params.listen_addr)
  556.     {
  557.         dealloc_tcp_method_addr(tcp_method_params.listen_addr);
  558.     }
  559.  
  560.     /* note that this forcefully shuts down operations */
  561.     for (i = 0; i < NUM_INDICES; i++)
  562.     {
  563.         if (op_list_array[i])
  564.         {
  565.             op_list_cleanup(op_list_array[i]);
  566.             op_list_array[i] = NULL;
  567.         }
  568.     }
  569.  
  570.     /* get rid of socket collection */
  571.     if (tcp_socket_collection_p)
  572.     {
  573.         BMI_socket_collection_finalize(tcp_socket_collection_p);
  574.         tcp_socket_collection_p = NULL;
  575.     }
  576.  
  577.     /* NOTE: we are trusting the calling BMI layer to deallocate 
  578.      * all of the method addresses (this will close any open sockets)
  579.      */
  580.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "TCP/IP module finalized.\n");
  581.     gen_mutex_unlock(&interface_mutex);
  582.     return (0);
  583. }
  584.  
  585.  
  586. /*
  587.  * BMI_tcp_method_addr_lookup()
  588.  *
  589.  * resolves the string representation of an address into a method
  590.  * address structure.  
  591.  *
  592.  * returns a pointer to method_addr on success, NULL on failure
  593.  */
  594. bmi_method_addr_p BMI_tcp_method_addr_lookup(const char *id_string)
  595. {
  596.     char *tcp_string = NULL;
  597.     char *delim = NULL;
  598.     char *hostname = NULL;
  599.     bmi_method_addr_p new_addr = NULL;
  600.     struct tcp_addr *tcp_addr_data = NULL;
  601.     int ret = -1;
  602.  
  603.     tcp_string = string_key("tcp", id_string);
  604.     if (!tcp_string)
  605.     {
  606.     /* the string doesn't even have our info */
  607.     return (NULL);
  608.     }
  609.  
  610.     /* start breaking up the method information */
  611.     /* for normal tcp, it is simply hostname:port */
  612.     if ((delim = index(tcp_string, ':')) == NULL)
  613.     {
  614.     gossip_lerr("Error: malformed tcp address.\n");
  615.     free(tcp_string);
  616.     return (NULL);
  617.     }
  618.  
  619.     /* looks ok, so let's build the method addr structure */
  620.     new_addr = alloc_tcp_method_addr();
  621.     if (!new_addr)
  622.     {
  623.     free(tcp_string);
  624.     return (NULL);
  625.     }
  626.     tcp_addr_data = new_addr->method_data;
  627.  
  628.     ret = sscanf((delim + 1), "%d", &(tcp_addr_data->port));
  629.     if (ret != 1)
  630.     {
  631.     gossip_lerr("Error: malformed tcp address.\n");
  632.     dealloc_tcp_method_addr(new_addr);
  633.     free(tcp_string);
  634.     return (NULL);
  635.     }
  636.  
  637.     hostname = (char *) malloc((delim - tcp_string + 1));
  638.     if (!hostname)
  639.     {
  640.     dealloc_tcp_method_addr(new_addr);
  641.     free(tcp_string);
  642.     return (NULL);
  643.     }
  644.     strncpy(hostname, tcp_string, (delim - tcp_string));
  645.     hostname[delim - tcp_string] = '\0';
  646.  
  647.     tcp_addr_data->hostname = hostname;
  648.  
  649.     free(tcp_string);
  650.     return (new_addr);
  651. }
  652.  
  653.  
  654. /* BMI_tcp_memalloc()
  655.  * 
  656.  * Allocates memory that can be used in native mode by tcp.
  657.  *
  658.  * returns 0 on success, -errno on failure
  659.  */
  660. void *BMI_tcp_memalloc(bmi_size_t size,
  661.                enum bmi_op_type send_recv)
  662. {
  663.     /* we really don't care what flags the caller uses, TCP/IP has no
  664.      * preferences about how the memory should be configured.
  665.      */
  666.  
  667. /*    return (calloc(1,(size_t) size)); */
  668.     return PINT_mem_aligned_alloc(size, 4096);
  669. }
  670.  
  671.  
  672. /* BMI_tcp_memfree()
  673.  * 
  674.  * Frees memory that was allocated with BMI_tcp_memalloc()
  675.  *
  676.  * returns 0 on success, -errno on failure
  677.  */
  678. int BMI_tcp_memfree(void *buffer,
  679.             bmi_size_t size,
  680.             enum bmi_op_type send_recv)
  681. {
  682.     PINT_mem_aligned_free(buffer);
  683.     return (0);
  684. }
  685.  
  686. /* BMI_tcp_unexpected_free()
  687.  * 
  688.  * Frees memory that was returned from BMI_tcp_test_unexpected()
  689.  *
  690.  * returns 0 on success, -errno on failure
  691.  */
  692. int BMI_tcp_unexpected_free(void *buffer)
  693. {
  694.     if (buffer)
  695.     {
  696.     free(buffer);
  697.     }
  698.     return (0);
  699. }
  700.  
  701. #ifdef USE_TRUSTED
  702.  
  703. static struct tcp_allowed_connection_s *
  704. alloc_trusted_connection_info(int network_count)
  705. {
  706.     struct tcp_allowed_connection_s *tcp_allowed_connection_info = NULL;
  707.  
  708.     tcp_allowed_connection_info = (struct tcp_allowed_connection_s *)
  709.             calloc(1, sizeof(struct tcp_allowed_connection_s));
  710.     if (tcp_allowed_connection_info)
  711.     {
  712.         tcp_allowed_connection_info->network =
  713.             (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
  714.         if (tcp_allowed_connection_info->network == NULL)
  715.         {
  716.             free(tcp_allowed_connection_info);
  717.             tcp_allowed_connection_info = NULL;
  718.         }
  719.         else
  720.         {
  721.             tcp_allowed_connection_info->netmask =
  722.                 (struct in_addr *) calloc(network_count, sizeof(struct in_addr));
  723.             if (tcp_allowed_connection_info->netmask == NULL)
  724.             {
  725.                 free(tcp_allowed_connection_info->network);
  726.                 free(tcp_allowed_connection_info);
  727.                 tcp_allowed_connection_info = NULL;
  728.             }
  729.             else {
  730.                 tcp_allowed_connection_info->network_count = network_count;
  731.             }
  732.         }
  733.     }
  734.     return tcp_allowed_connection_info;
  735. }
  736.  
  737. static void 
  738. dealloc_trusted_connection_info(void* ptcp_allowed_connection_info)
  739. {
  740.     struct tcp_allowed_connection_s *tcp_allowed_connection_info =
  741.         (struct tcp_allowed_connection_s *) ptcp_allowed_connection_info;
  742.     if (tcp_allowed_connection_info)
  743.     {
  744.         free(tcp_allowed_connection_info->network);
  745.         tcp_allowed_connection_info->network = NULL;
  746.         free(tcp_allowed_connection_info->netmask);
  747.         tcp_allowed_connection_info->netmask = NULL;
  748.         free(tcp_allowed_connection_info);
  749.     }
  750.     return;
  751. }
  752.  
  753. #endif
  754.  
  755. /*
  756.  * This function will convert a mask_bits value to an in_addr
  757.  * representation. i.e for example if
  758.  * mask_bits was 24 then it would be 255.255.255.0
  759.  * if mask_bits was 22 then it would be 255.255.252.0
  760.  * etc
  761.  */
  762. static void convert_mask(int mask_bits, struct in_addr *mask)
  763. {
  764.    uint32_t addr = -1;
  765.    addr = addr & ~~(-1 << (mask_bits ? (32 - mask_bits) : 32));
  766.    mask->s_addr = htonl(addr);
  767.    return;
  768. }
  769.  
  770. /* BMI_tcp_set_info()
  771.  * 
  772.  * Pass in optional parameters.
  773.  *
  774.  * returns 0 on success, -errno on failure
  775.  */
  776. int BMI_tcp_set_info(int option,
  777.              void *inout_parameter)
  778. {
  779.     int ret = -1;
  780.     bmi_method_addr_p tmp_addr = NULL;
  781.  
  782.     gen_mutex_lock(&interface_mutex);
  783.  
  784.     switch (option)
  785.     {
  786.     case BMI_TCP_BUFFER_SEND_SIZE:
  787.        tcp_buffer_size_send = *((int *)inout_parameter);
  788.        ret = 0;
  789. #ifdef __PVFS2_SERVER__
  790.        /* Set the default socket buffer sizes for the server socket */
  791.        bmi_set_sock_buffers(
  792.            ((struct tcp_addr *)
  793.             tcp_method_params.listen_addr->method_data)->socket);
  794. #endif
  795.        break;
  796.     case BMI_TCP_BUFFER_RECEIVE_SIZE:
  797.        tcp_buffer_size_receive = *((int *)inout_parameter);
  798.        ret = 0;
  799. #ifdef __PVFS2_SERVER__
  800.        /* Set the default socket buffer sizes for the server socket */
  801.        bmi_set_sock_buffers(
  802.            ((struct tcp_addr *)
  803.             tcp_method_params.listen_addr->method_data)->socket);
  804. #endif
  805.        break;
  806.     case BMI_TCP_CLOSE_SOCKET: 
  807.         /* this should no longer make it to the bmi_tcp method; see bmi.c */
  808.         ret = 0;
  809.         break;
  810.     case BMI_FORCEFUL_CANCEL_MODE:
  811.     forceful_cancel_mode = 1;
  812.     ret = 0;
  813.     break;
  814.     case BMI_DROP_ADDR:
  815.     if (inout_parameter == NULL)
  816.     {
  817.         ret = bmi_tcp_errno_to_pvfs(-EINVAL);
  818.     }
  819.     else
  820.     {
  821.         tmp_addr = (bmi_method_addr_p) inout_parameter;
  822.         /* take it out of the socket collection */
  823.         tcp_forget_addr(tmp_addr, 1, 0);
  824.         ret = 0;
  825.     }
  826.     break;
  827. #ifdef USE_TRUSTED
  828.     case BMI_TRUSTED_CONNECTION:
  829.     {
  830.         struct tcp_allowed_connection_s *tcp_allowed_connection = NULL;
  831.         if (inout_parameter == NULL)
  832.         {
  833.             ret = bmi_tcp_errno_to_pvfs(-EINVAL);
  834.             break;
  835.         }
  836.         else 
  837.         {
  838.             int    bmi_networks_count = 0;
  839.             char **bmi_networks = NULL;
  840.             int   *bmi_netmasks = NULL;
  841.             struct server_configuration_s *svc_config = NULL;
  842.  
  843.             svc_config = (struct server_configuration_s *) inout_parameter;
  844.             tcp_allowed_connection = alloc_trusted_connection_info(svc_config->allowed_networks_count);
  845.             if (tcp_allowed_connection == NULL)
  846.             {
  847.                 ret = bmi_tcp_errno_to_pvfs(-ENOMEM);
  848.                 break;
  849.             }
  850. #ifdef      __PVFS2_SERVER__
  851.             gtcp_allowed_connection = tcp_allowed_connection;
  852. #endif
  853.             /* Stash this in the server_configuration_s structure. freed later on */
  854.             svc_config->security = tcp_allowed_connection;
  855.             svc_config->security_dtor = &dealloc_trusted_connection_info;
  856.             ret = 0;
  857.             /* Fill up the list of allowed ports */
  858.             PINT_config_get_allowed_ports(svc_config, 
  859.                     &tcp_allowed_connection->port_enforce, 
  860.                     tcp_allowed_connection->ports);
  861.  
  862.             /* if it was enabled, make sure that we know how to deal with it */
  863.             if (tcp_allowed_connection->port_enforce == 1)
  864.             {
  865.                 /* illegal ports */
  866.                 if (tcp_allowed_connection->ports[0] > 65535 
  867.                         || tcp_allowed_connection->ports[1] > 65535
  868.                         || tcp_allowed_connection->ports[1] < tcp_allowed_connection->ports[0])
  869.                 {
  870.                     gossip_lerr("Error: illegal trusted port values\n");
  871.                     ret = bmi_tcp_errno_to_pvfs(-EINVAL);
  872.                     /* don't enforce anything! */
  873.                     tcp_allowed_connection->port_enforce = 0;
  874.                 }
  875.             }
  876.             ret = 0;
  877.             /* Retrieve the list of BMI network addresses and masks  */
  878.             PINT_config_get_allowed_networks(svc_config,
  879.                     &tcp_allowed_connection->network_enforce,
  880.                     &bmi_networks_count,
  881.                     &bmi_networks,
  882.                     &bmi_netmasks);
  883.  
  884.             /* if it was enabled, make sure that we know how to deal with it */
  885.             if (tcp_allowed_connection->network_enforce == 1)
  886.             {
  887.                 int i;
  888.  
  889.                 for (i = 0; i < bmi_networks_count; i++)
  890.                 {
  891.                     char *tcp_string = NULL;
  892.                     /* Convert the network string into an in_addr_t structure */
  893.                     tcp_string = string_key("tcp", bmi_networks[i]);
  894.                     if (!tcp_string)
  895.                     {
  896.                         /* the string doesn't even have our info */
  897.                         gossip_lerr("Error: malformed tcp network address\n");
  898.                         ret = bmi_tcp_errno_to_pvfs(-EINVAL);
  899.                     }
  900.                     else {
  901.                         /* convert this into an in_addr_t */
  902.                         inet_aton(tcp_string, &tcp_allowed_connection->network[i]);
  903.                         free(tcp_string);
  904.                     }
  905.                     convert_mask(bmi_netmasks[i], &tcp_allowed_connection->netmask[i]);
  906.                 }
  907.                 /* don't enforce anything if there were any errors */
  908.                 if (ret != 0)
  909.                 {
  910.                     tcp_allowed_connection->network_enforce = 0;
  911.                 }
  912.             }
  913.         }
  914.         break;
  915.     }
  916. #endif
  917.     case BMI_TCP_CHECK_UNEXPECTED:
  918.     {
  919.         check_unexpected = *(int *)inout_parameter;
  920.         ret = 0;
  921.         break;
  922.     }
  923.  
  924.     default:
  925.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
  926.                       "TCP hint %d not implemented.\n", option);
  927.     ret = 0;
  928.     break;
  929.     }
  930.  
  931.     gen_mutex_unlock(&interface_mutex);
  932.     return (ret);
  933. }
  934.  
  935. /* BMI_tcp_get_info()
  936.  * 
  937.  * Query for optional parameters.
  938.  *
  939.  * returns 0 on success, -errno on failure
  940.  */
  941. int BMI_tcp_get_info(int option,
  942.              void *inout_parameter)
  943. {
  944.     struct method_drop_addr_query* query;
  945.     struct tcp_addr* tcp_addr_data;
  946.     int ret = 0;
  947.  
  948.     gen_mutex_lock(&interface_mutex);
  949.  
  950.     switch (option)
  951.     {
  952.     case BMI_CHECK_MAXSIZE:
  953.     *((int *) inout_parameter) = TCP_MODE_REND_LIMIT;
  954.         ret = 0;
  955.     break;
  956.     case BMI_DROP_ADDR_QUERY:
  957.     query = (struct method_drop_addr_query*)inout_parameter;
  958.     tcp_addr_data=query->addr->method_data;
  959.     /* only suggest that we discard the address if we have experienced
  960.      * an error and there is no way to reconnect
  961.      */
  962.     if(tcp_addr_data->addr_error != 0 &&
  963.            tcp_addr_data->dont_reconnect == 1)
  964.     {
  965.         query->response = 1;
  966.     }
  967.     else
  968.     {
  969.         query->response = 0;
  970.     }
  971.         ret = 0;
  972.     break;
  973.     case BMI_GET_UNEXP_SIZE:
  974.         *((int *) inout_parameter) = TCP_MODE_EAGER_LIMIT;
  975.         ret = 0;
  976.         break;
  977.  
  978.     default:
  979.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
  980.                       "TCP hint %d not implemented.\n", option);
  981.         ret = -ENOSYS;
  982.     break;
  983.     }
  984.  
  985.     gen_mutex_unlock(&interface_mutex);
  986.     return (ret < 0) ? bmi_tcp_errno_to_pvfs(ret) : ret;
  987. }
  988.  
  989.  
  990. /* BMI_tcp_post_send()
  991.  * 
  992.  * Submits send operations.
  993.  *
  994.  * returns 0 on success that requires later poll, returns 1 on instant
  995.  * completion, -errno on failure
  996.  */
  997. int BMI_tcp_post_send(bmi_op_id_t * id,
  998.               bmi_method_addr_p dest,
  999.               const void *buffer,
  1000.               bmi_size_t size,
  1001.               enum bmi_buffer_type buffer_type,
  1002.               bmi_msg_tag_t tag,
  1003.               void *user_ptr,
  1004.               bmi_context_id context_id,
  1005.                       PVFS_hint hints)
  1006. {
  1007.     struct tcp_msg_header my_header;
  1008.     int ret = -1;
  1009.  
  1010.     /* clear the id field for safety */
  1011.     *id = 0;
  1012.  
  1013.     /* fill in the TCP-specific message header */
  1014.     if (size > TCP_MODE_REND_LIMIT)
  1015.     {
  1016.     return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
  1017.     }
  1018.  
  1019.     if (size <= TCP_MODE_EAGER_LIMIT)
  1020.     {
  1021.     my_header.mode = TCP_MODE_EAGER;
  1022.     }
  1023.     else
  1024.     {
  1025.     my_header.mode = TCP_MODE_REND;
  1026.     }
  1027.     my_header.tag = tag;
  1028.     my_header.size = size;
  1029.     my_header.magic_nr = BMI_MAGIC_NR;
  1030.  
  1031.     gen_mutex_lock(&interface_mutex);
  1032.  
  1033.     ret = tcp_post_send_generic(id, dest, &buffer,
  1034.                                 &size, 1, buffer_type, my_header,
  1035.                                 user_ptr, context_id, hints);
  1036.  
  1037.     gen_mutex_unlock(&interface_mutex);
  1038.     return(ret);
  1039. }
  1040.  
  1041.  
  1042. /* BMI_tcp_post_sendunexpected()
  1043.  * 
  1044.  * Submits unexpected send operations.
  1045.  *
  1046.  * returns 0 on success that requires later poll, returns 1 on instant
  1047.  * completion, -errno on failure
  1048.  */
  1049. int BMI_tcp_post_sendunexpected(bmi_op_id_t * id,
  1050.                 bmi_method_addr_p dest,
  1051.                 const void *buffer,
  1052.                 bmi_size_t size,
  1053.                 enum bmi_buffer_type buffer_type,
  1054.                 bmi_msg_tag_t tag,
  1055.                 void *user_ptr,
  1056.                 bmi_context_id context_id,
  1057.                                 PVFS_hint hints)
  1058. {
  1059.     struct tcp_msg_header my_header;
  1060.     int ret = -1;
  1061.  
  1062.     /* clear the id field for safety */
  1063.     *id = 0;
  1064.  
  1065.     if (size > TCP_MODE_EAGER_LIMIT)
  1066.     {
  1067.     return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
  1068.     }
  1069.  
  1070.     my_header.mode = TCP_MODE_UNEXP;
  1071.     my_header.tag = tag;
  1072.     my_header.size = size;
  1073.     my_header.magic_nr = BMI_MAGIC_NR;
  1074.  
  1075.     gen_mutex_lock(&interface_mutex);
  1076.  
  1077.     ret = tcp_post_send_generic(id, dest, &buffer,
  1078.                                 &size, 1, buffer_type, my_header,
  1079.                                 user_ptr, context_id, hints);
  1080.     gen_mutex_unlock(&interface_mutex);
  1081.     return(ret);
  1082. }
  1083.  
  1084.  
  1085.  
  1086. /* BMI_tcp_post_recv()
  1087.  * 
  1088.  * Submits recv operations.
  1089.  *
  1090.  * returns 0 on success that requires later poll, returns 1 on instant
  1091.  * completion, -errno on failure
  1092.  */
  1093. int BMI_tcp_post_recv(bmi_op_id_t * id,
  1094.               bmi_method_addr_p src,
  1095.               void *buffer,
  1096.               bmi_size_t expected_size,
  1097.               bmi_size_t * actual_size,
  1098.               enum bmi_buffer_type buffer_type,
  1099.               bmi_msg_tag_t tag,
  1100.               void *user_ptr,
  1101.               bmi_context_id context_id,
  1102.                       PVFS_hint hints)
  1103. {
  1104.     int ret = -1;
  1105.  
  1106.     /* A few things could happen here:
  1107.      * a) rendez. recv with sender not ready yet
  1108.      * b) rendez. recv with sender waiting
  1109.      * c) eager recv, data not available yet
  1110.      * d) eager recv, some/all data already here
  1111.      * e) rendez. recv with sender in eager mode
  1112.      *
  1113.      * b or d could lead to completion without polling.
  1114.      * we don't look for unexpected messages here.
  1115.      */
  1116.  
  1117.     if (expected_size > TCP_MODE_REND_LIMIT)
  1118.     {
  1119.     return (bmi_tcp_errno_to_pvfs(-EINVAL));
  1120.     }
  1121.     gen_mutex_lock(&interface_mutex);
  1122.  
  1123.     ret = tcp_post_recv_generic(id, src, &buffer, &expected_size,
  1124.                                 1, expected_size, actual_size,
  1125.                                 buffer_type, tag,
  1126.                                 user_ptr, context_id, hints);
  1127.  
  1128.     gen_mutex_unlock(&interface_mutex);
  1129.     return (ret);
  1130. }
  1131.  
  1132.  
  1133. /* BMI_tcp_test()
  1134.  * 
  1135.  * Checks to see if a particular message has completed.
  1136.  *
  1137.  * returns 0 on success, -errno on failure
  1138.  */
  1139. int BMI_tcp_test(bmi_op_id_t id,
  1140.          int *outcount,
  1141.          bmi_error_code_t * error_code,
  1142.          bmi_size_t * actual_size,
  1143.          void **user_ptr,
  1144.          int max_idle_time,
  1145.          bmi_context_id context_id)
  1146. {
  1147.     int ret = -1;
  1148.     method_op_p query_op = (method_op_p)id_gen_fast_lookup(id);
  1149.  
  1150.     assert(query_op != NULL);
  1151.  
  1152.     gen_mutex_lock(&interface_mutex);
  1153.  
  1154.     /* do some ``real work'' here */
  1155.     ret = tcp_do_work(max_idle_time);
  1156.     if (ret < 0)
  1157.     {
  1158.     gen_mutex_unlock(&interface_mutex);
  1159.     return (ret);
  1160.     }
  1161.  
  1162.     if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
  1163.     BMI_TCP_COMPLETE)
  1164.     {
  1165.     assert(query_op->context_id == context_id);
  1166.     op_list_remove(query_op);
  1167.     if (user_ptr != NULL)
  1168.     {
  1169.         (*user_ptr) = query_op->user_ptr;
  1170.     }
  1171.     (*error_code) = query_op->error_code;
  1172.     (*actual_size) = query_op->actual_size;
  1173.         PINT_EVENT_END(
  1174.             (query_op->send_recv == BMI_SEND ?
  1175.              bmi_tcp_send_event_id : bmi_tcp_recv_event_id), bmi_tcp_pid, NULL,
  1176.              query_op->event_id, id, *actual_size);
  1177.  
  1178.     dealloc_tcp_method_op(query_op);
  1179.     (*outcount)++;
  1180.     }
  1181.  
  1182.     gen_mutex_unlock(&interface_mutex);
  1183.     return (0);
  1184. }
  1185.  
  1186. /* BMI_tcp_testsome()
  1187.  * 
  1188.  * Checks to see if any messages from the specified list have completed.
  1189.  *
  1190.  * returns 0 on success, -errno on failure
  1191.  */
  1192. int BMI_tcp_testsome(int incount,
  1193.                      bmi_op_id_t * id_array,
  1194.                      int *outcount,
  1195.                      int *index_array,
  1196.                      bmi_error_code_t * error_code_array,
  1197.                      bmi_size_t * actual_size_array,
  1198.                      void **user_ptr_array,
  1199.                      int max_idle_time,
  1200.                      bmi_context_id context_id)
  1201. {
  1202.     int ret = -1;
  1203.     method_op_p query_op = NULL;
  1204.     int i;
  1205.  
  1206.     gen_mutex_lock(&interface_mutex);
  1207.  
  1208.     /* do some ``real work'' here */
  1209.     ret = tcp_do_work(max_idle_time);
  1210.     if (ret < 0)
  1211.     {
  1212.         gen_mutex_unlock(&interface_mutex);
  1213.         return (ret);
  1214.     }
  1215.  
  1216.     for(i=0; i<incount; i++)
  1217.     {
  1218.         if(id_array[i])
  1219.         {
  1220.             /* NOTE: this depends on the user passing in valid id's;
  1221.              * otherwise we segfault.  
  1222.              */
  1223.             query_op = (method_op_p)id_gen_fast_lookup(id_array[i]);
  1224.             if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
  1225.                BMI_TCP_COMPLETE)
  1226.             {
  1227.                 assert(query_op->context_id == context_id);
  1228.                 /* this one's done; pop it out */
  1229.                 op_list_remove(query_op);
  1230.                 error_code_array[*outcount] = query_op->error_code;
  1231.                 actual_size_array[*outcount] = query_op->actual_size;
  1232.                 index_array[*outcount] = i;
  1233.                 if (user_ptr_array != NULL)
  1234.                 {
  1235.                     user_ptr_array[*outcount] = query_op->user_ptr;
  1236.                 }
  1237.                 PINT_EVENT_END(
  1238.                     (query_op->send_recv == BMI_SEND ?
  1239.                      bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
  1240.                     bmi_tcp_pid, NULL,
  1241.                     query_op->event_id, actual_size_array[*outcount]);
  1242.                 dealloc_tcp_method_op(query_op);
  1243.                 (*outcount)++;
  1244.             }
  1245.         }
  1246.     }
  1247.  
  1248.     gen_mutex_unlock(&interface_mutex);
  1249.     return(0);
  1250. }
  1251.  
  1252.  
  1253. /* BMI_tcp_testunexpected()
  1254.  * 
  1255.  * Checks to see if any unexpected messages have completed.
  1256.  *
  1257.  * returns 0 on success, -errno on failure
  1258.  */
  1259. int BMI_tcp_testunexpected(int incount,
  1260.                int *outcount,
  1261.                struct bmi_method_unexpected_info *info,
  1262.                int max_idle_time)
  1263. {
  1264.     int ret = -1;
  1265.     method_op_p query_op = NULL;
  1266.  
  1267.     gen_mutex_lock(&interface_mutex);
  1268.  
  1269.     if(op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
  1270.     {
  1271.         /* do some ``real work'' here */
  1272.         ret = tcp_do_work(max_idle_time);
  1273.         if (ret < 0)
  1274.         {
  1275.             gen_mutex_unlock(&interface_mutex);
  1276.             return (ret);
  1277.         }
  1278.     }
  1279.  
  1280.     *outcount = 0;
  1281.  
  1282.     /* go through the completed/unexpected list as long as we are finding 
  1283.      * stuff and we have room in the info array for it
  1284.      */
  1285.     while ((*outcount < incount) &&
  1286.        (query_op =
  1287.         op_list_shownext(op_list_array[IND_COMPLETE_RECV_UNEXP])))
  1288.     {
  1289.     info[*outcount].error_code = query_op->error_code;
  1290.     info[*outcount].addr = query_op->addr;
  1291.     info[*outcount].buffer = query_op->buffer;
  1292.     info[*outcount].size = query_op->actual_size;
  1293.     info[*outcount].tag = query_op->msg_tag;
  1294.     op_list_remove(query_op);
  1295.     dealloc_tcp_method_op(query_op);
  1296.     (*outcount)++;
  1297.     }
  1298.     gen_mutex_unlock(&interface_mutex);
  1299.     return (0);
  1300. }
  1301.  
  1302.  
  1303. /* BMI_tcp_testcontext()
  1304.  * 
  1305.  * Checks to see if any messages from the specified context have completed.
  1306.  *
  1307.  * returns 0 on success, -errno on failure
  1308.  */
  1309. int BMI_tcp_testcontext(int incount,
  1310.              bmi_op_id_t* out_id_array,
  1311.              int *outcount,
  1312.              bmi_error_code_t * error_code_array,
  1313.              bmi_size_t * actual_size_array,
  1314.              void **user_ptr_array,
  1315.              int max_idle_time,
  1316.              bmi_context_id context_id)
  1317. {
  1318.     int ret = -1;
  1319.     method_op_p query_op = NULL;
  1320.  
  1321.     *outcount = 0;
  1322.  
  1323.     gen_mutex_lock(&interface_mutex);
  1324.  
  1325.     if(op_list_empty(completion_array[context_id]))
  1326.     {
  1327.         /* if there are unexpected ops ready to go, then short out so
  1328.          * that the next testunexpected call can pick it up without
  1329.          * delay
  1330.          */
  1331.         if(check_unexpected &&
  1332.            !op_list_empty(op_list_array[IND_COMPLETE_RECV_UNEXP]))
  1333.         {
  1334.             gen_mutex_unlock(&interface_mutex);
  1335.             return(0);
  1336.         }
  1337.  
  1338.         /* do some ``real work'' here */
  1339.         ret = tcp_do_work(max_idle_time);
  1340.         if (ret < 0)
  1341.         {
  1342.             gen_mutex_unlock(&interface_mutex);
  1343.             return (ret);
  1344.         }
  1345.     }
  1346.  
  1347.     /* pop as many items off of the completion queue as we can */
  1348.     while((*outcount < incount) && 
  1349.           (query_op =
  1350.            op_list_shownext(completion_array[context_id])))
  1351.     {
  1352.         assert(query_op);
  1353.         assert(query_op->context_id == context_id);
  1354.  
  1355.         /* this one's done; pop it out */
  1356.         op_list_remove(query_op);
  1357.         error_code_array[*outcount] = query_op->error_code;
  1358.         actual_size_array[*outcount] = query_op->actual_size;
  1359.         out_id_array[*outcount] = query_op->op_id;
  1360.         if (user_ptr_array != NULL)
  1361.         {
  1362.             user_ptr_array[*outcount] = query_op->user_ptr;
  1363.         }
  1364.  
  1365.         PINT_EVENT_END((query_op->send_recv == BMI_SEND ?
  1366.                         bmi_tcp_send_event_id : bmi_tcp_recv_event_id),
  1367.                        bmi_tcp_pid, NULL, query_op->event_id,
  1368.                        query_op->actual_size);
  1369.  
  1370.         dealloc_tcp_method_op(query_op);
  1371.         query_op = NULL;
  1372.         (*outcount)++;
  1373.     }
  1374.  
  1375.     gen_mutex_unlock(&interface_mutex);
  1376.     return(0);
  1377. }
  1378.  
  1379.  
  1380.  
  1381. /* BMI_tcp_post_send_list()
  1382.  *
  1383.  * same as the BMI_tcp_post_send() function, except that it sends
  1384.  * from an array of possibly non contiguous buffers
  1385.  *
  1386.  * returns 0 on success, 1 on immediate successful completion,
  1387.  * -errno on failure
  1388.  */
  1389. int BMI_tcp_post_send_list(bmi_op_id_t * id,
  1390.                bmi_method_addr_p dest,
  1391.                const void *const *buffer_list,
  1392.                const bmi_size_t *size_list,
  1393.                int list_count,
  1394.                bmi_size_t total_size,
  1395.                enum bmi_buffer_type buffer_type,
  1396.                bmi_msg_tag_t tag,
  1397.                void *user_ptr,
  1398.                bmi_context_id context_id,
  1399.                            PVFS_hint hints)
  1400. {
  1401.     struct tcp_msg_header my_header;
  1402.     int ret = -1;
  1403.  
  1404.     /* clear the id field for safety */
  1405.     *id = 0;
  1406.  
  1407.     /* fill in the TCP-specific message header */
  1408.     if (total_size > TCP_MODE_REND_LIMIT)
  1409.     {
  1410.     gossip_lerr("Error: BMI message too large!\n");
  1411.     return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
  1412.     }
  1413.  
  1414.     if (total_size <= TCP_MODE_EAGER_LIMIT)
  1415.     {
  1416.     my_header.mode = TCP_MODE_EAGER;
  1417.     }
  1418.     else
  1419.     {
  1420.     my_header.mode = TCP_MODE_REND;
  1421.     }
  1422.     my_header.tag = tag;
  1423.     my_header.size = total_size;
  1424.     my_header.magic_nr = BMI_MAGIC_NR;
  1425.  
  1426.     gen_mutex_lock(&interface_mutex);
  1427.  
  1428.     ret = tcp_post_send_generic(id, dest, buffer_list,
  1429.                                 size_list, list_count, buffer_type,
  1430.                                 my_header, user_ptr, context_id, hints);
  1431.     gen_mutex_unlock(&interface_mutex);
  1432.     return(ret);
  1433. }
  1434.  
  1435. /* BMI_tcp_post_recv_list()
  1436.  *
  1437.  * same as the BMI_tcp_post_recv() function, except that it recvs
  1438.  * into an array of possibly non contiguous buffers
  1439.  *
  1440.  * returns 0 on success, 1 on immediate successful completion,
  1441.  * -errno on failure
  1442.  */
  1443. int BMI_tcp_post_recv_list(bmi_op_id_t * id,
  1444.                bmi_method_addr_p src,
  1445.                void *const *buffer_list,
  1446.                const bmi_size_t *size_list,
  1447.                int list_count,
  1448.                bmi_size_t total_expected_size,
  1449.                bmi_size_t * total_actual_size,
  1450.                enum bmi_buffer_type buffer_type,
  1451.                bmi_msg_tag_t tag,
  1452.                void *user_ptr,
  1453.                bmi_context_id context_id,
  1454.                            PVFS_hint hints)
  1455. {
  1456.     int ret = -1;
  1457.  
  1458.     if (total_expected_size > TCP_MODE_REND_LIMIT)
  1459.     {
  1460.     return (bmi_tcp_errno_to_pvfs(-EINVAL));
  1461.     }
  1462.  
  1463.     gen_mutex_lock(&interface_mutex);
  1464.  
  1465.     ret = tcp_post_recv_generic(id, src, buffer_list, size_list,
  1466.                                 list_count, total_expected_size,
  1467.                                 total_actual_size, buffer_type, tag, user_ptr,
  1468.                                 context_id, hints);
  1469.  
  1470.     gen_mutex_unlock(&interface_mutex);
  1471.     return (ret);
  1472. }
  1473.  
  1474.  
  1475. /* BMI_tcp_post_sendunexpected_list()
  1476.  *
  1477.  * same as the BMI_tcp_post_sendunexpected() function, except that 
  1478.  * it sends from an array of possibly non contiguous buffers
  1479.  *
  1480.  * returns 0 on success, 1 on immediate successful completion,
  1481.  * -errno on failure
  1482.  */
  1483. int BMI_tcp_post_sendunexpected_list(bmi_op_id_t * id,
  1484.                      bmi_method_addr_p dest,
  1485.                      const void *const *buffer_list,
  1486.                      const bmi_size_t *size_list,
  1487.                      int list_count,
  1488.                      bmi_size_t total_size,
  1489.                      enum bmi_buffer_type buffer_type,
  1490.                      bmi_msg_tag_t tag,
  1491.                      void *user_ptr,
  1492.                      bmi_context_id context_id,
  1493.                                      PVFS_hint hints)
  1494. {
  1495.     struct tcp_msg_header my_header;
  1496.     int ret = -1;
  1497.  
  1498.     /* clear the id field for safety */
  1499.     *id = 0;
  1500.  
  1501.     if (total_size > TCP_MODE_EAGER_LIMIT)
  1502.     {
  1503.     return (bmi_tcp_errno_to_pvfs(-EMSGSIZE));
  1504.     }
  1505.  
  1506.     my_header.mode = TCP_MODE_UNEXP;
  1507.     my_header.tag = tag;
  1508.     my_header.size = total_size;
  1509.     my_header.magic_nr = BMI_MAGIC_NR;
  1510.  
  1511.     gen_mutex_lock(&interface_mutex);
  1512.  
  1513.     ret = tcp_post_send_generic(id, dest, buffer_list,
  1514.                                 size_list, list_count, buffer_type,
  1515.                                 my_header, user_ptr, context_id, hints);
  1516.  
  1517.     gen_mutex_unlock(&interface_mutex);
  1518.     return(ret);
  1519. }
  1520.  
  1521.  
  1522. /* BMI_tcp_open_context()
  1523.  *
  1524.  * opens a new context with the specified context id
  1525.  *
  1526.  * returns 0 on success, -errno on failure
  1527.  */
  1528. int BMI_tcp_open_context(bmi_context_id context_id)
  1529. {
  1530.  
  1531.     gen_mutex_lock(&interface_mutex);
  1532.  
  1533.     /* start a new queue for tracking completions in this context */
  1534.     completion_array[context_id] = op_list_new();
  1535.     if (!completion_array[context_id])
  1536.     {
  1537.     gen_mutex_unlock(&interface_mutex);
  1538.     return(bmi_tcp_errno_to_pvfs(-ENOMEM));
  1539.     }
  1540.  
  1541.     gen_mutex_unlock(&interface_mutex);
  1542.     return(0);
  1543. }
  1544.  
  1545.  
  1546. /* BMI_tcp_close_context()
  1547.  *
  1548.  * shuts down a context, previously opened with BMI_tcp_open_context()
  1549.  *
  1550.  * no return value
  1551.  */
  1552. void BMI_tcp_close_context(bmi_context_id context_id)
  1553. {
  1554.     
  1555.     gen_mutex_lock(&interface_mutex);
  1556.  
  1557.     /* tear down completion queue for this context */
  1558.     op_list_cleanup(completion_array[context_id]);
  1559.  
  1560.     gen_mutex_unlock(&interface_mutex);
  1561.     return;
  1562. }
  1563.  
  1564.  
  1565. /* BMI_tcp_cancel()
  1566.  *
  1567.  * attempt to cancel a pending bmi tcp operation
  1568.  *
  1569.  * returns 0 on success, -errno on failure
  1570.  */
  1571. int BMI_tcp_cancel(bmi_op_id_t id, bmi_context_id context_id)
  1572. {
  1573.     method_op_p query_op = NULL;
  1574.     
  1575.     gen_mutex_lock(&interface_mutex);
  1576.  
  1577.     query_op = (method_op_p)id_gen_fast_lookup(id);
  1578.     if(!query_op)
  1579.     {
  1580.         /* if we can't find the operattion, then assume that it has already
  1581.          * completed naturally
  1582.          */
  1583.         gen_mutex_unlock(&interface_mutex);
  1584.         return(0);
  1585.     }
  1586.  
  1587.     /* easy case: is the operation already completed? */
  1588.     if(((struct tcp_op*)(query_op->method_data))->tcp_op_state ==
  1589.     BMI_TCP_COMPLETE)
  1590.     {
  1591.     /* only close socket in forceful cancel mode */
  1592.     if(forceful_cancel_mode)
  1593.         tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
  1594.     /* we are done! status will be collected during test */
  1595.     gen_mutex_unlock(&interface_mutex);
  1596.     return(0);
  1597.     }
  1598.  
  1599.     /* has the operation started moving data yet? */
  1600.     if(query_op->env_amt_complete)
  1601.     {
  1602.     /* be pessimistic and kill the socket, even if not in forceful
  1603.      * cancel mode */
  1604.     /* NOTE: this may place other operations beside this one into
  1605.      * EINTR error state 
  1606.      */
  1607.     tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
  1608.     gen_mutex_unlock(&interface_mutex);
  1609.     return(0);
  1610.     }
  1611.  
  1612.     /* if we fall to this point, op has been posted, but no data has moved
  1613.      * for it yet as far as we know
  1614.      */
  1615.  
  1616.     /* mark op as canceled, move to completion queue */
  1617.     query_op->error_code = -BMI_ECANCEL;
  1618.     if(query_op->send_recv == BMI_SEND)
  1619.     {
  1620.     BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
  1621.                        query_op->addr);
  1622.     }
  1623.     op_list_remove(query_op);
  1624.     ((struct tcp_op*)(query_op->method_data))->tcp_op_state = 
  1625.     BMI_TCP_COMPLETE;
  1626.     /* only close socket in forceful cancel mode */
  1627.     if(forceful_cancel_mode)
  1628.     tcp_forget_addr(query_op->addr, 0, -BMI_ECANCEL);
  1629.     op_list_add(completion_array[query_op->context_id], query_op);
  1630.     gen_mutex_unlock(&interface_mutex);
  1631.     return(0);
  1632. }
  1633.  
  1634. /*
  1635.  * For now, we only support wildcard strings that are IP addresses
  1636.  * and not *hostnames*!
  1637.  */
  1638. static int check_valid_wildcard(const char *wildcard_string, unsigned long *octets)
  1639. {
  1640.     int i, len = strlen(wildcard_string), last_dot = -1, octet_count = 0;
  1641.     char str[16];
  1642.     for (i = 0; i < len; i++)
  1643.     {
  1644.         char c = wildcard_string[i];
  1645.         memset(str, 0, 16);
  1646.         if ((c < '0' || c > '9') && c != '*' && c != '.')
  1647.             return -EINVAL;
  1648.         if (c == '*') {
  1649.             if (octet_count >= 4)
  1650.                 return -EINVAL;
  1651.             octets[octet_count++] = 256;
  1652.         }
  1653.         else if (c == '.')
  1654.         {
  1655.             char *endptr = NULL;
  1656.             if (octet_count >= 4)
  1657.                 return -EINVAL;
  1658.             strncpy(str, &wildcard_string[last_dot + 1], (i - last_dot - 1));
  1659.             octets[octet_count++] = strtol(str, &endptr, 10);
  1660.             if (*endptr != '\0' || octets[octet_count-1] >= 256)
  1661.                 return -EINVAL;
  1662.             last_dot = i;
  1663.         }
  1664.     }
  1665.     for (i = octet_count; i < 4; i++)
  1666.     {
  1667.          octets[i] = 256;
  1668.     }
  1669.     return 0;
  1670. }
  1671.  
  1672. /*
  1673.  * return 1 if the addr specified is part of the wildcard specification of octet
  1674.  * return 0 otherwise.
  1675.  */
  1676. static int check_octets(struct in_addr addr, unsigned long *octets)
  1677. {
  1678. #define B1_MASK  0xff000000
  1679. #define B1_SHIFT 24
  1680. #define B2_MASK  0x00ff0000
  1681. #define B2_SHIFT 16
  1682. #define B3_MASK  0x0000ff00
  1683. #define B3_SHIFT 8
  1684. #define B4_MASK  0x000000ff
  1685.     uint32_t host_addr = ntohl(addr.s_addr);
  1686.     /* * stands for all clients */
  1687.     if (octets[0] == 256)
  1688.     {
  1689.         return 1;
  1690.     }
  1691.     if (((host_addr & B1_MASK) >> B1_SHIFT) != octets[0])
  1692.     {
  1693.         return 0;
  1694.     }
  1695.     if (octets[1] == 256)
  1696.     {
  1697.         return 1;
  1698.     }
  1699.     if (((host_addr & B2_MASK) >> B2_SHIFT) != octets[1])
  1700.     {
  1701.         return 0;
  1702.     }
  1703.     if (octets[2] == 256)
  1704.     {
  1705.         return 1;
  1706.     }
  1707.     if (((host_addr & B3_MASK) >> B3_SHIFT) != octets[2])
  1708.     {
  1709.         return 0;
  1710.     }
  1711.     if (octets[3] == 256)
  1712.     {
  1713.         return 1;
  1714.     }
  1715.     if ((host_addr & B4_MASK) != octets[3])
  1716.     {
  1717.         return 0;
  1718.     }
  1719.     return 1;
  1720. #undef B1_MASK
  1721. #undef B1_SHIFT 
  1722. #undef B2_MASK 
  1723. #undef B2_SHIFT
  1724. #undef B3_MASK
  1725. #undef B3_SHIFT
  1726. #undef B4_MASK
  1727. }
  1728. /* BMI_tcp_query_addr_range()
  1729.  * Check if a given address is within the network specified by the wildcard string!
  1730.  * or if it is part of the subnet mask specified
  1731.  */
  1732. int BMI_tcp_query_addr_range(bmi_method_addr_p map, const char *wildcard_string, int netmask)
  1733. {
  1734.     struct tcp_addr *tcp_addr_data = map->method_data;
  1735.     struct sockaddr_in map_addr;
  1736.     socklen_t map_addr_len = sizeof(map_addr);
  1737.     const char *tcp_wildcard = wildcard_string + 6 /* strlen("tcp://") */;
  1738.     int ret = -1;
  1739.  
  1740.     memset(&map_addr, 0, sizeof(map_addr));
  1741.     if(getpeername(tcp_addr_data->socket, (struct sockaddr *) &map_addr, &map_addr_len) < 0)
  1742.     {
  1743.         ret =  bmi_tcp_errno_to_pvfs(-EINVAL);
  1744.         gossip_err("Error: failed to retrieve peer name for client.\n");
  1745.         return(ret);
  1746.     }
  1747.     /* Wildcard specification */
  1748.     if (netmask == -1)
  1749.     {
  1750.         unsigned long octets[4];
  1751.         if (check_valid_wildcard(tcp_wildcard, octets) < 0)
  1752.         {
  1753.             gossip_lerr("Invalid wildcard specification: %s\n", tcp_wildcard);
  1754.             return -EINVAL;
  1755.         }
  1756.         gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Map Address is : %s, Wildcard Octets: %lu.%lu.%lu.%lu\n", inet_ntoa(map_addr.sin_addr),
  1757.                 octets[0], octets[1], octets[2], octets[3]);
  1758.         if (check_octets(map_addr.sin_addr, octets) == 1)
  1759.         {
  1760.             return 1;
  1761.         }
  1762.     }
  1763.     /* Netmask specification */
  1764.     else {
  1765.         struct sockaddr_in mask_addr, network_addr;
  1766.         memset(&mask_addr, 0, sizeof(mask_addr));
  1767.         memset(&network_addr, 0, sizeof(network_addr));
  1768.         /* Convert the netmask address */
  1769.         convert_mask(netmask, &mask_addr.sin_addr);
  1770.         /* Invalid network address */
  1771.         if (inet_aton(tcp_wildcard, &network_addr.sin_addr) == 0)
  1772.         {
  1773.             gossip_err("Invalid network specification: %s\n", tcp_wildcard);
  1774.             return -EINVAL;
  1775.         }
  1776.         /* Matches the subnet mask! */
  1777.         if ((map_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr)
  1778.                 == (network_addr.sin_addr.s_addr & mask_addr.sin_addr.s_addr))
  1779.         {
  1780.             return 1;
  1781.         }
  1782.     }
  1783.     return 0;
  1784. }
  1785.  
  1786. /* BMI_tcp_addr_rev_lookup_unexpected()
  1787.  *
  1788.  * looks up an address that was initialized unexpectedly and returns a string
  1789.  * hostname
  1790.  *
  1791.  * returns string on success, "UNKNOWN" on failure
  1792.  */
  1793. const char* BMI_tcp_addr_rev_lookup_unexpected(bmi_method_addr_p map)
  1794. {
  1795.     struct tcp_addr *tcp_addr_data = map->method_data;
  1796.     int debug_on;
  1797.     uint64_t mask;
  1798.     socklen_t peerlen;
  1799.     struct sockaddr_in peer;
  1800.     int ret;
  1801.     struct hostent *peerent;
  1802.     char* tmp_peer;
  1803.  
  1804.     /* return default response if we don't have support for the right socket
  1805.      * calls 
  1806.      */
  1807. #if !defined(HAVE_GETHOSTBYADDR)
  1808.     return(tcp_addr_data->peer);
  1809. #else 
  1810.  
  1811.     /* Only resolve hostnames if a gossip mask is set to request it.
  1812.      * Otherwise we leave it at ip address 
  1813.      */
  1814.     gossip_get_debug_mask(&debug_on, &mask);
  1815.  
  1816.     if(!debug_on || (!(mask & GOSSIP_ACCESS_HOSTNAMES)))
  1817.     {
  1818.         return(tcp_addr_data->peer);
  1819.     }
  1820.  
  1821.     peerlen = sizeof(struct sockaddr_in);
  1822.  
  1823.     if(tcp_addr_data->peer_type == BMI_TCP_PEER_HOSTNAME)
  1824.     {
  1825.         /* full hostname already cached; return now */
  1826.         return(tcp_addr_data->peer);
  1827.     }
  1828.  
  1829.     /* if we hit this point, we need to resolve hostname */
  1830.     ret = getpeername(tcp_addr_data->socket, (struct sockaddr*)&(peer), &peerlen);
  1831.     if(ret < 0)
  1832.     {
  1833.         /* default to use IP address */
  1834.         return(tcp_addr_data->peer);
  1835.     }
  1836.  
  1837.     peerent = gethostbyaddr((void*)&peer.sin_addr.s_addr, 
  1838.         sizeof(struct in_addr), AF_INET);
  1839.     if(peerent == NULL)
  1840.     {
  1841.         /* default to use IP address */
  1842.         return(tcp_addr_data->peer);
  1843.     }
  1844.  
  1845.     tmp_peer = (char*)malloc(strlen(peerent->h_name) + 1);
  1846.     if(!tmp_peer)
  1847.     {
  1848.         /* default to use IP address */
  1849.         return(tcp_addr_data->peer);
  1850.     }
  1851.     strcpy(tmp_peer, peerent->h_name);
  1852.     if(tcp_addr_data->peer)
  1853.     {
  1854.         free(tcp_addr_data->peer);
  1855.     }
  1856.     tcp_addr_data->peer = tmp_peer;
  1857.     tcp_addr_data->peer_type = BMI_TCP_PEER_HOSTNAME;
  1858.     return(tcp_addr_data->peer);
  1859.  
  1860. #endif
  1861.  
  1862. }
  1863.  
  1864. /* tcp_forget_addr()
  1865.  *
  1866.  * completely removes a tcp method address from use, and aborts any
  1867.  * operations that use the address.  If the
  1868.  * dealloc_flag is set, the memory used by the address will be
  1869.  * deallocated as well.
  1870.  *
  1871.  * no return value
  1872.  */
  1873. void tcp_forget_addr(bmi_method_addr_p map,
  1874.              int dealloc_flag,
  1875.              int error_code)
  1876. {
  1877.     struct tcp_addr* tcp_addr_data = map->method_data;
  1878.     BMI_addr_t bmi_addr = tcp_addr_data->bmi_addr;
  1879.     int tmp_outcount;
  1880.     bmi_method_addr_p tmp_addr;
  1881.     int tmp_status;
  1882.  
  1883.     if (tcp_socket_collection_p)
  1884.     {
  1885.     BMI_socket_collection_remove(tcp_socket_collection_p, map);
  1886.     /* perform a test to force the socket collection to act on the remove
  1887.      * request before continuing
  1888.      */
  1889.         if(!sc_test_busy)
  1890.         {
  1891.             BMI_socket_collection_testglobal(tcp_socket_collection_p,
  1892.                 0, &tmp_outcount, &tmp_addr, &tmp_status, 0);
  1893.         }
  1894.     }
  1895.  
  1896.     tcp_shutdown_addr(map);
  1897.     tcp_cleanse_addr(map, error_code);
  1898.     tcp_addr_data->addr_error = error_code;
  1899.     if (dealloc_flag)
  1900.     {
  1901.     dealloc_tcp_method_addr(map);
  1902.     }
  1903.     else
  1904.     {
  1905.         /* this will cause the bmi control layer to check to see if 
  1906.          * this address can be completely forgotten
  1907.          */
  1908.         bmi_method_addr_forget_callback(bmi_addr);
  1909.     }
  1910.     return;
  1911. };
  1912.  
  1913. /******************************************************************
  1914.  * Internal support functions
  1915.  */
  1916.  
  1917.  
  1918. /*
  1919.  * dealloc_tcp_method_addr()
  1920.  *
  1921.  * destroys method address structures generated by the TCP/IP module.
  1922.  *
  1923.  * no return value
  1924.  */
  1925. static void dealloc_tcp_method_addr(bmi_method_addr_p map)
  1926. {
  1927.  
  1928.     struct tcp_addr *tcp_addr_data = NULL;
  1929.  
  1930.     tcp_addr_data = map->method_data;
  1931.     /* close the socket, as long as it is not the one we are listening on
  1932.      * as a server.
  1933.      */
  1934.     if (!tcp_addr_data->server_port)
  1935.     {
  1936.     if (tcp_addr_data->socket > -1)
  1937.     {
  1938.         close(tcp_addr_data->socket);
  1939.     }
  1940.     }
  1941.  
  1942.     if (tcp_addr_data->hostname)
  1943.     free(tcp_addr_data->hostname);
  1944.     if (tcp_addr_data->peer)
  1945.         free(tcp_addr_data->peer);
  1946.  
  1947.     bmi_dealloc_method_addr(map);
  1948.  
  1949.     return;
  1950. }
  1951.  
  1952.  
  1953. /*
  1954.  * alloc_tcp_method_addr()
  1955.  *
  1956.  * creates a new method address with defaults filled in for TCP/IP.
  1957.  *
  1958.  * returns pointer to struct on success, NULL on failure
  1959.  */
  1960. bmi_method_addr_p alloc_tcp_method_addr(void)
  1961. {
  1962.  
  1963.     struct bmi_method_addr *my_method_addr = NULL;
  1964.     struct tcp_addr *tcp_addr_data = NULL;
  1965.  
  1966.     my_method_addr =
  1967.     bmi_alloc_method_addr(tcp_method_params.method_id, sizeof(struct tcp_addr));
  1968.     if (!my_method_addr)
  1969.     {
  1970.     return (NULL);
  1971.     }
  1972.  
  1973.     /* note that we trust the alloc_method_addr() function to have zeroed
  1974.      * out the structures for us already 
  1975.      */
  1976.  
  1977.     tcp_addr_data = my_method_addr->method_data;
  1978.     tcp_addr_data->socket = -1;
  1979.     tcp_addr_data->port = -1;
  1980.     tcp_addr_data->map = my_method_addr;
  1981.     tcp_addr_data->sc_index = -1;
  1982.  
  1983.     return (my_method_addr);
  1984. }
  1985.  
  1986.  
  1987. /*
  1988.  * tcp_server_init()
  1989.  *
  1990.  * this function is used to prepare a node to recieve incoming
  1991.  * connections if it is initialized in a server configuration.   
  1992.  *
  1993.  * returns 0 on succes, -errno on failure
  1994.  */
  1995. static int tcp_server_init(void)
  1996. {
  1997.  
  1998.     int oldfl = 0;        /* old socket flags */
  1999.     struct tcp_addr *tcp_addr_data = NULL;
  2000.     int tmp_errno = bmi_tcp_errno_to_pvfs(-EINVAL);
  2001.     int ret = 0;
  2002.  
  2003.     /* create a socket */
  2004.     tcp_addr_data = tcp_method_params.listen_addr->method_data;
  2005.     if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
  2006.     {
  2007.     tmp_errno = errno;
  2008.     gossip_err("Error: BMI_sockio_new_sock: %s\n", strerror(tmp_errno));
  2009.     return (bmi_tcp_errno_to_pvfs(-tmp_errno));
  2010.     }
  2011.  
  2012.     /* set it to non-blocking operation */
  2013.     oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
  2014.     if (!(oldfl & O_NONBLOCK))
  2015.     {
  2016.     fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
  2017.     }
  2018.  
  2019.     /* setup for a fast restart to avoid bind addr in use errors */
  2020.     BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1);
  2021.  
  2022.     /* bind it to the appropriate port */
  2023.     if(tcp_method_params.method_flags & BMI_TCP_BIND_SPECIFIC)
  2024.     {
  2025.         ret = BMI_sockio_bind_sock_specific(tcp_addr_data->socket,
  2026.             tcp_addr_data->hostname,
  2027.             tcp_addr_data->port);
  2028.         /* NOTE: this particular function converts errno in advance */
  2029.         if(ret < 0)
  2030.         {
  2031.             PVFS_perror_gossip("BMI_sockio_bind_sock_specific", ret);
  2032.             return(ret);
  2033.         }
  2034.     }
  2035.     else
  2036.     {
  2037.         ret = BMI_sockio_bind_sock(tcp_addr_data->socket,
  2038.             tcp_addr_data->port);
  2039.     }
  2040.     
  2041.     if (ret < 0)
  2042.     {
  2043.     tmp_errno = errno;
  2044.     gossip_err("Error: BMI_sockio_bind_sock: %s\n", strerror(tmp_errno));
  2045.     return (bmi_tcp_errno_to_pvfs(-tmp_errno));
  2046.     }
  2047.  
  2048.     /* go ahead and listen to the socket */
  2049.     if (listen(tcp_addr_data->socket, TCP_BACKLOG) != 0)
  2050.     {
  2051.     tmp_errno = errno;
  2052.     gossip_err("Error: listen: %s\n", strerror(tmp_errno));
  2053.     return (bmi_tcp_errno_to_pvfs(-tmp_errno));
  2054.     }
  2055.  
  2056.     return (0);
  2057. }
  2058.  
  2059.  
  2060. /* find_recv_inflight()
  2061.  *
  2062.  * checks to see if there is a recv operation in flight (when in flight
  2063.  * means that some of the data or envelope has been read) for a 
  2064.  * particular address. 
  2065.  *
  2066.  * returns pointer to operation on success, NULL if nothing found.
  2067.  */
  2068. static method_op_p find_recv_inflight(bmi_method_addr_p map)
  2069. {
  2070.     struct op_list_search_key key;
  2071.     method_op_p query_op = NULL;
  2072.  
  2073.     memset(&key, 0, sizeof(struct op_list_search_key));
  2074.     key.method_addr = map;
  2075.     key.method_addr_yes = 1;
  2076.  
  2077.     query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
  2078.  
  2079.     return (query_op);
  2080. }
  2081.  
  2082.  
  2083. /* tcp_sock_init()
  2084.  *
  2085.  * this is an internal function which is used to build up a TCP/IP
  2086.  * connection in the situation of a client side operation.
  2087.  * addressing information to determine which fields need to be set.
  2088.  * If the connection is already established then it does no work.
  2089.  *
  2090.  * NOTE: this is safe to call repeatedly.  However, always check the
  2091.  * value of the not_connected field in the tcp address before using the
  2092.  * address.
  2093.  *
  2094.  * returns 0 on success, -errno on failure
  2095.  */
  2096. static int tcp_sock_init(bmi_method_addr_p my_method_addr)
  2097. {
  2098.  
  2099.     int oldfl = 0;        /* socket flags */
  2100.     int ret = -1;
  2101.     struct pollfd poll_conn;
  2102.     struct tcp_addr *tcp_addr_data = my_method_addr->method_data;
  2103.     int tmp_errno = 0;
  2104.  
  2105.     /* check for obvious problems */
  2106.     assert(my_method_addr);
  2107.     assert(my_method_addr->method_type == tcp_method_params.method_id);
  2108.     assert(tcp_addr_data->server_port == 0);
  2109.  
  2110.     /* fail immediately if the address is in failure mode and we have no way
  2111.      * to reconnect
  2112.      */
  2113.     if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
  2114.     {
  2115.     gossip_debug(GOSSIP_BMI_DEBUG_TCP, 
  2116.     "Warning: BMI communication attempted on an address in failure mode.\n");
  2117.     return(tcp_addr_data->addr_error);
  2118.     }
  2119.  
  2120.     if(tcp_addr_data->addr_error)
  2121.     {
  2122.         gossip_debug(GOSSIP_BMI_DEBUG_TCP, "%s: attempting reconnect.\n",
  2123.           __func__);
  2124.     tcp_addr_data->addr_error = 0;
  2125.     assert(tcp_addr_data->socket < 0);
  2126.     tcp_addr_data->not_connected = 1;
  2127.     }
  2128.  
  2129.     /* is there already a socket? */
  2130.     if (tcp_addr_data->socket > -1)
  2131.     {
  2132.     /* check to see if we still need to work on the connect.. */
  2133.     if (tcp_addr_data->not_connected)
  2134.     {
  2135.         /* this is a little weird, but we complete the nonblocking
  2136.          * connection by polling */
  2137.         poll_conn.fd = tcp_addr_data->socket;
  2138.         poll_conn.events = POLLOUT;
  2139.         ret = poll(&poll_conn, 1, 2);
  2140.         if ((ret < 0) || (poll_conn.revents & POLLERR))
  2141.         {
  2142.         tmp_errno = errno;
  2143.         gossip_lerr("Error: poll: %s\n", strerror(tmp_errno));
  2144.         return (bmi_tcp_errno_to_pvfs(-tmp_errno));
  2145.         }
  2146.         if (poll_conn.revents & POLLOUT)
  2147.         {
  2148.         tcp_addr_data->not_connected = 0;
  2149.         }
  2150.     }
  2151.     /* return.  the caller should check the "not_connected" flag to
  2152.      * see if the socket is usable yet. */
  2153.     return (0);
  2154.     }
  2155.     
  2156.     bmi_set_sock_buffers(tcp_addr_data->socket);
  2157.  
  2158.     /* at this point there is no socket.  try to build it */
  2159.     if (tcp_addr_data->port < 1)
  2160.     {
  2161.     return (bmi_tcp_errno_to_pvfs(-EINVAL));
  2162.     }
  2163.  
  2164.     /* make a socket */
  2165.     if ((tcp_addr_data->socket = BMI_sockio_new_sock()) < 0)
  2166.     {
  2167.     tmp_errno = errno;
  2168.     return (bmi_tcp_errno_to_pvfs(-tmp_errno));
  2169.     }
  2170.  
  2171.     /* set it to non-blocking operation */
  2172.     oldfl = fcntl(tcp_addr_data->socket, F_GETFL, 0);
  2173.     if (!(oldfl & O_NONBLOCK))
  2174.     {
  2175.     fcntl(tcp_addr_data->socket, F_SETFL, oldfl | O_NONBLOCK);
  2176.     }
  2177.  
  2178. #if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
  2179.     /* make sure if we need to bind or not to some local port ranges */
  2180.     tcp_enable_trusted(tcp_addr_data);
  2181. #endif
  2182.  
  2183.     /* turn off Nagle's algorithm */
  2184.     if (BMI_sockio_set_tcpopt(tcp_addr_data->socket, TCP_NODELAY, 1) < 0)
  2185.     {
  2186.     tmp_errno = errno;
  2187.     gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
  2188.     close(tcp_addr_data->socket);
  2189.     return (bmi_tcp_errno_to_pvfs(-tmp_errno));
  2190.     }
  2191.  
  2192.        bmi_set_sock_buffers(tcp_addr_data->socket);
  2193.  
  2194.     if (tcp_addr_data->hostname)
  2195.     {
  2196.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
  2197.               "Connect: socket=%d, hostname=%s, port=%d\n",
  2198.               tcp_addr_data->socket, tcp_addr_data->hostname,
  2199.               tcp_addr_data->port);
  2200.     ret = BMI_sockio_connect_sock(tcp_addr_data->socket,
  2201.                       tcp_addr_data->hostname,
  2202.               tcp_addr_data->port);
  2203.     }
  2204.     else
  2205.     {
  2206.     return (bmi_tcp_errno_to_pvfs(-EINVAL));
  2207.     }
  2208.  
  2209.     if (ret < 0)
  2210.     {
  2211.     if (ret == -EINPROGRESS)
  2212.     {
  2213.         tcp_addr_data->not_connected = 1;
  2214.         /* this will have to be connected later with a poll */
  2215.     }
  2216.     else
  2217.     {
  2218.             /* NOTE: BMI_sockio_connect_sock returns a PVFS error */
  2219.             char buff[300];
  2220.  
  2221.             snprintf(buff, 300, "Error: BMI_sockio_connect_sock: (%s):", 
  2222.                      tcp_addr_data->hostname);
  2223.  
  2224.             PVFS_perror_gossip(buff, ret);
  2225.         return (ret);
  2226.     }
  2227.     }
  2228.  
  2229.     return (0);
  2230. }
  2231.  
  2232.  
  2233. /* enqueue_operation()
  2234.  *
  2235.  * creates a new operation based on the arguments to the function.  It
  2236.  * then makes sure that the address is added to the socket collection,
  2237.  * and the operation is added to the appropriate operation queue.
  2238.  *
  2239.  * Damn, what a big prototype!
  2240.  *
  2241.  * returns 0 on success, -errno on failure
  2242.  */
  2243. static int enqueue_operation(op_list_p target_list,
  2244.                  enum bmi_op_type send_recv,
  2245.                  bmi_method_addr_p map,
  2246.                  void *const *buffer_list,
  2247.                  const bmi_size_t *size_list,
  2248.                  int list_count,
  2249.                  bmi_size_t amt_complete,
  2250.                  bmi_size_t env_amt_complete,
  2251.                  bmi_op_id_t * id,
  2252.                  int tcp_op_state,
  2253.                  struct tcp_msg_header header,
  2254.                  void *user_ptr,
  2255.                  bmi_size_t actual_size,
  2256.                  bmi_size_t expected_size,
  2257.                  bmi_context_id context_id,
  2258.                              int32_t eid)
  2259. {
  2260.     method_op_p new_method_op = NULL;
  2261.     struct tcp_op *tcp_op_data = NULL;
  2262.     struct tcp_addr* tcp_addr_data = NULL;
  2263.     int i;
  2264.  
  2265.     /* allocate the operation structure */
  2266.     new_method_op = alloc_tcp_method_op();
  2267.     if (!new_method_op)
  2268.     {
  2269.     return (bmi_tcp_errno_to_pvfs(-ENOMEM));
  2270.     }
  2271.  
  2272.     *id = new_method_op->op_id;
  2273.     new_method_op->event_id = eid;
  2274.  
  2275.     /* set the fields */
  2276.     new_method_op->send_recv = send_recv;
  2277.     new_method_op->addr = map;
  2278.     new_method_op->user_ptr = user_ptr;
  2279.     /* this is on purpose; we want to use the buffer_list all of
  2280.      * the time, no special case for one contig buffer
  2281.      */
  2282.     new_method_op->buffer = NULL;
  2283.     new_method_op->actual_size = actual_size;
  2284.     new_method_op->expected_size = expected_size;
  2285.     new_method_op->send_recv = send_recv;
  2286.     new_method_op->amt_complete = amt_complete;
  2287.     new_method_op->env_amt_complete = env_amt_complete;
  2288.     new_method_op->msg_tag = header.tag;
  2289.     new_method_op->mode = header.mode;
  2290.     new_method_op->list_count = list_count;
  2291.     new_method_op->context_id = context_id;
  2292.  
  2293.     /* set our current position in list processing */
  2294.     i=0;
  2295.     new_method_op->list_index = 0;
  2296.     new_method_op->cur_index_complete = 0;
  2297.     while(amt_complete > 0)
  2298.     {
  2299.     if(amt_complete >= size_list[i])
  2300.     {
  2301.         amt_complete -= size_list[i];
  2302.         new_method_op->list_index++;
  2303.         i++;
  2304.     }
  2305.     else
  2306.     {
  2307.         new_method_op->cur_index_complete = amt_complete;
  2308.         amt_complete = 0;
  2309.     }
  2310.     }
  2311.  
  2312.     tcp_op_data = new_method_op->method_data;
  2313.     tcp_op_data->tcp_op_state = tcp_op_state;
  2314.     tcp_op_data->env = header;
  2315.  
  2316.     /* if there is only one item in the list, then keep the list stored
  2317.      * in the op structure.  This allows us to use the same code for send
  2318.      * and recv as we use for send_list and recv_list, without having to 
  2319.      * malloc lists for those special cases
  2320.      */
  2321.     if (list_count == 1)
  2322.     {
  2323.     new_method_op->buffer_list = &tcp_op_data->buffer_list_stub;
  2324.     new_method_op->size_list = &tcp_op_data->size_list_stub;
  2325.     ((void**)new_method_op->buffer_list)[0] = buffer_list[0];
  2326.     ((bmi_size_t*)new_method_op->size_list)[0] = size_list[0];
  2327.     }
  2328.     else
  2329.     {
  2330.     new_method_op->size_list = size_list;
  2331.     new_method_op->buffer_list = buffer_list;
  2332.     }
  2333.  
  2334.     tcp_addr_data = map->method_data;
  2335.  
  2336.     if(tcp_addr_data->addr_error)
  2337.     {
  2338.     /* server should always fail here, client should let receives queue
  2339.      * as if nothing were wrong
  2340.      */
  2341.     if(tcp_addr_data->dont_reconnect || send_recv == BMI_SEND)
  2342.     {
  2343.         gossip_debug(GOSSIP_BMI_DEBUG_TCP, 
  2344.                "Warning: BMI communication attempted on an "
  2345.                "address in failure mode.\n");
  2346.         new_method_op->error_code = tcp_addr_data->addr_error;
  2347.         op_list_add(op_list_array[new_method_op->context_id],
  2348.             new_method_op);
  2349.         return(tcp_addr_data->addr_error);
  2350.     }
  2351.     }
  2352.  
  2353. #if 0
  2354.     if(tcp_addr_data->addr_error)
  2355.     {
  2356.         /* this address is bad, don't try to do anything with it */
  2357.         gossip_err("Warning: BMI communication attempted on an "
  2358.                    "address in failure mode.\n");
  2359.  
  2360.         new_method_op->error_code = tcp_addr_data->addr_error;
  2361.         op_list_add(op_list_array[new_method_op->context_id],
  2362.                     new_method_op);
  2363.         return(tcp_addr_data->addr_error);
  2364.     }
  2365. #endif
  2366.  
  2367.     /* add the socket to poll on */
  2368.     BMI_socket_collection_add(tcp_socket_collection_p, map);
  2369.     if(send_recv == BMI_SEND)
  2370.     {
  2371.         BMI_socket_collection_add_write_bit(tcp_socket_collection_p, map);
  2372.     }
  2373.  
  2374.     /* keep up with the operation */
  2375.     op_list_add(target_list, new_method_op);
  2376.  
  2377.     return (0);
  2378. }
  2379.  
  2380.  
  2381. /* tcp_post_recv_generic()
  2382.  *
  2383.  * does the real work of posting an operation - works for both
  2384.  * eager and rendezvous messages
  2385.  *
  2386.  * returns 0 on success that requires later poll, returns 1 on instant
  2387.  * completion, -errno on failure
  2388.  */
  2389. static int tcp_post_recv_generic(bmi_op_id_t * id,
  2390.                                  bmi_method_addr_p src,
  2391.                                  void *const *buffer_list,
  2392.                                  const bmi_size_t *size_list,
  2393.                                  int list_count,
  2394.                                  bmi_size_t expected_size,
  2395.                                  bmi_size_t * actual_size,
  2396.                                  enum bmi_buffer_type buffer_type,
  2397.                                  bmi_msg_tag_t tag,
  2398.                                  void *user_ptr,
  2399.                                  bmi_context_id context_id,
  2400.                                  PVFS_hint hints)
  2401. {
  2402.     method_op_p query_op = NULL;
  2403.     int ret = -1;
  2404.     struct tcp_addr *tcp_addr_data = NULL;
  2405.     struct tcp_op *tcp_op_data = NULL;
  2406.     struct tcp_msg_header bogus_header;
  2407.     struct op_list_search_key key;
  2408.     bmi_size_t copy_size = 0;
  2409.     bmi_size_t total_copied = 0;
  2410.     int i;
  2411.     PINT_event_id eid = 0;
  2412.  
  2413.     PINT_EVENT_START(
  2414.         bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, &eid,
  2415.         PINT_HINT_GET_CLIENT_ID(hints),
  2416.         PINT_HINT_GET_REQUEST_ID(hints),
  2417.         PINT_HINT_GET_RANK(hints),
  2418.         PINT_HINT_GET_HANDLE(hints),
  2419.         PINT_HINT_GET_OP_ID(hints),
  2420.         expected_size);
  2421.  
  2422.     tcp_addr_data = src->method_data;
  2423.  
  2424.     /* short out immediately if the address is bad and we have no way to
  2425.      * reconnect
  2426.      */
  2427.     if(tcp_addr_data->addr_error && tcp_addr_data->dont_reconnect)
  2428.     {
  2429.         gossip_debug(
  2430.             GOSSIP_BMI_DEBUG_TCP,
  2431.             "Warning: BMI communication attempted "
  2432.             "on an address in failure mode.\n");
  2433.         return(tcp_addr_data->addr_error);
  2434.     }
  2435.  
  2436.     /* lets make sure that the message hasn't already been fully
  2437.      * buffered in eager mode before doing anything else
  2438.      */
  2439.     memset(&key, 0, sizeof(struct op_list_search_key));
  2440.     key.method_addr = src;
  2441.     key.method_addr_yes = 1;
  2442.     key.msg_tag = tag;
  2443.     key.msg_tag_yes = 1;
  2444.  
  2445.     query_op =
  2446.         op_list_search(op_list_array[IND_RECV_EAGER_DONE_BUFFERING], &key);
  2447.     if (query_op)
  2448.     {
  2449.         /* make sure it isn't too big */
  2450.         if (query_op->actual_size > expected_size)
  2451.         {
  2452.             gossip_err("Error: message ordering violation;\n");
  2453.             gossip_err("Error: message too large for next buffer.\n");
  2454.             return (bmi_tcp_errno_to_pvfs(-EPROTO));
  2455.         }
  2456.  
  2457.         /* whoohoo- it is already done! */
  2458.         /* copy buffer out to list segments; handle short case */
  2459.         for (i = 0; i < list_count; i++)
  2460.         {
  2461.             copy_size = size_list[i];
  2462.             if (copy_size + total_copied > query_op->actual_size)
  2463.             {
  2464.                 copy_size = query_op->actual_size - total_copied;
  2465.             }
  2466.             memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
  2467.                                              total_copied), copy_size);
  2468.             total_copied += copy_size;
  2469.             if (total_copied == query_op->actual_size)
  2470.             {
  2471.                 break;
  2472.             }
  2473.         }
  2474.         /* copy out to correct memory regions */
  2475.         (*actual_size) = query_op->actual_size;
  2476.         free(query_op->buffer);
  2477.         *id = 0;
  2478.         op_list_remove(query_op);
  2479.         dealloc_tcp_method_op(query_op);
  2480.         PINT_EVENT_END(bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid, 0,
  2481.                        *actual_size);
  2482.  
  2483.         return (1);
  2484.     }
  2485.  
  2486.     /* look for a message that is already being received */
  2487.     query_op = op_list_search(op_list_array[IND_RECV_INFLIGHT], &key);
  2488.     if (query_op)
  2489.     {
  2490.         tcp_op_data = query_op->method_data;
  2491.     }
  2492.  
  2493.     /* see if it is being buffered into a temporary memory region */
  2494.     if (query_op && tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
  2495.     {
  2496.         /* make sure it isn't too big */
  2497.         if (query_op->actual_size > expected_size)
  2498.         {
  2499.             gossip_err("Error: message ordering violation;\n");
  2500.             gossip_err("Error: message too large for next buffer.\n");
  2501.             return (bmi_tcp_errno_to_pvfs(-EPROTO));
  2502.         }
  2503.  
  2504.         /* copy what we have so far into the correct buffers */
  2505.         total_copied = 0;
  2506.         for (i = 0; i < list_count; i++)
  2507.         {
  2508.             copy_size = size_list[i];
  2509.             if (copy_size + total_copied > query_op->amt_complete)
  2510.             {
  2511.                 copy_size = query_op->amt_complete - total_copied;
  2512.             }
  2513.             if (copy_size > 0)
  2514.             {
  2515.                 memcpy(buffer_list[i], (void *) ((char *) query_op->buffer +
  2516.                                                  total_copied), copy_size);
  2517.             }
  2518.             total_copied += copy_size;
  2519.             if (total_copied == query_op->amt_complete)
  2520.             {
  2521.                 query_op->list_index = i;
  2522.                 query_op->cur_index_complete = copy_size;
  2523.                 break;
  2524.             }
  2525.         }
  2526.  
  2527.         /* see if we ended on a buffer boundary */
  2528.         if (query_op->cur_index_complete ==
  2529.             query_op->size_list[query_op->list_index])
  2530.         {
  2531.             query_op->list_index++;
  2532.             query_op->cur_index_complete = 0;
  2533.         }
  2534.  
  2535.         /* release the old buffer */
  2536.         if (query_op->buffer)
  2537.         {
  2538.             free(query_op->buffer);
  2539.         }
  2540.  
  2541.         *id = query_op->op_id;
  2542.         tcp_op_data = query_op->method_data;
  2543.         tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
  2544.  
  2545.         query_op->list_count = list_count;
  2546.         query_op->user_ptr = user_ptr;
  2547.         query_op->context_id = context_id;
  2548.         /* if there is only one item in the list, then keep the list stored
  2549.          * in the op structure.  This allows us to use the same code for send
  2550.          * and recv as we use for send_list and recv_list, without having to 
  2551.          * malloc lists for those special cases
  2552.          */
  2553.         if (list_count == 1)
  2554.         {
  2555.             query_op->buffer_list = &tcp_op_data->buffer_list_stub;
  2556.             query_op->size_list = &tcp_op_data->size_list_stub;
  2557.             ((void **)query_op->buffer_list)[0] = buffer_list[0];
  2558.             ((bmi_size_t *)query_op->size_list)[0] = size_list[0];
  2559.         }
  2560.         else
  2561.         {
  2562.             query_op->buffer_list = buffer_list;
  2563.             query_op->size_list = size_list;
  2564.         }
  2565.  
  2566.         if (query_op->amt_complete < query_op->actual_size)
  2567.         {
  2568.             /* try to recv some more data */
  2569.             tcp_addr_data = query_op->addr->method_data;
  2570.             ret = payload_progress(tcp_addr_data->socket,
  2571.                                    query_op->buffer_list,
  2572.                                    query_op->size_list,
  2573.                                    query_op->list_count,
  2574.                                    query_op->actual_size,
  2575.                                    &(query_op->list_index),
  2576.                                    &(query_op->cur_index_complete),
  2577.                                    BMI_RECV,
  2578.                                    NULL,
  2579.                                    0);
  2580.             if (ret < 0)
  2581.             {
  2582.                 PVFS_perror_gossip("Error: payload_progress", ret);
  2583.                 /* payload_progress() returns BMI error codes */
  2584.                 tcp_forget_addr(query_op->addr, 0, ret);
  2585.                 return (ret);
  2586.             }
  2587.  
  2588.             query_op->amt_complete += ret;
  2589.         }
  2590.         assert(query_op->amt_complete <= query_op->actual_size);
  2591.         if (query_op->amt_complete == query_op->actual_size)
  2592.         {
  2593.             /* we are done */
  2594.             op_list_remove(query_op);
  2595.             *id = 0;
  2596.             (*actual_size) = query_op->actual_size;
  2597.             dealloc_tcp_method_op(query_op);
  2598.             PINT_EVENT_END(
  2599.                 bmi_tcp_recv_event_id, bmi_tcp_pid, NULL, eid,
  2600.                 0, *actual_size);
  2601.  
  2602.             return (1);
  2603.         }
  2604.         else
  2605.         {
  2606.             /* there is still more work to do */
  2607.             tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
  2608.             return (0);
  2609.         }
  2610.     }
  2611.  
  2612.     /* NOTE: if the message was in flight, but not buffering, then
  2613.      * that means that it has already matched an earlier receive
  2614.      * post or else is an unexpected message that doesn't require a
  2615.      * matching receive post - at any rate it shouldn't be handled
  2616.      * here
  2617.      */
  2618.  
  2619.     /* if we hit this point we must enqueue */
  2620.     if (expected_size <= TCP_MODE_EAGER_LIMIT)
  2621.     {
  2622.         bogus_header.mode = TCP_MODE_EAGER;
  2623.     }
  2624.     else
  2625.     {
  2626.         bogus_header.mode = TCP_MODE_REND;
  2627.     }
  2628.     bogus_header.tag = tag;
  2629.     ret = enqueue_operation(op_list_array[IND_RECV],
  2630.                             BMI_RECV, src, buffer_list, size_list,
  2631.                             list_count, 0, 0, id, BMI_TCP_INPROGRESS,
  2632.                             bogus_header, user_ptr, 0,
  2633.                             expected_size, context_id, eid);
  2634.     /* just for safety; this field isn't valid to the caller anymore */
  2635.     (*actual_size) = 0;
  2636.     /* TODO: figure out why this causes deadlocks; observable in 2
  2637.      * scenarios:
  2638.      * - pvfs2-client-core with threaded library and nptl
  2639.      * - pvfs2-server threaded with nptl sending messages to itself
  2640.      */
  2641. #if 0
  2642.     if (ret >= 0)
  2643.     {
  2644.         /* go ahead and try to do some work while we are in this
  2645.          * function since we appear to be backlogged.  Make sure that
  2646.          * we do not wait in the poll, however.
  2647.          */
  2648.         ret = tcp_do_work(0);
  2649.     }
  2650. #endif
  2651.     return (ret);
  2652. }
  2653.  
  2654.  
  2655. /* tcp_cleanse_addr()
  2656.  *
  2657.  * finds all active operations matching the given address, places them
  2658.  * in an error state, and moves them to the completed queue.
  2659.  *
  2660.  * NOTE: this function does not shut down the address.  That should be
  2661.  * handled separately
  2662.  *
  2663.  * returns 0 on success, -errno on failure
  2664.  */
  2665. static int tcp_cleanse_addr(bmi_method_addr_p map, int error_code)
  2666. {
  2667.     int i = 0;
  2668.     struct op_list_search_key key;
  2669.     method_op_p query_op = NULL;
  2670.  
  2671.     memset(&key, 0, sizeof(struct op_list_search_key));
  2672.     key.method_addr = map;
  2673.     key.method_addr_yes = 1;
  2674.  
  2675.     /* NOTE: we know the unexpected completed queue is the last index! */
  2676.     for (i = 0; i < (NUM_INDICES - 1); i++)
  2677.     {
  2678.     if (op_list_array[i])
  2679.     {
  2680.         while ((query_op = op_list_search(op_list_array[i], &key)))
  2681.         {
  2682.         op_list_remove(query_op);
  2683.         query_op->error_code = error_code;
  2684.         if (query_op->mode == TCP_MODE_UNEXP && query_op->send_recv
  2685.             == BMI_RECV)
  2686.         {
  2687.             op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
  2688.                 query_op);
  2689.         }
  2690.         else
  2691.         {
  2692.             ((struct tcp_op*)(query_op->method_data))->tcp_op_state = 
  2693.             BMI_TCP_COMPLETE;
  2694.             op_list_add(completion_array[query_op->context_id], query_op);
  2695.         }
  2696.         }
  2697.     }
  2698.     }
  2699.  
  2700.     return (0);
  2701. }
  2702.  
  2703.  
  2704. /* tcp_shutdown_addr()
  2705.  *
  2706.  * closes connections associated with a tcp method address
  2707.  *
  2708.  * returns 0 on success, -errno on failure
  2709.  */
  2710. static int tcp_shutdown_addr(bmi_method_addr_p map)
  2711. {
  2712.  
  2713.     struct tcp_addr *tcp_addr_data = map->method_data;
  2714.     if (tcp_addr_data->socket > -1)
  2715.     {
  2716.     close(tcp_addr_data->socket);
  2717.     }
  2718.     tcp_addr_data->socket = -1;
  2719.     tcp_addr_data->not_connected = 1;
  2720.  
  2721.     return (0);
  2722. }
  2723.  
  2724.  
  2725. /* tcp_do_work()
  2726.  *
  2727.  * this is the function that actually does communication work during
  2728.  * BMI_tcp_testXXX and BMI_tcp_waitXXX functions.  The amount of work 
  2729.  * that it does is tunable.
  2730.  *
  2731.  * returns 0 on success, -errno on failure.
  2732.  */
  2733. static int tcp_do_work(int max_idle_time)
  2734. {
  2735.     int ret = -1;
  2736.     bmi_method_addr_p addr_array[TCP_WORK_METRIC];
  2737.     int status_array[TCP_WORK_METRIC];
  2738.     int socket_count = 0;
  2739.     int i = 0;
  2740.     int stall_flag = 0;
  2741.     int busy_flag = 1;
  2742.     struct timespec req;
  2743.     struct tcp_addr* tcp_addr_data = NULL;
  2744.     struct timespec wait_time;
  2745.     struct timeval start;
  2746.  
  2747.     if(sc_test_busy)
  2748.     {
  2749.         /* another thread is already polling or working on sockets */
  2750.         if(max_idle_time == 0)
  2751.         {
  2752.             /* we don't want to spend time waiting on it; return
  2753.              * immediately.
  2754.              */
  2755.             return(0);
  2756.         }
  2757.  
  2758.         /* Sleep until working thread thread signals that it has finished
  2759.          * its work and then return.  No need for this thread to poll;
  2760.          * the other thread may have already finished what we wanted.
  2761.          * This condition wait is used strictly as a best effort to
  2762.          * prevent busy spin.  We'll sort out the results later.
  2763.          */
  2764.         gettimeofday(&start, NULL);
  2765.         wait_time.tv_sec = start.tv_sec + max_idle_time / 1000;
  2766.         wait_time.tv_nsec = (start.tv_usec + ((max_idle_time % 1000)*1000))*1000;
  2767.         if (wait_time.tv_nsec > 1000000000)
  2768.         {
  2769.             wait_time.tv_nsec = wait_time.tv_nsec - 1000000000;
  2770.             wait_time.tv_sec++;
  2771.         }
  2772.         gen_cond_timedwait(&interface_cond, &interface_mutex, &wait_time);
  2773.         return(0);
  2774.     }
  2775.  
  2776.     /* this thread has gained control of the polling.  */
  2777.     sc_test_busy = 1;
  2778.     gen_mutex_unlock(&interface_mutex);
  2779.  
  2780.     /* our turn to look at the socket collection */
  2781.     ret = BMI_socket_collection_testglobal(tcp_socket_collection_p,
  2782.                        TCP_WORK_METRIC, &socket_count,
  2783.                        addr_array, status_array,
  2784.                        max_idle_time);
  2785.  
  2786.     gen_mutex_lock(&interface_mutex);
  2787.     sc_test_busy = 0;
  2788.  
  2789.     if (ret < 0)
  2790.     {
  2791.         /* wake up anyone else who might have been waiting */
  2792.         gen_cond_broadcast(&interface_cond);
  2793.         PVFS_perror_gossip("Error: socket collection:", ret);
  2794.         /* BMI_socket_collection_testglobal() returns BMI error code */
  2795.     return (ret);
  2796.     }
  2797.  
  2798.     if(socket_count == 0)
  2799.     busy_flag = 0;
  2800.  
  2801.     /* do different kinds of work depending on results */
  2802.     for (i = 0; i < socket_count; i++)
  2803.     {
  2804.     tcp_addr_data = addr_array[i]->method_data;
  2805.     /* skip working on addresses in failure mode */
  2806.     if(tcp_addr_data->addr_error)
  2807.     {
  2808.             /* addr_error field is in BMI error code format */
  2809.         tcp_forget_addr(addr_array[i], 0, tcp_addr_data->addr_error);
  2810.         continue;
  2811.     }
  2812.  
  2813.     if (status_array[i] & SC_ERROR_BIT)
  2814.     {
  2815.         ret = tcp_do_work_error(addr_array[i]);
  2816.         if (ret < 0)
  2817.         {
  2818.                 PVFS_perror_gossip("Warning: BMI error handling failure, continuing", ret);
  2819.         }
  2820.     }
  2821.     else
  2822.     {
  2823.         if (status_array[i] & SC_WRITE_BIT)
  2824.         {
  2825.         ret = tcp_do_work_send(addr_array[i], &stall_flag);
  2826.         if (ret < 0)
  2827.         {
  2828.                     PVFS_perror_gossip("Warning: BMI send error, continuing", ret);
  2829.                 }
  2830.         if(!stall_flag)
  2831.             busy_flag = 0;
  2832.         }
  2833.         if (status_array[i] & SC_READ_BIT)
  2834.         {
  2835.         ret = tcp_do_work_recv(addr_array[i], &stall_flag);
  2836.         if (ret < 0)
  2837.         {
  2838.                     PVFS_perror_gossip("Warning: BMI recv error, continuing", ret);
  2839.         }
  2840.         if(!stall_flag)
  2841.             busy_flag = 0;
  2842.         }
  2843.     }
  2844.     }
  2845.  
  2846.     /* IMPORTANT NOTE: if we have set the following flag, then it indicates that
  2847.      * poll() is finding data on our sockets, yet we are not able to move
  2848.      * any of it right now.  This means that the sockets are backlogged, and
  2849.      * BMI is in danger of busy spinning during test functions.  Let's sleep
  2850.      * for a millisecond here in hopes of letting the rest of the system
  2851.      * catch up somehow (either by clearing a backlog in another I/O
  2852.      * component, or by posting more matching BMI recieve operations)
  2853.      */
  2854.     if(busy_flag)
  2855.     {
  2856.     req.tv_sec = 0;
  2857.     req.tv_nsec = 1000;
  2858.         gen_mutex_unlock(&interface_mutex);
  2859.     nanosleep(&req, NULL);
  2860.         gen_mutex_lock(&interface_mutex);
  2861.     }
  2862.  
  2863.     /* wake up anyone else who might have been waiting */
  2864.     gen_cond_broadcast(&interface_cond);
  2865.     return (0);
  2866. }
  2867.  
  2868.  
  2869. /* tcp_do_work_send()
  2870.  *
  2871.  * does work on a TCP address that is ready to send data.
  2872.  *
  2873.  * returns 0 on success, -errno on failure
  2874.  */
  2875. static int tcp_do_work_send(bmi_method_addr_p map, int* stall_flag)
  2876. {
  2877.     method_op_p active_method_op = NULL;
  2878.     struct op_list_search_key key;
  2879.     int blocked_flag = 0;
  2880.     int ret = 0;
  2881.     int tmp_stall_flag;
  2882.  
  2883.     *stall_flag = 1;
  2884.  
  2885.     while (blocked_flag == 0 && ret == 0)
  2886.     {
  2887.     /* what we want to do here is find the first operation in the send
  2888.      * queue for this address.
  2889.      */
  2890.     memset(&key, 0, sizeof(struct op_list_search_key));
  2891.     key.method_addr = map;
  2892.     key.method_addr_yes = 1;
  2893.     active_method_op = op_list_search(op_list_array[IND_SEND], &key);
  2894.     if (!active_method_op)
  2895.     {
  2896.         /* ran out of queued sends to work on */
  2897.         return (0);
  2898.     }
  2899.  
  2900.     ret = work_on_send_op(active_method_op, &blocked_flag, &tmp_stall_flag);
  2901.     if(!tmp_stall_flag)
  2902.         *stall_flag = 0;
  2903.     }
  2904.  
  2905.     return (ret);
  2906. }
  2907.  
  2908.  
  2909. /* handle_new_connection()
  2910.  *
  2911.  * this function should be called only on special tcp method addresses
  2912.  * that represent local server ports.  It will attempt to accept a new
  2913.  * connection and create a new method address for the remote host.
  2914.  *
  2915.  * side effect: destroys the temporary method_address that is passed in
  2916.  * to it.
  2917.  *
  2918.  * returns 0 on success, -errno on failure
  2919.  */
  2920. static int handle_new_connection(bmi_method_addr_p map)
  2921. {
  2922.     struct tcp_addr *tcp_addr_data = NULL;
  2923.     int accepted_socket = -1;
  2924.     bmi_method_addr_p new_addr = NULL;
  2925.     int ret = -1;
  2926.     char* tmp_peer = NULL;
  2927.  
  2928.     ret = tcp_accept_init(&accepted_socket, &tmp_peer);
  2929.     if (ret < 0)
  2930.     {
  2931.     return (ret);
  2932.     }
  2933.     if (accepted_socket < 0)
  2934.     {
  2935.     /* guess it wasn't ready after all */
  2936.     return (0);
  2937.     }
  2938.  
  2939.     /* ok, we have a new socket.  what now?  Probably simplest
  2940.      * thing to do is to create a new method_addr, add it to the
  2941.      * socket collection, and return.  It will get caught the next
  2942.      * time around */
  2943.     new_addr = alloc_tcp_method_addr();
  2944.     if (!new_addr)
  2945.     {
  2946.     return (bmi_tcp_errno_to_pvfs(-ENOMEM));
  2947.     }
  2948.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP,
  2949.                   "Assigning socket %d to new method addr.\n",
  2950.           accepted_socket);
  2951.     tcp_addr_data = new_addr->method_data;
  2952.     tcp_addr_data->socket = accepted_socket;
  2953.     tcp_addr_data->peer = tmp_peer;
  2954.     tcp_addr_data->peer_type = BMI_TCP_PEER_IP;
  2955.  
  2956.     /* set a flag to make sure that we never try to reconnect this address
  2957.      * in the future
  2958.      */
  2959.     tcp_addr_data->dont_reconnect = 1;
  2960.     /* register this address with the method control layer */
  2961.     tcp_addr_data->bmi_addr = bmi_method_addr_reg_callback(new_addr);
  2962.     if (ret < 0)
  2963.     {
  2964.     tcp_shutdown_addr(new_addr);
  2965.     dealloc_tcp_method_addr(new_addr);
  2966.     dealloc_tcp_method_addr(map);
  2967.     return (ret);
  2968.     }
  2969.     BMI_socket_collection_add(tcp_socket_collection_p, new_addr);
  2970.  
  2971.     dealloc_tcp_method_addr(map);
  2972.     return (0);
  2973.  
  2974. }
  2975.  
  2976.  
  2977. /* tcp_do_work_recv()
  2978.  * 
  2979.  * does work on a TCP address that is ready to recv data.
  2980.  *
  2981.  * returns 0 on success, -errno on failure
  2982.  */
  2983. static int tcp_do_work_recv(bmi_method_addr_p map, int* stall_flag)
  2984. {
  2985.  
  2986.     method_op_p active_method_op = NULL;
  2987.     int ret = -1;
  2988.     void *new_buffer = NULL;
  2989.     struct op_list_search_key key;
  2990.     struct tcp_msg_header new_header;
  2991.     struct tcp_addr *tcp_addr_data = map->method_data;
  2992.     struct tcp_op *tcp_op_data = NULL;
  2993.     int tmp_errno;
  2994.     int tmp;
  2995.     bmi_size_t old_amt_complete = 0;
  2996.     time_t current_time;
  2997.  
  2998.     *stall_flag = 1;
  2999.  
  3000.     /* figure out if this is a new connection */
  3001.     if (tcp_addr_data->server_port)
  3002.     {
  3003.     /* just try to accept connection- no work yet */
  3004.     *stall_flag = 0;
  3005.     return (handle_new_connection(map));
  3006.     }
  3007.  
  3008.     /* look for a recv for this address that is already in flight */
  3009.     active_method_op = find_recv_inflight(map);
  3010.     /* see if we found one in progress... */
  3011.     if (active_method_op)
  3012.     {
  3013.     tcp_op_data = active_method_op->method_data;
  3014.     if (active_method_op->mode == TCP_MODE_REND &&
  3015.         tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
  3016.     {
  3017.         /* we must wait for recv post */
  3018.         return (0);
  3019.     }
  3020.     else
  3021.     {
  3022.         old_amt_complete = active_method_op->amt_complete;
  3023.         ret = work_on_recv_op(active_method_op, stall_flag);
  3024.             gossip_debug(GOSSIP_BMI_DEBUG_TCP, "actual_size=%d, "
  3025.                          "amt_complete=%d, old_amt_complete=%d\n",
  3026.                          (int)active_method_op->actual_size,
  3027.                          (int)active_method_op->amt_complete,
  3028.                          (int)old_amt_complete);
  3029.  
  3030.         if ((ret == 0) &&
  3031.                 (old_amt_complete == active_method_op->amt_complete) &&
  3032.                 active_method_op->actual_size &&
  3033.                 (active_method_op->amt_complete <
  3034.                  active_method_op->actual_size))
  3035.         {
  3036.                 gossip_debug(
  3037.                     GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
  3038.                     "to recv any data reported by poll(). [1]\n");
  3039.  
  3040.                 if (tcp_addr_data->zero_read_limit++ ==
  3041.                     BMI_TCP_ZERO_READ_LIMIT)
  3042.                 {
  3043.                     gossip_debug(GOSSIP_BMI_DEBUG_TCP,
  3044.                                  "...dropping connection.\n");
  3045.                     tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
  3046.                 }
  3047.         }
  3048.             else
  3049.             {
  3050.                 tcp_addr_data->zero_read_limit = 0;
  3051.             }
  3052.         return(ret);
  3053.     }
  3054.     }
  3055.  
  3056.     /* let's see if a the entire header is ready to be received.  If so
  3057.      * we will go ahead and pull it.  Otherwise, we will try again later.
  3058.      * It isn't worth the complication of reading only a partial message
  3059.      * header - we really want it atomically
  3060.      */
  3061.     ret = BMI_sockio_nbpeek(tcp_addr_data->socket,
  3062.                             new_header.enc_hdr, TCP_ENC_HDR_SIZE);
  3063.     if (ret < 0)
  3064.     {
  3065.     tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-errno));
  3066.     return (0);
  3067.     }
  3068.  
  3069.     if (ret == 0)
  3070.     {
  3071.         gossip_debug(
  3072.             GOSSIP_BMI_DEBUG_TCP, "Warning: bmi_tcp unable "
  3073.             "to recv any data reported by poll(). [2]\n");
  3074.  
  3075.         if (tcp_addr_data->zero_read_limit++ ==
  3076.             BMI_TCP_ZERO_READ_LIMIT)
  3077.         {
  3078.             gossip_debug(GOSSIP_BMI_DEBUG_TCP,
  3079.                          "...dropping connection.\n");
  3080.             tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
  3081.         }
  3082.     return(0);
  3083.     }
  3084.     else
  3085.     {
  3086.         tcp_addr_data->zero_read_limit = 0;
  3087.     }
  3088.  
  3089.     if (ret < TCP_ENC_HDR_SIZE)
  3090.     {
  3091.         current_time = time(NULL);
  3092.         if(!tcp_addr_data->short_header_timer)
  3093.         {
  3094.             tcp_addr_data->short_header_timer = current_time;
  3095.         }
  3096.         else if((current_time - tcp_addr_data->short_header_timer) > 
  3097.             BMI_TCP_HEADER_WAIT_SECONDS)
  3098.         {
  3099.         gossip_err("Error: incomplete BMI TCP header after %d seconds, closing connection.\n",
  3100.                 BMI_TCP_HEADER_WAIT_SECONDS);
  3101.             tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EPIPE));
  3102.             return (0);
  3103.         }
  3104.  
  3105.     /* header not ready yet, but we will keep hoping */
  3106.     return (0);
  3107.     }
  3108.  
  3109.     tcp_addr_data->short_header_timer = 0;
  3110.     *stall_flag = 0;
  3111.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Reading header for new op.\n");
  3112.     ret = BMI_sockio_nbrecv(tcp_addr_data->socket,
  3113.                            new_header.enc_hdr, TCP_ENC_HDR_SIZE);
  3114.     if (ret < TCP_ENC_HDR_SIZE)
  3115.     {
  3116.     tmp_errno = errno;
  3117.     gossip_err("Error: BMI_sockio_nbrecv: %s\n", strerror(tmp_errno));
  3118.     tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
  3119.     return (0);
  3120.     }
  3121.  
  3122.     /* decode the header */
  3123.     BMI_TCP_DEC_HDR(new_header);
  3124.  
  3125.     /* so we have the header. now what?  These are the possible
  3126.      * scenarios:
  3127.      * a) unexpected message
  3128.      * b) eager message for which a recv has been posted
  3129.      * c) eager message for which a recv has not been posted
  3130.      * d) rendezvous messsage for which a recv has been posted
  3131.      * e) rendezvous messsage for which a recv has not been posted
  3132.      * f) eager message for which a rend. recv has been posted
  3133.      */
  3134.  
  3135.     /* check magic number of message */
  3136.     if(new_header.magic_nr != BMI_MAGIC_NR)
  3137.     {
  3138.     gossip_err("Error: bad magic in BMI TCP message.\n");
  3139.     tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-EBADMSG));
  3140.     return(0);
  3141.     }
  3142.  
  3143.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Received new message; mode: %d.\n",
  3144.           (int) new_header.mode);
  3145.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "tag: %d\n", (int) new_header.tag);
  3146.  
  3147.     if (new_header.mode == TCP_MODE_UNEXP)
  3148.     {
  3149.     /* allocate the operation structure */
  3150.     active_method_op = alloc_tcp_method_op();
  3151.     if (!active_method_op)
  3152.     {
  3153.         tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
  3154.         return (bmi_tcp_errno_to_pvfs(-ENOMEM));
  3155.     }
  3156.     /* create data buffer */
  3157.     new_buffer = malloc(new_header.size);
  3158.     if (!new_buffer)
  3159.     {
  3160.         dealloc_tcp_method_op(active_method_op);
  3161.         tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
  3162.         return (bmi_tcp_errno_to_pvfs(-ENOMEM));
  3163.     }
  3164.  
  3165.     /* set the fields */
  3166.     active_method_op->send_recv = BMI_RECV;
  3167.     active_method_op->addr = map;
  3168.     active_method_op->actual_size = new_header.size;
  3169.     active_method_op->expected_size = 0;
  3170.     active_method_op->amt_complete = 0;
  3171.     active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
  3172.     active_method_op->msg_tag = new_header.tag;
  3173.     active_method_op->buffer = new_buffer;
  3174.     active_method_op->mode = TCP_MODE_UNEXP;
  3175.     active_method_op->buffer_list = &(active_method_op->buffer);
  3176.     active_method_op->size_list = &(active_method_op->actual_size);
  3177.     active_method_op->list_count = 1;
  3178.     tcp_op_data = active_method_op->method_data;
  3179.     tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
  3180.     tcp_op_data->env = new_header;
  3181.  
  3182.     op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
  3183.     /* grab some data if we can */
  3184.     return (work_on_recv_op(active_method_op, &tmp));
  3185.     }
  3186.  
  3187.     memset(&key, 0, sizeof(struct op_list_search_key));
  3188.     key.method_addr = map;
  3189.     key.method_addr_yes = 1;
  3190.     key.msg_tag = new_header.tag;
  3191.     key.msg_tag_yes = 1;
  3192.  
  3193.     /* look for a match within the posted operations */
  3194.     active_method_op = op_list_search(op_list_array[IND_RECV], &key);
  3195.  
  3196.     if (active_method_op)
  3197.     {
  3198.     /* make sure it isn't too big */
  3199.     if (new_header.size > active_method_op->expected_size)
  3200.     {
  3201.         gossip_err("Error: message ordering violation;\n");
  3202.         gossip_err("Error: message too large for next buffer.\n");
  3203.         gossip_err("Error: incoming size: %ld, expected size: %ld\n",
  3204.             (long) new_header.size,
  3205.             (long) active_method_op->expected_size);
  3206.         /* TODO: return error here or do something else? */
  3207.         return (bmi_tcp_errno_to_pvfs(-EPROTO));
  3208.     }
  3209.  
  3210.     /* we found a match.  go work on it and return */
  3211.     op_list_remove(active_method_op);
  3212.     active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
  3213.     active_method_op->actual_size = new_header.size;
  3214.     op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
  3215.     return (work_on_recv_op(active_method_op, &tmp));
  3216.     }
  3217.  
  3218.     /* no match anywhere.  Start a new operation */
  3219.     /* allocate the operation structure */
  3220.     active_method_op = alloc_tcp_method_op();
  3221.     if (!active_method_op)
  3222.     {
  3223.     tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
  3224.     return (bmi_tcp_errno_to_pvfs(-ENOMEM));
  3225.     }
  3226.  
  3227.     if (new_header.mode == TCP_MODE_EAGER)
  3228.     {
  3229.     /* create data buffer for eager messages */
  3230.     new_buffer = malloc(new_header.size);
  3231.     if (!new_buffer)
  3232.     {
  3233.         dealloc_tcp_method_op(active_method_op);
  3234.         tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-ENOMEM));
  3235.         return (bmi_tcp_errno_to_pvfs(-ENOMEM));
  3236.     }
  3237.     }
  3238.     else
  3239.     {
  3240.     new_buffer = NULL;
  3241.     }
  3242.  
  3243.     /* set the fields */
  3244.     active_method_op->send_recv = BMI_RECV;
  3245.     active_method_op->addr = map;
  3246.     active_method_op->actual_size = new_header.size;
  3247.     active_method_op->expected_size = 0;
  3248.     active_method_op->amt_complete = 0;
  3249.     active_method_op->env_amt_complete = TCP_ENC_HDR_SIZE;
  3250.     active_method_op->msg_tag = new_header.tag;
  3251.     active_method_op->buffer = new_buffer;
  3252.     active_method_op->mode = new_header.mode;
  3253.     active_method_op->buffer_list = &(active_method_op->buffer);
  3254.     active_method_op->size_list = &(active_method_op->actual_size);
  3255.     active_method_op->list_count = 1;
  3256.     tcp_op_data = active_method_op->method_data;
  3257.     tcp_op_data->tcp_op_state = BMI_TCP_BUFFERING;
  3258.     tcp_op_data->env = new_header;
  3259.  
  3260.     op_list_add(op_list_array[IND_RECV_INFLIGHT], active_method_op);
  3261.  
  3262.     /* grab some data if we can */
  3263.     if (new_header.mode == TCP_MODE_EAGER)
  3264.     {
  3265.     return (work_on_recv_op(active_method_op, &tmp));
  3266.     }
  3267.  
  3268.     return (0);
  3269. }
  3270.  
  3271.  
  3272. /*
  3273.  * work_on_send_op()
  3274.  *
  3275.  * used to perform work on a send operation.  this is called by the poll
  3276.  * function.
  3277.  * 
  3278.  * sets blocked_flag if no more work can be done on socket without
  3279.  * blocking
  3280.  * returns 0 on success, -errno on failure.
  3281.  */
  3282. static int work_on_send_op(method_op_p my_method_op,
  3283.                int *blocked_flag, int* stall_flag)
  3284. {
  3285.     int ret = -1;
  3286.     struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
  3287.     struct tcp_op *tcp_op_data = my_method_op->method_data;
  3288.  
  3289.     *blocked_flag = 1;
  3290.     *stall_flag = 0;
  3291.  
  3292.     /* make sure that the connection is done before we continue */
  3293.     if (tcp_addr_data->not_connected)
  3294.     {
  3295.     ret = tcp_sock_init(my_method_op->addr);
  3296.     if (ret < 0)
  3297.     {
  3298.             PVFS_perror_gossip("Error: socket failed to init", ret);
  3299.             /* tcp_sock_init() returns BMI error code */
  3300.         tcp_forget_addr(my_method_op->addr, 0, ret);
  3301.         return (0);
  3302.     }
  3303.     if (tcp_addr_data->not_connected)
  3304.     {
  3305.         /* try again later- still could not connect */
  3306.         tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
  3307.         return (0);
  3308.     }
  3309.     }
  3310.  
  3311.     ret = payload_progress(tcp_addr_data->socket,
  3312.     my_method_op->buffer_list,
  3313.     my_method_op->size_list,
  3314.     my_method_op->list_count,
  3315.     my_method_op->actual_size,
  3316.     &(my_method_op->list_index),
  3317.     &(my_method_op->cur_index_complete),
  3318.     BMI_SEND,
  3319.     tcp_op_data->env.enc_hdr,
  3320.     &my_method_op->env_amt_complete);
  3321.     if (ret < 0)
  3322.     {
  3323.         PVFS_perror_gossip("Error: payload_progress", ret);
  3324.         /* payload_progress() returns BMI error codes */
  3325.     tcp_forget_addr(my_method_op->addr, 0, ret);
  3326.     return (0);
  3327.     }
  3328.  
  3329.     if(ret == 0)
  3330.     *stall_flag = 1;
  3331.  
  3332.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
  3333.     my_method_op->amt_complete += ret;
  3334.     assert(my_method_op->amt_complete <= my_method_op->actual_size);
  3335.  
  3336.     if (my_method_op->amt_complete == my_method_op->actual_size && my_method_op->env_amt_complete == TCP_ENC_HDR_SIZE)
  3337.     {
  3338.     /* we are done */
  3339.     my_method_op->error_code = 0;
  3340.     BMI_socket_collection_remove_write_bit(tcp_socket_collection_p,
  3341.                        my_method_op->addr);
  3342.     op_list_remove(my_method_op);
  3343.     ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state = 
  3344.         BMI_TCP_COMPLETE;
  3345.     op_list_add(completion_array[my_method_op->context_id], my_method_op);
  3346.     *blocked_flag = 0;
  3347.     }
  3348.     else
  3349.     {
  3350.     /* there is still more work to do */
  3351.     tcp_op_data->tcp_op_state = BMI_TCP_INPROGRESS;
  3352.     }
  3353.  
  3354.     return (0);
  3355. }
  3356.  
  3357.  
  3358. /*
  3359.  * work_on_recv_op()
  3360.  *
  3361.  * used to perform work on a recv operation.  this is called by the poll
  3362.  * function.
  3363.  * NOTE: this function assumes the method header has already been read.
  3364.  *
  3365.  * returns 0 on success, -errno on failure.
  3366.  */
  3367. static int work_on_recv_op(method_op_p my_method_op, int* stall_flag)
  3368. {
  3369.  
  3370.     int ret = -1;
  3371.     struct tcp_addr *tcp_addr_data = my_method_op->addr->method_data;
  3372.     struct tcp_op *tcp_op_data = my_method_op->method_data;
  3373.  
  3374.     *stall_flag = 1;
  3375.  
  3376.     if (my_method_op->actual_size != 0)
  3377.     {
  3378.     /* now let's try to recv some actual data */
  3379.     ret = payload_progress(tcp_addr_data->socket,
  3380.         my_method_op->buffer_list,
  3381.         my_method_op->size_list,
  3382.         my_method_op->list_count,
  3383.         my_method_op->actual_size,
  3384.         &(my_method_op->list_index),
  3385.         &(my_method_op->cur_index_complete),
  3386.         BMI_RECV,
  3387.         NULL,
  3388.         0);
  3389.     if (ret < 0)
  3390.     {
  3391.             PVFS_perror_gossip("Error: payload_progress", ret);
  3392.             /* payload_progress() returns BMI error codes */
  3393.         tcp_forget_addr(my_method_op->addr, 0, ret);
  3394.         return (0);
  3395.     }
  3396.     }
  3397.     else
  3398.     {
  3399.     ret = 0;
  3400.     }
  3401.  
  3402.     if(ret > 0)
  3403.     *stall_flag = 0;
  3404.  
  3405.     my_method_op->amt_complete += ret;
  3406.     assert(my_method_op->amt_complete <= my_method_op->actual_size);
  3407.  
  3408.     if (my_method_op->amt_complete == my_method_op->actual_size)
  3409.     {
  3410.     /* we are done */
  3411.     op_list_remove(my_method_op);
  3412.     if (tcp_op_data->tcp_op_state == BMI_TCP_BUFFERING)
  3413.     {
  3414.         /* queue up to wait on matching post recv */
  3415.         op_list_add(op_list_array[IND_RECV_EAGER_DONE_BUFFERING],
  3416.             my_method_op);
  3417.     }
  3418.     else
  3419.     {
  3420.         my_method_op->error_code = 0;
  3421.         if (my_method_op->mode == TCP_MODE_UNEXP)
  3422.         {
  3423.         op_list_add(op_list_array[IND_COMPLETE_RECV_UNEXP],
  3424.                 my_method_op);
  3425.         }
  3426.         else
  3427.         {
  3428.         ((struct tcp_op*)(my_method_op->method_data))->tcp_op_state = 
  3429.             BMI_TCP_COMPLETE;
  3430.         op_list_add(completion_array[my_method_op->context_id], my_method_op);
  3431.         }
  3432.     }
  3433.     }
  3434.  
  3435.     return (0);
  3436. }
  3437.  
  3438.  
  3439. /* tcp_do_work_error()
  3440.  * 
  3441.  * handles a tcp address that has indicated an error during polling.
  3442.  *
  3443.  * returns 0 on success, -errno on failure
  3444.  */
  3445. static int tcp_do_work_error(bmi_method_addr_p map)
  3446. {
  3447.     struct tcp_addr *tcp_addr_data = NULL;
  3448.     int buf;
  3449.     int ret;
  3450.     int tmp_errno;
  3451.  
  3452.     tcp_addr_data = map->method_data;
  3453.  
  3454.     /* perform a read on the socket so that we can get a real errno */
  3455.     ret = read(tcp_addr_data->socket, &buf, sizeof(int));
  3456.     if (ret == 0)
  3457.         tmp_errno = EPIPE;  /* report other side closed socket with this */
  3458.     else
  3459.         tmp_errno = errno;
  3460.  
  3461.     gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Error: bmi_tcp: %s\n",
  3462.       strerror(tmp_errno));
  3463.  
  3464.     if (tcp_addr_data->server_port)
  3465.     {
  3466.     /* Ignore this and hope it goes away... we don't want to lose
  3467.      * our local socket */
  3468.     dealloc_tcp_method_addr(map);
  3469.     gossip_lerr("Warning: error polling on server socket, continuing.\n");
  3470.     return (0);
  3471.     }
  3472.  
  3473.     if(tmp_errno == 0)
  3474.     tmp_errno = EPROTO;
  3475.  
  3476.     tcp_forget_addr(map, 0, bmi_tcp_errno_to_pvfs(-tmp_errno));
  3477.  
  3478.     return (0);
  3479. }
  3480.  
  3481. #if defined(USE_TRUSTED) && defined(__PVFS2_CLIENT__)
  3482. /*
  3483.  * tcp_enable_trusted()
  3484.  * Ideally, this function should look up the security configuration of
  3485.  * the server and determines
  3486.  * if it needs to bind to any specific port locally or not..
  3487.  * For now look at the FIXME below.
  3488.  */
  3489. static int tcp_enable_trusted(struct tcp_addr *tcp_addr_data)
  3490. {
  3491.     /*
  3492.      * FIXME:
  3493.      * For now, there is no way for us to check if a given
  3494.      * server is actually using port protection or not.
  3495.      * For now we unconditionally use a trusted port range
  3496.      * as long as USE_TRUSTED is #defined.
  3497.      *
  3498.      * Although most of the time we expect users
  3499.      * to be using a range of 0-1024, it is hard to keep probing
  3500.      * until one gets a port in the range specified.
  3501.      * Hence this is a temporary fix. we will see if this
  3502.      * requirement even needs to be met at all.
  3503.      */
  3504.     static unsigned short my_requested_port = 1023;
  3505.     unsigned short my_local_port = 0;
  3506.     struct sockaddr_in my_local_sockaddr;
  3507.     socklen_t len = sizeof(struct sockaddr_in);
  3508.     memset(&my_local_sockaddr, 0, sizeof(struct sockaddr_in));
  3509.  
  3510.     /* setup for a fast restart to avoid bind addr in use errors */
  3511.     if (BMI_sockio_set_sockopt(tcp_addr_data->socket, SO_REUSEADDR, 1) < 0)
  3512.     {
  3513.         gossip_lerr("Could not set SO_REUSEADDR on local socket (port %hd)\n", my_local_port);
  3514.     }
  3515.     if (BMI_sockio_bind_sock(tcp_addr_data->socket, my_requested_port) < 0)
  3516.     {
  3517.         gossip_lerr("Could not bind to local port %hd: %s\n", 
  3518.                 my_requested_port, strerror(errno));
  3519.     }
  3520.     else {
  3521.         my_requested_port--;
  3522.     }
  3523.     my_local_sockaddr.sin_family = AF_INET;
  3524.     if (getsockname(tcp_addr_data->socket, 
  3525.                 (struct sockaddr *)&my_local_sockaddr, &len) == 0)
  3526.     {
  3527.         my_local_port = ntohs(my_local_sockaddr.sin_port);
  3528.     }
  3529.     gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Bound locally to port: %hd\n", my_local_port);
  3530.     return 0;
  3531. }
  3532.  
  3533. #endif
  3534.  
  3535. #if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
  3536.  
  3537. static char *bad_errors[] = {
  3538.     "invalid network address",
  3539.     "invalid port",
  3540.     "invalid network address and port"
  3541. };
  3542.  
  3543. /*
  3544.  * tcp_allow_trusted()
  3545.  * if trusted ports was enabled make sure
  3546.  * that we can accept a particular connection from a given
  3547.  * client
  3548.  */
  3549. static int tcp_allow_trusted(struct sockaddr_in *peer_sockaddr)
  3550. {
  3551.     char *peer_hostname = inet_ntoa(peer_sockaddr->sin_addr);
  3552.     unsigned short peer_port = ntohs(peer_sockaddr->sin_port);
  3553.     int   i, what_failed   = -1;
  3554.  
  3555.     /* Don't refuse connects if there were any
  3556.      * parse errors or if it is not enabled in the config file
  3557.      */
  3558.     if (gtcp_allowed_connection->port_enforce == 0
  3559.             && gtcp_allowed_connection->network_enforce == 0)
  3560.     {
  3561.         return 0;
  3562.     }
  3563.     /* make sure that the client is within the allowed network */
  3564.     if (gtcp_allowed_connection->network_enforce == 1)
  3565.     {
  3566.         /* Always allow localhost to connect */
  3567.         if (ntohl(peer_sockaddr->sin_addr.s_addr) == INADDR_LOOPBACK)
  3568.         {
  3569.             goto port_check;
  3570.         }
  3571.         for (i = 0; i < gtcp_allowed_connection->network_count; i++)
  3572.         {
  3573.             /* check with all the masks */
  3574.             if ((peer_sockaddr->sin_addr.s_addr & gtcp_allowed_connection->netmask[i].s_addr) 
  3575.                     != (gtcp_allowed_connection->network[i].s_addr & gtcp_allowed_connection->netmask[i].s_addr ))
  3576.             {
  3577.                 continue;
  3578.             }
  3579.             else {
  3580.                 goto port_check;
  3581.             }
  3582.         }
  3583.         /* not from a trusted network */
  3584.         what_failed = 0;
  3585.     }
  3586. port_check:
  3587.     /* make sure that the client port numbers are within specified limits */
  3588.     if (gtcp_allowed_connection->port_enforce == 1)
  3589.     {
  3590.         if (peer_port < gtcp_allowed_connection->ports[0]
  3591.                 || peer_port > gtcp_allowed_connection->ports[1])
  3592.         {
  3593.             what_failed = (what_failed < 0) ? 1 : 2;
  3594.         }
  3595.     }
  3596.     /* okay, we are good to go */
  3597.     if (what_failed < 0)
  3598.     {
  3599.         return 0;
  3600.     }
  3601.     /* no good */
  3602.     gossip_err("Rejecting client %s on port %d: %s\n",
  3603.            peer_hostname, peer_port, bad_errors[what_failed]);
  3604.     return -1;
  3605. }
  3606.  
  3607. #endif
  3608.  
  3609. /* 
  3610.  * tcp_accept_init()
  3611.  * 
  3612.  * used to establish a connection from the server side.  Attempts an
  3613.  * accept call and provides the socket if it succeeds.
  3614.  *
  3615.  * returns 0 on success, -errno on failure.
  3616.  */
  3617. static int tcp_accept_init(int *socket, char** peer)
  3618. {
  3619.  
  3620.     int ret = -1;
  3621.     int tmp_errno = 0;
  3622.     struct tcp_addr *tcp_addr_data = tcp_method_params.listen_addr->method_data;
  3623.     int oldfl = 0;
  3624.     struct sockaddr_in peer_sockaddr;
  3625.     int peer_sockaddr_size = sizeof(struct sockaddr_in);
  3626.     char* tmp_peer;
  3627.  
  3628.     /* do we have a socket on this end yet? */
  3629.     if (tcp_addr_data->socket < 0)
  3630.     {
  3631.     ret = tcp_server_init();
  3632.     if (ret < 0)
  3633.     {
  3634.         return (ret);
  3635.     }
  3636.     }
  3637.  
  3638.     *socket = accept(tcp_addr_data->socket, (struct sockaddr*)&peer_sockaddr,
  3639.               (socklen_t *)&peer_sockaddr_size);
  3640.  
  3641.     if (*socket < 0)
  3642.     {
  3643.     if ((errno == EAGAIN) ||
  3644.         (errno == EWOULDBLOCK) ||
  3645.         (errno == ENETDOWN) ||
  3646.         (errno == EPROTO) ||
  3647.         (errno == ENOPROTOOPT) ||
  3648.         (errno == EHOSTDOWN) ||
  3649.         (errno == ENONET) ||
  3650.             (errno == EHOSTUNREACH) ||
  3651.         (errno == EOPNOTSUPP) ||
  3652.             (errno == ENETUNREACH) ||
  3653.             (errno == ENFILE) ||
  3654.             (errno == EMFILE))
  3655.     {
  3656.         /* try again later */
  3657.             if ((errno == ENFILE) || (errno == EMFILE))
  3658.             {
  3659.             gossip_err("Error: accept: %s (continuing)\n",strerror(errno));
  3660.                 bmi_method_addr_drop_callback(BMI_tcp_method_name);
  3661.             }
  3662.         return (0);
  3663.     }
  3664.     else
  3665.     {
  3666.         gossip_err("Error: accept: %s\n", strerror(errno));
  3667.         return (bmi_tcp_errno_to_pvfs(-errno));
  3668.     }
  3669.     }
  3670.  
  3671. #if defined(USE_TRUSTED) && defined(__PVFS2_SERVER__)
  3672.  
  3673.     /* make sure that we are allowed to accept this connection */
  3674.     if (tcp_allow_trusted(&peer_sockaddr) < 0)
  3675.     {
  3676.         /* Force closure of the connection */
  3677.         close(*socket);
  3678.         return (bmi_tcp_errno_to_pvfs(-EACCES));
  3679.     }
  3680.  
  3681. #endif
  3682.  
  3683.     /* we accepted a new connection.  turn off Nagle's algorithm. */
  3684.     if (BMI_sockio_set_tcpopt(*socket, TCP_NODELAY, 1) < 0)
  3685.     {
  3686.     tmp_errno = errno;
  3687.     gossip_lerr("Error: failed to set TCP_NODELAY option.\n");
  3688.     close(*socket);
  3689.     return (bmi_tcp_errno_to_pvfs(-tmp_errno));
  3690.     }
  3691.  
  3692.     /* set it to non-blocking operation */
  3693.     oldfl = fcntl(*socket, F_GETFL, 0);
  3694.     if (!(oldfl & O_NONBLOCK))
  3695.     {
  3696.     fcntl(*socket, F_SETFL, oldfl | O_NONBLOCK);
  3697.     }
  3698.  
  3699.     /* allocate ip address string */
  3700.     tmp_peer = inet_ntoa(peer_sockaddr.sin_addr);
  3701.     *peer = (char*)malloc(strlen(tmp_peer)+1);
  3702.     if(!(*peer))
  3703.     {
  3704.         close(*socket);
  3705.         return(bmi_tcp_errno_to_pvfs(-BMI_ENOMEM));
  3706.     }
  3707.     strcpy(*peer, tmp_peer);
  3708.  
  3709.     return (0);
  3710. }
  3711.  
  3712.  
  3713. /* alloc_tcp_method_op()
  3714.  *
  3715.  * creates a new method op with defaults filled in for tcp.
  3716.  *
  3717.  * returns pointer to structure on success, NULL on failure
  3718.  */
  3719. static method_op_p alloc_tcp_method_op(void)
  3720. {
  3721.     method_op_p my_method_op = NULL;
  3722.  
  3723.     my_method_op = bmi_alloc_method_op(sizeof(struct tcp_op));
  3724.  
  3725.     /* we trust alloc_method_op to zero it out */
  3726.  
  3727.     return (my_method_op);
  3728. }
  3729.  
  3730.  
  3731. /* dealloc_tcp_method_op()
  3732.  *
  3733.  * destroys an existing tcp method op, freeing segment lists if
  3734.  * needed
  3735.  *
  3736.  * no return value
  3737.  */
  3738. static void dealloc_tcp_method_op(method_op_p old_op)
  3739. {
  3740.     bmi_dealloc_method_op(old_op);
  3741.     return;
  3742. }
  3743.  
  3744. /* tcp_post_send_generic()
  3745.  * 
  3746.  * Submits send operations (low level).
  3747.  *
  3748.  * returns 0 on success that requires later poll, returns 1 on instant
  3749.  * completion, -errno on failure
  3750.  */
  3751. static int tcp_post_send_generic(bmi_op_id_t * id,
  3752.                                  bmi_method_addr_p dest,
  3753.                                  const void *const *buffer_list,
  3754.                                  const bmi_size_t *size_list,
  3755.                                  int list_count,
  3756.                                  enum bmi_buffer_type buffer_type,
  3757.                                  struct tcp_msg_header my_header,
  3758.                                  void *user_ptr,
  3759.                                  bmi_context_id context_id,
  3760.                                  PVFS_hint hints)
  3761. {
  3762.     struct tcp_addr *tcp_addr_data = dest->method_data;
  3763.     method_op_p query_op = NULL;
  3764.     int ret = -1;
  3765.     bmi_size_t total_size = 0;
  3766.     bmi_size_t amt_complete = 0;
  3767.     bmi_size_t env_amt_complete = 0;
  3768.     struct op_list_search_key key;
  3769.     int list_index = 0;
  3770.     bmi_size_t cur_index_complete = 0;
  3771.     PINT_event_id eid = 0;
  3772.  
  3773.     if(PINT_EVENT_ENABLED)
  3774.     {
  3775.         int i = 0;
  3776.         for(; i < list_count; ++i)
  3777.         {
  3778.             total_size += size_list[i];
  3779.         }
  3780.     }
  3781.  
  3782.     PINT_EVENT_START(
  3783.         bmi_tcp_send_event_id, bmi_tcp_pid, NULL, &eid,
  3784.         PINT_HINT_GET_CLIENT_ID(hints),
  3785.         PINT_HINT_GET_REQUEST_ID(hints),
  3786.         PINT_HINT_GET_RANK(hints),
  3787.         PINT_HINT_GET_HANDLE(hints),
  3788.         PINT_HINT_GET_OP_ID(hints),
  3789.         total_size);
  3790.  
  3791.     /* Three things can happen here:
  3792.      * a) another op is already in queue for the address, so we just
  3793.      * queue up
  3794.      * b) we can send the whole message and return
  3795.      * c) we send part of the message and queue the rest
  3796.      */
  3797.  
  3798.     /* NOTE: on the post_send side of an operation, it doesn't really
  3799.      * matter whether the op is going to be eager or rendezvous.  It is
  3800.      * handled the same way (except for how the header is filled in).
  3801.      * The difference is in the recv processing for TCP.
  3802.      */
  3803.  
  3804.     /* NOTE: we also don't care what the buffer_type says, TCP could care
  3805.      * less what buffers it is using.
  3806.      */
  3807.  
  3808.     /* encode the message header */
  3809.     BMI_TCP_ENC_HDR(my_header);
  3810.  
  3811.     /* the first thing we must do is find out if another send is queued
  3812.      * up for this address so that we don't mess up our ordering.    */
  3813.     memset(&key, 0, sizeof(struct op_list_search_key));
  3814.     key.method_addr = dest;
  3815.     key.method_addr_yes = 1;
  3816.     query_op = op_list_search(op_list_array[IND_SEND], &key);
  3817.     if (query_op)
  3818.     {
  3819.         /* queue up operation */
  3820.         ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
  3821.                                 dest, (void **) buffer_list,
  3822.                                 size_list, list_count, 0, 0,
  3823.                                 id, BMI_TCP_INPROGRESS, my_header, user_ptr,
  3824.                                 my_header.size, 0,
  3825.                                 context_id,
  3826.                                 eid);
  3827.  
  3828.         /* TODO: is this causing deadlocks?  See similar call in recv
  3829.          * path for another example.  This particular one seems to be an
  3830.          * issue under a heavy bonnie++ load that Neill has been
  3831.          * debugging.  Comment out for now to see if the problem goes
  3832.          * away.
  3833.          */
  3834. #if 0
  3835.     if (ret >= 0)
  3836.     {
  3837.         /* go ahead and try to do some work while we are in this
  3838.          * function since we appear to be backlogged.  Make sure that
  3839.          * we do not wait in the poll, however.
  3840.          */
  3841.         ret = tcp_do_work(0);
  3842.     }
  3843. #endif
  3844.     if (ret < 0)
  3845.     {
  3846.         gossip_err("Error: enqueue_operation() or tcp_do_work() returned: %d\n", ret);
  3847.     }
  3848.     return (ret);
  3849.     }
  3850.  
  3851.     /* make sure the connection is established */
  3852.     ret = tcp_sock_init(dest);
  3853.     if (ret < 0)
  3854.     {
  3855.     gossip_debug(GOSSIP_BMI_DEBUG_TCP, "tcp_sock_init() failure.\n");
  3856.         /* tcp_sock_init() returns BMI error code */
  3857.     tcp_forget_addr(dest, 0, ret);
  3858.         PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, 0, ret);
  3859.     return (ret);
  3860.     }
  3861.  
  3862.     tcp_addr_data = dest->method_data;
  3863.  
  3864. #if 0
  3865.     /* TODO: this is a hack for testing! */
  3866.     /* disables immediate send completion... */
  3867.     ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
  3868.                 dest, buffer_list, size_list, list_count, 0, 0,
  3869.                 id, BMI_TCP_INPROGRESS, my_header, user_ptr,
  3870.                 my_header.size, 0,
  3871.                 context_id);
  3872.     return(ret);
  3873. #endif
  3874.  
  3875.     if (tcp_addr_data->not_connected)
  3876.     {
  3877.     /* if the connection is not completed, queue up for later work */
  3878.     ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
  3879.                 dest, (void **) buffer_list, size_list,
  3880.                 list_count, 0, 0,
  3881.                 id, BMI_TCP_INPROGRESS, my_header, user_ptr,
  3882.                 my_header.size, 0,
  3883.                 context_id,
  3884.                                 eid);
  3885.     if(ret < 0)
  3886.     {
  3887.         gossip_err("Error: enqueue_operation() returned: %d\n", ret);
  3888.     }
  3889.     return (ret);
  3890.     }
  3891.  
  3892.     /* try to send some data */
  3893.     env_amt_complete = 0;
  3894.     ret = payload_progress(tcp_addr_data->socket,
  3895.     (void **) buffer_list,
  3896.     size_list, list_count, my_header.size, &list_index,
  3897.     &cur_index_complete, BMI_SEND, my_header.enc_hdr, &env_amt_complete);
  3898.     if (ret < 0)
  3899.     {
  3900.         PVFS_perror_gossip("Error: payload_progress", ret);
  3901.         /* payload_progress() returns BMI error codes */
  3902.     tcp_forget_addr(dest, 0, ret);
  3903.         PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid, NULL, eid, 0, ret);
  3904.     return (ret);
  3905.     }
  3906.  
  3907.     gossip_ldebug(GOSSIP_BMI_DEBUG_TCP, "Sent: %d bytes of data.\n", ret);
  3908.     amt_complete = ret;
  3909.     assert(amt_complete <= my_header.size);
  3910.     if (amt_complete == my_header.size && env_amt_complete == TCP_ENC_HDR_SIZE)
  3911.     {
  3912.         /* we are already done */
  3913.         PINT_EVENT_END(bmi_tcp_send_event_id, bmi_tcp_pid,
  3914.                        NULL, eid, 0, amt_complete);
  3915.         return (1);
  3916.     }
  3917.  
  3918.     /* queue up the remainder */
  3919.     ret = enqueue_operation(op_list_array[IND_SEND], BMI_SEND,
  3920.                             dest, (void **) buffer_list,
  3921.                             size_list, list_count,
  3922.                             amt_complete, env_amt_complete, id,
  3923.                             BMI_TCP_INPROGRESS, my_header, user_ptr,
  3924.                             my_header.size, 0, context_id, eid);
  3925.  
  3926.     if(ret < 0)
  3927.     {
  3928.         gossip_err("Error: enqueue_operation() returned: %d\n", ret);
  3929.     }
  3930.     return (ret);
  3931. }
  3932.  
  3933.  
  3934. /* payload_progress()
  3935.  *
  3936.  * makes progress on sending/recving data payload portion of a message
  3937.  *
  3938.  * returns amount completed on success, -errno on failure
  3939.  */
  3940. static int payload_progress(int s, void *const *buffer_list, const bmi_size_t* 
  3941.     size_list, int list_count, bmi_size_t total_size, int* list_index, 
  3942.     bmi_size_t* current_index_complete, enum bmi_op_type send_recv, 
  3943.     char* enc_hdr, bmi_size_t* env_amt_complete)
  3944. {
  3945.     int i;
  3946.     int count = 0;
  3947.     int ret;
  3948.     int completed;
  3949.     /* used for finding the stopping point on short receives */
  3950.     int final_index = list_count-1;
  3951.     bmi_size_t final_size = size_list[list_count-1];
  3952.     bmi_size_t sum = 0;
  3953.     int vector_index = 0;
  3954.     int header_flag = 0;
  3955.     int tmp_env_done = 0;
  3956.  
  3957.     if(send_recv == BMI_RECV)
  3958.     {
  3959.     /* find out if we should stop short in list processing */
  3960.     for(i=0; i<list_count; i++)
  3961.     {
  3962.         sum += size_list[i];
  3963.         if(sum >= total_size)
  3964.         {
  3965.         final_index = i;
  3966.         final_size = size_list[i] - (sum-total_size);
  3967.         break;
  3968.         }
  3969.     }
  3970.     }
  3971.  
  3972.     assert(list_count > *list_index);
  3973.  
  3974.     /* make sure we don't overrun our preallocated iovec array */
  3975.     if((list_count - (*list_index)) > BMI_TCP_IOV_COUNT)
  3976.     {
  3977.     list_count = (*list_index) + BMI_TCP_IOV_COUNT;
  3978.     }
  3979.  
  3980.     /* do we need to send any of the header? */
  3981.     if(send_recv == BMI_SEND && *env_amt_complete < TCP_ENC_HDR_SIZE)
  3982.     {
  3983.     stat_io_vector[vector_index].iov_base = &enc_hdr[*env_amt_complete];
  3984.     stat_io_vector[vector_index].iov_len = TCP_ENC_HDR_SIZE - *env_amt_complete;
  3985.     count++;
  3986.     vector_index++;
  3987.     header_flag = 1;
  3988.     }
  3989.  
  3990.     /* setup vector */
  3991.     stat_io_vector[vector_index].iov_base = 
  3992.     (char*)buffer_list[*list_index] + *current_index_complete;
  3993.     count++;
  3994.     if(final_index == 0)
  3995.     {
  3996.     stat_io_vector[vector_index].iov_len = final_size - *current_index_complete;
  3997.     }
  3998.     else
  3999.     {
  4000.     stat_io_vector[vector_index].iov_len = 
  4001.         size_list[*list_index] - *current_index_complete;
  4002.     for(i = (*list_index + 1); i < list_count; i++)
  4003.     {
  4004.         vector_index++;
  4005.         count++;
  4006.         stat_io_vector[vector_index].iov_base = buffer_list[i];
  4007.         if(i == final_index)
  4008.         {
  4009.         stat_io_vector[vector_index].iov_len = final_size;
  4010.         break;
  4011.         }
  4012.         else
  4013.         {
  4014.         stat_io_vector[vector_index].iov_len = size_list[i];
  4015.         }
  4016.     }
  4017.     }
  4018.  
  4019.     assert(count > 0);
  4020.  
  4021.     if(send_recv == BMI_RECV)
  4022.     {
  4023.     ret = BMI_sockio_nbvector(s, stat_io_vector, count, 1);
  4024.     }
  4025.     else
  4026.     {
  4027.     ret = BMI_sockio_nbvector(s, stat_io_vector, count, 0);
  4028.     }
  4029.  
  4030.     /* if error or nothing done, return now */
  4031.     if(ret == 0)
  4032.     return(0);
  4033.     if(ret <= 0)
  4034.     return(bmi_tcp_errno_to_pvfs(-errno));
  4035.  
  4036.     completed = ret;
  4037.     if(header_flag && (completed >= 0))
  4038.     {
  4039.     /* take care of completed header status */
  4040.     tmp_env_done = TCP_ENC_HDR_SIZE - *env_amt_complete;
  4041.     if(tmp_env_done > completed)
  4042.         tmp_env_done = completed;
  4043.     completed -= tmp_env_done;
  4044.     ret -= tmp_env_done;
  4045.     (*env_amt_complete) += tmp_env_done;
  4046.     }
  4047.  
  4048.     i=header_flag;
  4049.     while(completed > 0)
  4050.     {
  4051.     /* take care of completed data payload */
  4052.     if(completed >= stat_io_vector[i].iov_len)
  4053.     {
  4054.         completed -= stat_io_vector[i].iov_len;
  4055.         *current_index_complete = 0;
  4056.         (*list_index)++;
  4057.         i++;
  4058.     }
  4059.     else
  4060.     {
  4061.         *current_index_complete += completed;
  4062.         completed = 0;
  4063.     }
  4064.     }
  4065.  
  4066.     return(ret);
  4067. }
  4068.  
  4069. static void bmi_set_sock_buffers(int socket){
  4070.     //Set socket buffer sizes:
  4071.     gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Default socket buffers send:%d receive:%d\n",
  4072.         GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
  4073.     gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Setting socket buffer size for send:%d receive:%d \n",
  4074.         tcp_buffer_size_send, tcp_buffer_size_receive);
  4075.     if( tcp_buffer_size_receive != 0)
  4076.          SET_RECVBUFSIZE(socket,tcp_buffer_size_receive);
  4077.     if( tcp_buffer_size_send != 0)
  4078.          SET_SENDBUFSIZE(socket,tcp_buffer_size_send);
  4079.     gossip_debug(GOSSIP_BMI_DEBUG_TCP, "Reread socket buffers send:%d receive:%d\n",
  4080.         GET_SENDBUFSIZE(socket), GET_RECVBUFSIZE(socket));
  4081. }
  4082.  
  4083. /*
  4084.  * Local variables:
  4085.  *  c-indent-level: 4
  4086.  *  c-basic-offset: 4
  4087.  * End:
  4088.  *
  4089.  * vim: ts=8 sts=4 sw=4 expandtab
  4090.  */
  4091.