home *** CD-ROM | disk | FTP | other *** search
/ InfoMagic Internet Tools 1993 July / Internet Tools.iso / RockRidge / mail / pp / pp-6.0 / Src / qmgr / chans.c < prev    next >
Encoding:
C/C++ Source or Header  |  1991-12-18  |  52.8 KB  |  2,366 lines

  1. /* chans.c: channel specific stuff for the qmgr */
  2.  
  3. # ifndef lint
  4. static char Rcsid[] = "@(#)$Header: /xtel/pp/pp-beta/Src/qmgr/RCS/chans.c,v 6.0 1991/12/18 20:27:38 jpo Rel $";
  5. # endif
  6.  
  7. /*
  8.  * $Header: /xtel/pp/pp-beta/Src/qmgr/RCS/chans.c,v 6.0 1991/12/18 20:27:38 jpo Rel $
  9.  *
  10.  * $Log: chans.c,v $
  11.  * Revision 6.0  1991/12/18  20:27:38  jpo
  12.  * Release 6.0
  13.  *
  14.  */
  15.  
  16.  
  17.  
  18. #include <isode/logger.h>
  19. #include "types.h"
  20. #include "Qmgr-ops.h"
  21. #include "util.h"
  22.  
  23. extern struct Mtalist *findmtalist ();
  24. extern MsgStruct *newmsgstruct ();
  25. extern MsgStruct *find_msg ();
  26. extern Chanlist *findchanlist ();
  27. extern Mlist    *findmtamsg ();
  28.  
  29. extern int warn_number;
  30.  
  31. static int    cb_once_only = 0;
  32. static Connblk cbqueue;
  33. static Connblk *CHead = &cbqueue;
  34. static char    *cb2name ();
  35. static char    *cb_print ();
  36. static int    cb_count = 0;
  37. static int    nspecials = 0;
  38. static int    demand_chan = 0;
  39.  
  40. Chanlist _runq, *runq;
  41. void    addtorunq (), delfromrunq ();
  42.  
  43. struct eventqueue {
  44.     struct eventqueue *ev_forw;
  45.     struct eventqueue *ev_back;
  46.  
  47.     time_t    ev_time;
  48.     Connblk *ev_conn;
  49.     enum cb_type ev_type;
  50.     Chanlist *ev_clp;
  51. };
  52.  
  53. struct stats stats;    
  54.  
  55. static struct eventqueue evqueue;
  56. static struct eventqueue *EHead = &evqueue;
  57. static int ev_count = 0;
  58. static struct eventqueue *newevblk ();
  59. static void freevblk (), init_events ();
  60. static void do_event ();
  61. static void channel_event ();
  62. static void special_event ();
  63. static void restart ();
  64. #define NULLEV    ((struct eventqueue *)0)
  65.  
  66. static int runnable ();
  67. static int invoke ();
  68. static int timeout_proc ();
  69. static int channel_ttl ();
  70. static int expiremsg (), warnmsg ();
  71. /* static void setfds (); */
  72. static void getresponse ();
  73. static void start_timer ();
  74. extern void kill_msg ();
  75.  
  76. extern Mlist *nextmsg ();
  77.  
  78. extern fd_set perf_rfds, perf_wfds;
  79. extern int perf_nfds;
  80. extern int delaytime;
  81. extern int nodisablemsgclean;
  82.  
  83. void    review_channels ();
  84. void    investigate_chan ();
  85. void    investigate_mta ();
  86. void    delay_channel ();
  87. void    cleanup_conn ();
  88. void    cache_clear (), clear_msgcache ();
  89. void    cache_inc (), msgcache_inc ();
  90. void    freems ();
  91. void    insertinchan ();
  92. void    insertindelchan ();
  93. void    insertindrchan ();
  94. void    sort_chans();
  95. void    zapmsg ();
  96.  
  97. time_t     msgmincache ();
  98. int nchansrunning = 0;
  99. static int timer_running = 0;
  100. static int high_wtmk, low_wtmk;
  101.  
  102. static int    do_processmessage (), processmessage_result (),
  103.     processmessage_error (), do_quit ();
  104. static int    do_initchannel (), channelinit_result ();
  105. static int    do_readqueue (), readqueue_result ();
  106. static int    general_error ();
  107. static time_t    nextchantime;
  108. static int    rqueue;
  109. static int     noperations;
  110.  
  111. struct client_dispatch {
  112.     char        *ds_name;
  113.     int        ds_operation;
  114.  
  115.     IFP        ds_argument;
  116.     modtyp        *ds_fr_mod;
  117.     int        ds_fr_index;
  118.     IFP        ds_result;
  119.     IFP        ds_error;
  120.  
  121.     char        *ds_help;
  122. };
  123.  
  124. static struct client_dispatch dis[] = {
  125. #define    DIS_PROC    0
  126. {
  127.     "processmessage", operation_Qmgr_processmessage,
  128.     do_processmessage, &_ZQmgr_mod, _ZProcMsgQmgr,
  129.     processmessage_result, processmessage_error,
  130.     "Process some messages"
  131. },
  132. #define DIS_INIT    1
  133. {
  134.     "channelInitialise", operation_Qmgr_channelInitialise,
  135.     do_initchannel, &_ZQmgr_mod, _ZChannelQmgr,
  136.     channelinit_result, general_error,
  137.     "Initialise a channel"
  138. },
  139. #define DIS_QUIT    2
  140. {
  141.     "quit", 0, do_quit, NULL, 0, NULLIFP, NULLIFP,
  142.     "terminate the association"
  143. },
  144. #define DIS_READQ    3
  145. {
  146.     "readqueue", operation_Qmgr_readqueue,
  147.     do_readqueue, &_ZQmgr_mod, _ZPseudo_readqueueQmgr,
  148.     readqueue_result, general_error,
  149.     "Load up the queue"
  150. },
  151.     NULL
  152. };
  153.  
  154. static void create_chan (chan)
  155. CHAN    *chan;
  156. {
  157.     Chanlist *clp;
  158.     extern int chan_state;
  159.  
  160.     PP_TRACE (("create_chan (%s)", chan -> ch_name));
  161.  
  162.     if (nchanlist == 0)
  163.         chan_list = (Chanlist **) malloc (sizeof (Chanlist *));
  164.     else
  165.         chan_list = (Chanlist **) realloc ((char *)chan_list,
  166.                            (unsigned)(nchanlist + 1)
  167.                            * sizeof (Chanlist *));
  168.     chan_list[nchanlist] = clp = (Chanlist *) calloc (1, sizeof (Chanlist));
  169.     clp -> channame = chan -> ch_name;
  170.     clp -> chan = chan;
  171.     clp -> mtas = (struct Mtalist *) calloc (1, sizeof (Mtalist));
  172.     clp -> mtas -> mta_back = clp -> mtas -> mta_forw = clp -> mtas;
  173.     clp -> chan_enabled = chan_state;
  174.     clp -> mta_hash = NULL;
  175.     switch (chan -> ch_chan_type) {
  176.         case CH_DELETE:
  177.         delete_chan = clp;
  178.         break;
  179.  
  180.         case CH_QMGR_LOAD:
  181.         clp -> chan_special = 1;
  182.         loader_chan = clp;
  183.         break;
  184.  
  185.         case CH_DEBRIS:
  186.         clp -> chan_special = 1;
  187.         trash_chan = clp;
  188.         break;
  189.  
  190.         case CH_TIMEOUT:
  191.         timeout_chan = clp;
  192.         break;
  193.         case CH_WARNING:
  194.         warn_chan = clp;
  195.         break;
  196.  
  197.         case CH_OUT:
  198.         case CH_BOTH:
  199.         if (chan -> ch_sort[0] == 0)
  200.             chan -> ch_sort[0] = CH_SORT_MTA;
  201.         break;
  202.     }
  203.     if (chan -> ch_sort[0] == 0)
  204.         chan -> ch_sort[0] = CH_SORT_NONE;
  205.     if (chan -> ch_sort[0] != CH_SORT_NONE)
  206.         clp -> mta_hash = (Mtalist **)
  207.             calloc (MTA_HASHSIZE, sizeof (*clp->mta_hash));
  208.     if (clp -> chan_special)
  209.         addtorunq (clp);
  210.     nchanlist ++;
  211. }
  212.  
  213. void init_chans ()
  214. {
  215.     int    i;
  216.  
  217.     PP_TRACE (("init_chans ()"));
  218.  
  219.     runq = &_runq;
  220.     runq -> cl_forw = runq -> cl_back = runq;
  221.  
  222.     for (i = 0; ch_all[i]; i++)
  223.         create_chan (ch_all[i]);
  224.     init_events ();
  225.     stats.boottime = current_time;
  226. }
  227.  
  228. /* ARGSUSED */
  229. int chan_lose (fd, acf)
  230. int    fd;
  231. struct AcSAPfinish *acf;
  232. {
  233.     Connblk *cb;
  234.  
  235.     PP_TRACE (("chan_lose (%d)", fd));
  236.     if ((cb = findcblk (fd)) == NULLCB)
  237.         return ACS_ACCEPT;
  238.     /* restore the state */
  239.     delay_channel (cb);
  240.     if (fd != NOTOK) {
  241.         FD_CLR (fd, &perf_rfds);
  242.         FD_CLR (fd, &perf_wfds);
  243.     }
  244.     cb -> cb_fd = NOTOK;
  245.     cleanup_conn (cb);
  246.  
  247.     return ACS_ACCEPT;
  248. }
  249.  
  250. void start_specials ()
  251. {
  252.     Connblk *cb;
  253.  
  254.     PP_TRACE (("start_specials ()"));
  255.  
  256.     if (loader_chan == NULLCHANLIST)
  257.         PP_LOG (LLOG_EXCEPTIONS,
  258.             ("No loading channel specified"));
  259.     else {
  260.         loader_chan -> chan_update = 1;
  261.     }
  262.  
  263.     if (trash_chan == NULLCHANLIST)
  264.         PP_LOG (LLOG_EXCEPTIONS,
  265.             ("No debris channel specified"));
  266.     else {
  267.         cache_inc (&trash_chan -> cache,
  268.                random() % debris_time);
  269.         trash_chan -> chan_update = 1;
  270.     }
  271.     if (timeout_chan == NULLCHANLIST)
  272.         PP_LOG (LLOG_EXCEPTIONS,
  273.             ("No timeout channel specified"));
  274.     else {
  275.         if (warn_chan == NULLCHANLIST)
  276.             PP_LOG (LLOG_EXCEPTIONS,
  277.                 ("No warning channel specified"));
  278.         cb = newcblk (cb_timer);
  279.         cb -> cb_proc = timeout_proc;
  280.         cb -> cb_reload = timeout_time;
  281.         (void) newevblk (NULLCHANLIST, cb, cb_timer,
  282.                  random () % timeout_time);
  283.     }
  284. }    
  285.  
  286. #define LOAD_UPDATE_INTERVAL    15
  287. #define L1_DECAY    (0.9)
  288. #define L2_DECAY    (0.92)
  289.  
  290. static void do_load_avg ()
  291. {
  292.     static time_t lastload;
  293.     static int last_msg_out, last_msg_in;
  294.     int mo, mi;
  295.     static int count = 0;
  296.  
  297.     if (lastload == 0)
  298.         lastload = current_time;
  299.     if (current_time - lastload < LOAD_UPDATE_INTERVAL) {
  300.         stats.runnable_chans = L1_DECAY * stats.runnable_chans +
  301.             (1.0 - L1_DECAY) * rqueue;
  302.         return;
  303.     }
  304.     mo = stats.n_messages_out - last_msg_out;
  305.     mi = stats.n_messages_in - last_msg_in;
  306.     if (mi < 0 || mo < 0)
  307.         PP_LOG (LLOG_EXCEPTIONS, ("Negative stats %d & %d",
  308.                       mi, mo));
  309.  
  310.     while (lastload < current_time) {
  311.         stats.runnable_chans = L1_DECAY * stats.runnable_chans +
  312.             (1.0 - L1_DECAY) * rqueue;
  313.         stats.ops_sec = L2_DECAY * stats.ops_sec +
  314.             (1.0 - L2_DECAY) *
  315.                 ((double)noperations /
  316.                      (double)LOAD_UPDATE_INTERVAL);
  317.         lastload += LOAD_UPDATE_INTERVAL;
  318.         stats.msg_sec_in = L2_DECAY * stats.msg_sec_in +
  319.             (1.0 - L2_DECAY) * ((double)mi/
  320.                         (double)LOAD_UPDATE_INTERVAL);
  321.         stats.msg_sec_out = L2_DECAY * stats.msg_sec_out +
  322.             (1.0 - L2_DECAY) * ((double)mo/
  323.                         (double)LOAD_UPDATE_INTERVAL);
  324.     }
  325.     last_msg_in = stats.n_messages_in;
  326.     last_msg_out = stats.n_messages_out;
  327.  
  328.     noperations = 0;
  329.     PP_TRACE (("Load averages rq %g, ops %g mi %g mo %g",
  330.            stats.runnable_chans, stats.ops_sec,
  331.            stats.msg_sec_in, stats.msg_sec_out));
  332.     lastload = current_time;
  333.     if (count++ > 100) {
  334.         count = 0;
  335.         PP_NOTICE (("STATS: m-in=%d m-out=%d, a-in=%d a-out=%d os=%g rc=%g mis=%g mos=%g %.22s",
  336.                 stats.n_messages_in, stats.n_messages_out,
  337.                 stats.n_addr_in, stats.n_addr_out,
  338.                 stats.ops_sec, stats.runnable_chans,
  339.                 stats.msg_sec_in, stats.msg_sec_out,
  340.                 ctime (&stats.boottime)));
  341.     }
  342.     ll_close (pp_log_norm);
  343. }
  344.  
  345. void schedule ()
  346. {
  347.     struct eventqueue *ev;
  348.     static time_t lasttime;
  349.     time_t    delta;
  350.  
  351.     PP_TRACE (("schedule ()"));
  352.  
  353.     if (opmode && nchansrunning == 0) {
  354.         if (opmode == OP_SHUTDOWN)
  355.             exit (0);
  356.         if (opmode == OP_RESTART)
  357.             restart ();
  358.     }
  359. #ifdef notdef
  360.     if (countdown -- < 0) {
  361.         countdown = 50;
  362.         sort_chans ();
  363.     }
  364. #endif
  365.  
  366.     if (lasttime == 0)
  367.         lasttime = current_time;
  368.     delta = current_time - lasttime;
  369.  
  370.     review_channels (current_time);
  371.     do_load_avg ();
  372.  
  373.     if (EHead -> ev_forw != EHead)
  374.         EHead -> ev_forw -> ev_time -= delta;
  375.  
  376.  
  377.     for (ev = EHead -> ev_forw; ev != EHead;) {
  378.         struct eventqueue *ev2 = ev -> ev_forw;
  379.  
  380.         if (ev -> ev_time <= 0) {
  381.             do_event (ev);
  382.             freevblk (ev);
  383.         }
  384.         else
  385.             break;
  386.         ev = ev2;
  387.     }
  388.     review_channels (current_time);
  389.  
  390.     if (EHead -> ev_forw == EHead )
  391.         delaytime = nextchantime;
  392.     else {
  393.         PP_DBG (("event head time = %d delta = %d",
  394.              EHead -> ev_forw -> ev_time, delta));
  395.         delaytime = max(0, EHead -> ev_forw -> ev_time);
  396.     }
  397.     if (nextchantime < delaytime)
  398.         delaytime = nextchantime;
  399.  
  400.     PP_TRACE (("delaytime == %d", delaytime));
  401.     lasttime = current_time;
  402.     if (opmode && nchansrunning == 0) {
  403.         if (opmode == OP_SHUTDOWN)
  404.             exit (0);
  405.         if (opmode == OP_RESTART)
  406.             restart ();
  407.     }
  408. }
  409.  
  410. void    addtorunq (clp)
  411. Chanlist *clp;
  412. {
  413.     Chanlist *clp2;
  414.  
  415.     PP_TRACE (("adding channel %s to runq", clp -> channame));
  416.     for (clp2 = runq -> cl_forw; clp2 != runq; clp2 = clp2 -> cl_forw)
  417.         if (clp2 == clp) {
  418.             PP_LOG (LLOG_EXCEPTIONS,
  419.                 ("channel %s already on runq",
  420.                  clp -> channame));
  421.             return;
  422.         }
  423.     insque (clp, runq -> cl_forw);
  424. }
  425.  
  426. static void prunq ()
  427. {
  428.     Chanlist *clp;
  429.  
  430.     for (clp = runq -> cl_forw; clp != runq; clp = clp -> cl_forw)
  431.         PP_TRACE (("runq: %s", clp -> channame));
  432. }
  433.  
  434.  
  435. void    delfromrunq (clp)
  436. Chanlist *clp;
  437. {
  438.     PP_TRACE (("removeing channel %s from runq", clp -> channame));
  439.  
  440.     if (clp -> cl_forw == NULLCHANLIST ||
  441.         clp -> cl_back == NULLCHANLIST) {
  442.         PP_LOG(LLOG_EXCEPTIONS,
  443.                ("chan %s not on runq", clp -> channame));
  444.         return;
  445.     }
  446.     if (clp -> nmtas != 0) {
  447.         PP_LOG (LLOG_EXCEPTIONS, ("nmtas not zero for %s",
  448.                       clp -> channame));
  449.         return;
  450.     }
  451.     if (clp -> mtas -> mta_back != clp -> mtas) {
  452.         PP_LOG (LLOG_EXCEPTIONS, ("Still mtas on channel"));
  453.         clp -> chan_update = 1;
  454.         return;
  455.     }
  456.     if (clp -> chan_special) {
  457.         PP_LOG (LLOG_EXCEPTIONS,
  458.             ("Trying to remove special chan %s from runq",
  459.              clp -> channame));
  460.         return;
  461.     }
  462.     remque (clp);
  463.     clp -> cl_forw = clp -> cl_back = NULLCHANLIST;
  464. }
  465.             
  466.             
  467. static void restart ()
  468. {
  469.     int     n, i;
  470.     extern char *cmddfldir, *dupfpath ();
  471.     char    *path;
  472.     char *argv[20];
  473.     int    argc;
  474.     char    nbuf[20];
  475.     extern int chan_state;
  476.  
  477.     PP_TRACE (("restart ()"));
  478.  
  479.     while (CHead -> cb_forw != CHead)
  480.         freecblk (CHead -> cb_forw);    /* shut down connecitons */
  481.  
  482.     n = getdtablesize ();
  483.  
  484.     i = (isatty(2) ? 3 : 0);
  485.     for (; i <= n; i++)
  486.         (void) close (i);
  487. #ifdef notdef
  488.     for (i = 0; i < NSIG; i++)
  489.         (void) signal (i, SIG_DFL);
  490. #endif
  491.     argc = 0;
  492.     argv[argc++] = myname;
  493.     if (maxchansrunning != MAXCHANSRUNNING) {
  494.         argv[argc++] = "-m";
  495.         (void) sprintf (nbuf, "%d", maxchansrunning);
  496.         argv[argc++] = strdup (nbuf);
  497.     }
  498.     if (load_time != LOAD_RETRY_INTERVAL) {
  499.         argv[argc++] = "-l";
  500.         (void) sprintf (nbuf, "%d", load_time/60/60);
  501.         argv[argc++] = strdup (nbuf);
  502.     }
  503.     if (debris_time != TRASH_RETRY_INTERVAL) {
  504.         argv[argc++] = "-d";
  505.         (void) sprintf (nbuf, "%d", debris_time/60/60);
  506.         argv[argc++] = strdup (nbuf);
  507.     }
  508.     if (cache_time != CACHE_TIME) {
  509.         argv[argc++] = "-c";
  510.         (void) sprintf (nbuf, "%d", cache_time);
  511.         argv[argc++] = strdup (nbuf);
  512.     }
  513.     if (timeout_time != TIMEOUT_RETRY_INTERVAL) {
  514.         argv[argc++] = "-t";
  515.         (void) sprintf (nbuf, "%d", timeout_time / 60 / 60);
  516.         argv[argc++] = strdup (nbuf);
  517.     }
  518.     if (chan_state == 0)
  519.         argv[argc++] = "-D";
  520.     argv[argc] = NULLCP;
  521.     path = dupfpath (cmddfldir, myname);
  522.     (void) execv (path, argv);
  523.     (void) execv (myname, argv);
  524.     _exit(1);
  525. }
  526.  
  527. void review_channels (now)
  528. time_t    now;
  529. {
  530.     Chanlist *clp;
  531.     int ndiffchans = 0;
  532.     time_t    tdiff;
  533.  
  534.     PP_TRACE (("review_channels"));
  535.     nextchantime = MAX_SLEEP;
  536.     rqueue = nchansrunning;
  537.     demand_chan = 0;
  538.  
  539.     /* first find how many different channels are running */
  540.     for (clp = runq -> cl_forw; clp != runq; clp = clp -> cl_forw)
  541.         if (clp -> nactive > 0)
  542.             ndiffchans ++;
  543.  
  544.     /* high watermark is the share of the channels allowed each */
  545.     high_wtmk = max(1, (maxchansrunning-1)/(ndiffchans ? ndiffchans : 1));
  546.     /* low water mark - each channel allowed at least this */
  547.     low_wtmk = max(1, high_wtmk/2);
  548.  
  549.     for (clp = runq -> cl_forw; clp != runq; clp = clp -> cl_forw) {
  550.         switch (runnable (clp, now)) {
  551.             case OK:
  552.             (void) newevblk (clp, NULLCB,
  553.                      clp-> chan_special ?
  554.                      cb_special : cb_channel,
  555.                      (time_t)0);
  556.             rqueue ++;
  557.             break;
  558.             case DONE:
  559.             /*
  560.              * If it is allowed more - but can't get one
  561.              * it can demand its fair share.
  562.              * In this case we have to reasses the high
  563.              * water mark.
  564.              */
  565.             if (clp -> nactive < low_wtmk) {
  566.                 if (clp -> nactive == 0)
  567.                     ndiffchans ++;
  568.                 demand_chan = 1;
  569.                 PP_TRACE(("%s demands a channel",
  570.                        clp -> channame));
  571.             }
  572.             rqueue ++;
  573.             break;
  574.             case NOTOK:
  575.             tdiff = clp -> nextevent - current_time;
  576.             if (tdiff > 0 && tdiff < nextchantime)
  577.                 nextchantime = tdiff;
  578.             break;
  579.         }
  580.     }
  581.  
  582.     high_wtmk = max(1, maxchansrunning/(ndiffchans ? ndiffchans : 1));
  583.     PP_TRACE (("review channels lw=%d hw=%d nd=%d",
  584.            low_wtmk, high_wtmk, ndiffchans));
  585. }
  586.  
  587.  
  588. static int runnable (clp, now)
  589. Chanlist *clp;
  590. time_t    now;
  591. {
  592.     PP_TRACE (("runnable %s?", clp -> channame));
  593.     if (opmode)
  594.         return NOTOK;
  595.     if (clp -> chan_update && clp -> nactive == 0) {
  596.         investigate_chan (clp, now);
  597.         clp -> chan_update = 0;
  598.     }
  599.     if (clp -> chan_special == 0 && clp -> num_msgs <= 0
  600.         && clp -> num_drs <= 0) /* special channel with things to do */
  601.         return NOTOK;
  602.     if (clp -> chan_enabled == 0) /* channel switched off */
  603.         return NOTOK;
  604.     if (clp -> cache.cachetime > now ||
  605.         clp -> nextevent > now ) /* not time yet */
  606.         return NOTOK;
  607.     if (clp -> chan_special && nspecials > 0)
  608.         return NOTOK;
  609.     if (clp -> chan -> ch_maxproc > 0 &&
  610.         clp -> nactive >= clp -> chan -> ch_maxproc)
  611.         return NOTOK;
  612.     if (clp -> nactive <= 0) {
  613.         if (nchansrunning >= maxchansrunning) /* too many running already */
  614.             return DONE;
  615.  
  616.         return OK; /* None running - start one */
  617.     }
  618. /*
  619.  * So there is a channel already running:
  620.  * OK - should we start more - to do this we need the following
  621.  * o More mtas ready than chans currently running
  622.  * o We started the last channel more than a 15 secs ago
  623.  *
  624.  * o We aren't consuming all the channels allowed
  625.  * o More messages than channels * 5 OR last time we started was ages ago
  626.  */
  627.     if (clp -> nmtas <= clp -> nactive
  628.         || clp -> laststart + 15 > current_time)
  629.         return NOTOK;
  630.  
  631.     
  632.     /* ok - we ought to start a new channel - but should we? */
  633.  
  634.     if (clp -> nactive >= max(1,maxchansrunning-1))
  635.         return DONE;    /* Would like to - but out of resources */
  636.     
  637.     if ((clp -> num_msgs + clp -> num_drs) > clp -> nactive * 5
  638.         || clp -> laststart + 60*5 < current_time) {
  639.         Mtalist *mlp = NULLMTALIST;
  640.         
  641.         if (nextmsg (clp, &mlp, 0, 0) == NULL)
  642.             return NOTOK; /* nope all locked */
  643.         PP_TRACE (("Can start a new chan on %s", mlp -> mtaname));
  644.         if (nchansrunning >= maxchansrunning) /* too many running already */
  645.             return DONE;
  646.         return OK;
  647.     }
  648.     /* No - this channel is just not ready to start another one */
  649.     return NOTOK;
  650. }
  651.  
  652. static void do_event (ev)
  653. struct eventqueue *ev;
  654. {
  655.     Connblk *cb;
  656.  
  657.     PP_TRACE (("do_event ()"));
  658.  
  659.     if ((cb = ev -> ev_conn) == NULLCB) {
  660.         ev -> ev_conn = cb = newcblk (ev -> ev_type);
  661.         cb -> cb_clp = ev -> ev_clp;
  662.     }
  663.  
  664.     switch (cb -> cb_type) {
  665.         case cb_channel:
  666.         channel_event (ev);
  667.         break;
  668.  
  669.         case cb_special:
  670.         special_event (ev);
  671.         break;
  672.  
  673.         case cb_timer:
  674.         PP_TRACE (("Timeout event"));
  675.         if (cb -> cb_proc)
  676.             (*cb -> cb_proc)();
  677.         if (cb -> cb_reload)
  678.             (void) newevblk (NULLCHANLIST, cb, cb_timer,
  679.                      cb -> cb_reload);
  680.         else    freecblk (cb);
  681.         break;
  682.  
  683.         default:
  684.         PP_LOG (LLOG_EXCEPTIONS, ("Unknown type %d", cb -> cb_type));
  685.         break;
  686.     }
  687. }
  688.  
  689. static int chan_still_ready (cb)
  690. Connblk *cb;
  691. {
  692.     /* should we continue to process? */
  693.  
  694.     /* don't process if:
  695.      * - channel disabled
  696.      * - we are in some global shutdown
  697.      * - we are being greedy & others are suffering
  698.      */
  699.     if (cb -> cb_clp -> chan_enabled == 0 || opmode ||
  700.         (demand_chan && cb -> cb_nops > MIN_OPS &&
  701.          cb -> cb_clp -> nactive > high_wtmk)) {
  702.         if (demand_chan) {
  703.             PP_TRACE (("Closing down %s due to demand h=%d l=%d",
  704.                    cb_print (cb), high_wtmk, low_wtmk));
  705.             if (cb -> cb_clp -> cache.cachetime <
  706.                 current_time)
  707.                 cb -> cb_clp -> cache.cachetime =
  708.                     current_time + 15;
  709.                 /* we are going to satisfy request */
  710.             demand_chan = 0;
  711.         }
  712.         return NOTOK;
  713.     }
  714.     /*
  715.      * hmm - this channel is not going so well - perhaps we should
  716.      * not try so hard. E.g. close down this channel if there is more
  717.      * than one running anyway.
  718.      */
  719.     if (cb -> cb_clp -> error_count > 5) {
  720.         cb -> cb_clp -> error_count = 0;
  721.         if (cb -> cb_clp -> cache.cachetime < current_time)
  722.             cache_inc (&cb -> cb_clp -> cache, cache_time);
  723.         if (cb -> cb_clp -> nactive > 1) {
  724.             PP_NOTICE (("Closing channel %s as things aren't going very well",
  725.                     cb_print(cb)));
  726.             return NOTOK;
  727.         }
  728.     }
  729.     /*
  730.      * OK - grab the next message and GO.
  731.      */
  732.     if (cb -> cb_ml == NULL &&
  733.         (cb -> cb_ml = nextmsg (cb -> cb_clp,
  734.                     &cb -> cb_mlp, 1, 0)) == NULL) {
  735.             
  736.         if (cb -> cb_clp -> cache.cachetime < current_time)
  737.             /* stop it thrashing... */
  738.             cb -> cb_clp -> cache.cachetime =
  739.                 current_time + 15;
  740.         return NOTOK;
  741.     }
  742.     return OK;
  743. }
  744.  
  745. static void channel_event (ev)
  746. struct eventqueue *ev;
  747. {
  748.     Connblk *cb = ev -> ev_conn;
  749.     Mlist    *ml;
  750.     Chanlist *clp;
  751.  
  752.     PP_TRACE (("channel_event ()"));
  753.  
  754.     if ((clp = ev -> ev_clp) == NULLCHANLIST)
  755.         PP_TRACE (("No chanlist of event queue"));
  756.  
  757.     switch (cb -> cb_state) {
  758.         case cb_idle:
  759.         if (runnable (clp, current_time) != OK) {
  760.             freecblk (cb);
  761.             return;
  762.         }
  763.         if ((ml = nextmsg (cb -> cb_clp, &cb -> cb_mlp, 1, 0)) == NULL) {
  764.             clp -> chan_update = 1;
  765.             freecblk (cb);
  766.             return;
  767.         }
  768.         cb -> cb_ml = ml;
  769.         cb -> cb_clp -> lastattempt = current_time;
  770.         cb -> cb_clp -> laststart = current_time;
  771.         switch (start_async (cb, clp -> channame)) {
  772.             case NOTOK:
  773.             cache_inc (&clp -> cache, cache_time);
  774.             clp -> nextevent = clp -> cache.cachetime;
  775.             cleanup_conn (cb);
  776.             break;
  777.  
  778.             case DONE:
  779.             (void) newevblk (cb -> cb_clp, cb,
  780.                      cb -> cb_type, (time_t)0);
  781.             /* and fall */
  782.             case OK:
  783.             clp -> nactive ++;
  784.             nchansrunning ++;
  785.             break;
  786.         }
  787.         break;
  788.  
  789.         case cb_conn_established:
  790.         (void) chan_invoke (cb, DIS_INIT,
  791.                     (caddr_t *) &clp -> channame);
  792.         break;
  793.  
  794.         case cb_conn_request1:
  795.         case cb_conn_request2:
  796.         case cb_init_sent:
  797.         case cb_proc_sent:
  798.         case cb_close_sent:
  799.         PP_TRACE (("Shouldn't be in this state - %d",
  800.                cb -> cb_state));
  801.         break;
  802.         
  803.         case cb_active:
  804.         if (chan_still_ready(cb) == NOTOK) {
  805.             (void) chan_invoke (cb, DIS_QUIT,
  806.                         (caddr_t *)0);
  807.         }
  808.         else {
  809.             (void) chan_invoke (cb, DIS_PROC, (caddr_t *)0);
  810.         }
  811.         break;
  812.     }
  813. }
  814.  
  815. static void special_event (ev)
  816. struct eventqueue *ev;
  817. {
  818.     Connblk *cb = ev -> ev_conn;
  819.     Chanlist    *clp = ev -> ev_clp;
  820.  
  821.     PP_TRACE (("special_event (%s)", cb_print (cb)));
  822.     switch (cb -> cb_state) {
  823.         case cb_idle:
  824.         if (runnable (clp, current_time) != OK) {
  825.             freecblk (cb);
  826.             return;
  827.         }
  828.         clp -> lastattempt = current_time;
  829.         clp -> laststart = current_time;
  830.  
  831.         switch (start_async (cb, clp -> channame)) {
  832.             case NOTOK:
  833.             PP_LOG (LLOG_EXCEPTIONS,
  834.                 ("Can't start special channel %s",
  835.                  clp -> channame));
  836.             cache_inc (&clp -> cache, (time_t)60);
  837.             (void) newevblk (NULLCHANLIST, NULLCB, cb_timer,
  838.                      (time_t) 60);
  839.             clp -> chan_update = 1;
  840.             freecblk (cb);
  841.             break;
  842.  
  843.             case DONE:
  844.             (void) newevblk (clp, cb, cb -> cb_type,
  845.                      (time_t)0);
  846.             case OK:
  847.             nspecials ++;
  848.             clp -> nactive ++;
  849.             nchansrunning ++;
  850.             if (clp -> chan -> ch_chan_type == CH_QMGR_LOAD &&
  851.                 nodisablemsgclean == 0 &&
  852.                 delete_chan -> chan_enabled) {
  853.                 delete_chan -> chan_enabled = 0;
  854.                 delete_chan -> chan_syssusp = 1;
  855.             }
  856.             break;
  857.         }
  858.         break;
  859.  
  860.         case cb_active:
  861.         switch (clp -> chan -> ch_chan_type) {
  862.             case CH_QMGR_LOAD:
  863.             if (cb -> cb_clp -> chan_enabled == 0 || opmode) {
  864.                 (void) chan_invoke (cb, DIS_QUIT, (caddr_t*)0);
  865.                 clp -> cache.cachetime =
  866.                     current_time + 60;
  867.             }
  868.             else
  869.                 (void) chan_invoke (cb, DIS_READQ,
  870.                             (caddr_t *)0);
  871.             break;
  872.             case CH_DEBRIS:
  873.             (void) chan_invoke (cb, DIS_QUIT, (caddr_t *)0);
  874.             clp -> lastsuccess = current_time;
  875.             clp -> cache.cachetime = current_time + debris_time;
  876.             break;
  877.             case CH_TIMEOUT:
  878.             (void) chan_invoke (cb, DIS_QUIT, (caddr_t *)0);
  879.             break;
  880.         }
  881.         clp -> chan_update = 1;
  882.         break;
  883.  
  884.         case cb_conn_established:
  885.         switch (clp -> chan -> ch_chan_type) {
  886.             case CH_QMGR_LOAD:
  887.             PP_TRACE (("start loading"));
  888.             (void) chan_invoke (cb, DIS_READQ, (caddr_t *)0);
  889.             break;
  890.             case CH_DEBRIS:
  891.             PP_TRACE (("start debris collection"));
  892.             (void) chan_invoke (cb, DIS_INIT,
  893.                         (caddr_t *)&clp -> channame);
  894.             break;
  895.             case CH_TIMEOUT:
  896.             break;
  897.         }
  898.         break;
  899.         case cb_finished:
  900.         switch (clp -> chan -> ch_chan_type) {
  901.             case CH_QMGR_LOAD:
  902.             (void) chan_invoke (cb, DIS_QUIT, (caddr_t *)0);
  903.             clp -> cache.cachetime = current_time + load_time;
  904.             if (delete_chan -> chan_enabled == 0 &&
  905.                 delete_chan -> chan_syssusp == 1) {
  906.                 delete_chan -> chan_enabled = 1;
  907.                 delete_chan -> chan_syssusp = 0;
  908.                 delete_chan -> chan_update = 1;
  909.             }
  910.             clp -> chan_update = 1;
  911.             break;
  912.             default:
  913.             PP_LOG (LLOG_EXCEPTIONS, ("Bad state in special %d",
  914.                           cb -> cb_state));
  915.             break;
  916.         }
  917.         break;
  918.  
  919.         default:
  920.         PP_LOG (LLOG_EXCEPTIONS, ("Bad state in special %d",
  921.                       cb -> cb_state));
  922.         break;
  923.     }
  924. }
  925.  
  926. int start_async (cb, title)
  927. Connblk *cb;
  928. char    *title;
  929. {
  930.     int    sd;
  931.     int    result;
  932.  
  933.     PP_NOTICE (("Starting channel %s %s", title, cb_print(cb)));
  934.     switch(result = assoc_start (title, &sd)) {
  935.         case NOTOK:
  936.         PP_LOG (LLOG_EXCEPTIONS, ("Can't start channel %s",
  937.                       title));
  938.         return NOTOK;
  939.  
  940.         case CONNECTING_1:
  941.         cb -> cb_state = cb_conn_request1;
  942.         cb -> cb_fd = sd;
  943.         FD_CLR (sd, &perf_rfds);
  944.         FD_SET (sd, &perf_wfds);
  945.         if (sd >= perf_nfds)
  946.             perf_nfds = sd + 1;
  947.         break;
  948.  
  949.         case CONNECTING_2:
  950.         cb -> cb_state = cb_conn_request2;
  951.         cb -> cb_fd = sd;
  952.         FD_CLR (sd, &perf_wfds);
  953.         FD_SET (sd, &perf_rfds);
  954.         if (sd >= perf_nfds)
  955.             perf_nfds = sd + 1;
  956.         break;
  957.  
  958.         case DONE:
  959.         cb -> cb_state = cb_conn_established;
  960.         cb -> cb_fd = sd;
  961.         FD_CLR (sd, &perf_wfds);
  962.         FD_SET (sd, &perf_rfds);
  963.         if (sd >= perf_nfds)
  964.             perf_nfds = sd + 1;
  965.         break;
  966.  
  967.         default:
  968.         PP_LOG (LLOG_EXCEPTIONS,
  969.             ("Unknown return code from assoc_start"));
  970.         return NOTOK;
  971.     }
  972.     start_timer ();
  973.     cb -> cb_ttl = current_time + 60;
  974.     remque (cb -> cb_clp);
  975.     insque (cb -> cb_clp, runq -> cl_back);
  976.     PP_DBG (("Association descriptor=%d", sd));
  977.     return result;
  978. }
  979.  
  980. int chan_invoke (cb, type, arg)
  981. Connblk *cb;
  982. int    type;
  983. caddr_t    *arg;
  984. {
  985.     struct client_dispatch *ds;
  986.     int result;
  987.  
  988.     noperations ++;
  989.     cb -> cb_nops ++;
  990.     ds = &dis[type];
  991.     switch (type) {
  992.         default:
  993.         PP_TRACE (("%s %s", ds -> ds_name, cb_print (cb)));
  994.         break;
  995.         case DIS_PROC:
  996.         PP_NOTICE (("%s %s", ds -> ds_name, cb_print(cb)));
  997.         break;
  998.     }
  999.     switch (result = invoke (cb, table_Qmgr_Operations, 
  1000.                  ds, arg)) {
  1001.         case OK:
  1002.         break;
  1003.  
  1004.         case NOTOK:
  1005.         cb -> cb_state = cb_active;
  1006.         break;
  1007.  
  1008.         case DONE:
  1009.         cleanup_conn (cb);
  1010.         break;
  1011.     }
  1012.     return result;
  1013. }
  1014.  
  1015. /*   */
  1016.  
  1017. static int invoke (cb, ops, ds, args)
  1018. Connblk *cb;
  1019. struct RyOperation ops[];
  1020. register struct client_dispatch *ds;
  1021. char  **args;
  1022. {
  1023.     int        result;
  1024.     caddr_t in;
  1025.     struct RoSAPindication  rois;
  1026.     register struct RoSAPindication *roi = &rois;
  1027.     register struct RoSAPpreject   *rop = &roi -> roi_preject;
  1028.     int    sd;
  1029.  
  1030.     PP_TRACE (("invoke (%s)", cb_print (cb)));
  1031.  
  1032.     sd = cb -> cb_fd;
  1033.     in = NULL;
  1034.     if (ds -> ds_argument &&
  1035.     (result = (*ds -> ds_argument) (sd, ds, args, &in)) != OK)
  1036.     return result;
  1037.  
  1038.      switch (result = RyStub (sd, ops, ds -> ds_operation,
  1039.                   cb -> cb_id = RyGenID (sd),
  1040.                   NULLIP, in, ds -> ds_result, ds -> ds_error,
  1041.                   ROS_ASYNC, roi)) {
  1042.     case NOTOK:        /* failure */
  1043.         ros_advise (rop, "STUB");
  1044.         if (ROS_FATAL (rop -> rop_reason))
  1045.             return DONE;
  1046.         return NOTOK;
  1047.  
  1048.     case OK:        /* got a result/error response */
  1049.         break;
  1050.  
  1051.     case DONE:        /* got RO-END? */
  1052.         adios (NULLCP, "got RO-END.INDICATION");
  1053.         return NOTOK;
  1054.  
  1055.     default:
  1056.         adios (NULLCP, "unknown return from RyStub=%d", result);
  1057.         /* NOTREACHED */
  1058.     }
  1059.  
  1060.     if (ds -> ds_fr_mod && in)
  1061.     fre_obj (in, ds -> ds_fr_mod -> md_dtab[ds -> ds_fr_index],
  1062.          ds -> ds_fr_mod, 1);
  1063.     return OK;
  1064. }
  1065.  
  1066. void chan_manage (fd)
  1067. int    fd;
  1068. {
  1069.     register Connblk *cb;
  1070.     int    sd;
  1071.  
  1072.     if ((cb = findcblk (fd)) == NULLCB) {
  1073.         PP_LOG (LLOG_EXCEPTIONS, ("fd %d is not registered!", fd));
  1074.         FD_CLR (fd, &perf_rfds);
  1075.         FD_CLR (fd, &perf_wfds);
  1076.         return;
  1077.     }
  1078.  
  1079.     PP_TRACE (("chan_manage (%s)", cb_print (cb) ));
  1080.  
  1081.     switch (cb -> cb_state) {
  1082.         case cb_conn_request1:
  1083.         case cb_conn_request2:
  1084.         PP_TRACE (("Awaiting async connection (state %d)",
  1085.                cb -> cb_state == cb_conn_request1 ? 1 : 2));
  1086.         switch (acsap_retry (sd = cb -> cb_fd)) {
  1087.             case NOTOK:
  1088.             PP_TRACE (("Connection error on %s", cb_print (cb)));
  1089.             delay_channel (cb);
  1090.             cleanup_conn (cb);
  1091.             return;
  1092.  
  1093.             case CONNECTING_1:
  1094.             cb -> cb_state = cb_conn_request1;
  1095.             PP_TRACE (("State 1 on %s", cb_print (cb)));
  1096.             FD_CLR (sd, &perf_rfds);
  1097.             FD_SET (sd, &perf_wfds);
  1098.             if (sd >= perf_nfds)
  1099.                 perf_nfds = sd + 1;
  1100.             break;
  1101.  
  1102.             case CONNECTING_2:
  1103.             cb -> cb_state = cb_conn_request2;
  1104.             PP_TRACE (("State 2 on %s", cb_print (cb)));
  1105.             FD_CLR (sd, &perf_wfds);
  1106.             FD_SET (sd, &perf_rfds);
  1107.             if (sd >= perf_nfds)
  1108.                 perf_nfds = sd + 1;
  1109.             break;
  1110.             case DONE:
  1111.             cb -> cb_state = cb_conn_established;
  1112.             PP_TRACE (("connection now established on %s",
  1113.                    cb_print (cb)));
  1114.             FD_CLR (sd, &perf_wfds);
  1115.             FD_SET (sd, &perf_rfds);
  1116.             if (sd >= perf_nfds)
  1117.                 perf_nfds = sd + 1;
  1118.             (void) newevblk (cb -> cb_clp, cb,
  1119.                      cb -> cb_type, (time_t)0);
  1120.             break;
  1121.         }
  1122.         break;
  1123.         case cb_idle:
  1124.         freecblk (cb);
  1125.         break;
  1126.         case cb_proc_sent:
  1127.         PP_TRACE (("Awaiting proc response ..."));
  1128.         getresponse (cb, OK);
  1129.         break;
  1130.         case cb_init_sent:
  1131.         PP_TRACE (("Awaiting init response"));
  1132.         getresponse (cb, OK);
  1133.         break;
  1134.         default:
  1135.         PP_TRACE (("Funny state!"));
  1136.         break;
  1137.     }
  1138. }
  1139.  
  1140. static void getresponse (cb, type)
  1141. Connblk *cb;
  1142. int    type;
  1143. {
  1144.     caddr_t    out;
  1145.     struct RoSAPindication    rois;
  1146.     register struct RoSAPindication *roi = &rois;
  1147.     register struct RoSAPpreject   *rop = &roi -> roi_preject;
  1148.     
  1149.     PP_TRACE (("getresponse (%s)", cb_print (cb)));
  1150.  
  1151.     switch (RyWait (cb -> cb_fd, &cb -> cb_id, &out, type, roi)) {
  1152.         case NOTOK:
  1153.         if (rop -> rop_reason == ROS_TIMER)
  1154.             break;
  1155.         PP_TRACE (("RyWait returned NOTOK"));
  1156.         ros_advise (rop, "STUB");
  1157.         delay_channel (cb);
  1158.         cleanup_conn (cb);
  1159.         break;
  1160.  
  1161.         case OK:
  1162.         PP_TRACE (("RyWait returned OK"));
  1163.         switch (cb -> cb_state) {
  1164.             default:
  1165.             cb -> cb_state = cb_active;
  1166.             /* and fall */
  1167.             case cb_finished:
  1168.             (void) newevblk (cb -> cb_clp, cb,
  1169.                      cb -> cb_type, (time_t)0);
  1170.             break;
  1171.             case cb_proc_sent:
  1172.             break;
  1173.             case cb_error:
  1174.             cleanup_conn (cb);
  1175.             break;
  1176.         }
  1177.         break;
  1178.  
  1179.         case DONE:
  1180.         cb -> cb_ml = NULL;
  1181.         cleanup_conn (cb);
  1182.         break;
  1183.     }
  1184. }
  1185.  
  1186. #ifdef notdef
  1187. static void setfds ()
  1188. {
  1189.     register Connblk *cb;
  1190.     struct PSAPindication pi;
  1191.  
  1192.     PP_TRACE (("setfds ()"));
  1193.  
  1194.     FD_ZERO (&perf_wfds);
  1195.     FD_ZERO (&perf_rfds);
  1196.     perf_nfds = 0;
  1197.  
  1198.     for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw)
  1199.         switch (cb -> cb_state) {
  1200.             case cb_conn_request1:
  1201.             if (PSelectMask (cb -> cb_fd, &perf_wfds,
  1202.                      &perf_nfds, &pi) == NOTOK)
  1203.                 PP_LOG (LLOG_EXCEPTIONS,
  1204.                      ("PSelectMask failed"));
  1205.             break;
  1206.             case cb_conn_request2:
  1207.             case cb_proc_sent:
  1208.             case cb_init_sent:
  1209.             case cb_conn_established:
  1210.             if (PSelectMask (cb -> cb_fd, &perf_rfds,
  1211.                      &perf_nfds, &pi) == NOTOK)
  1212.                 PP_LOG (LLOG_EXCEPTIONS,
  1213.                     ("PSelectMask failed"));
  1214.             break;
  1215.             default:
  1216.             break;
  1217.         }
  1218.     PP_TRACE (("nrfds=%d rfds=0x%x wfds=0x%x", perf_nfds,
  1219.          perf_rfds.fds_bits[0], perf_wfds.fds_bits[0]));
  1220. }
  1221. #endif
  1222.  
  1223. /* ARGSUSED */
  1224. static int  do_quit (sd, ds, args, arg)
  1225. int    sd;
  1226. struct client_dispatch *ds;
  1227. char  **args;
  1228. caddr_t    *arg;
  1229. {
  1230.     Connblk *cb;
  1231.  
  1232.     PP_TRACE (("do_quit(%d)", sd));
  1233.  
  1234.     (void) assoc_release (sd);
  1235.     if (sd != NOTOK) {
  1236.         FD_CLR (sd, &perf_wfds);
  1237.         FD_CLR (sd, &perf_rfds);
  1238.     }
  1239.     if ((cb = findcblk (sd)) != NULLCB)
  1240.         cb -> cb_fd = NOTOK;
  1241.     else
  1242.         return DONE;
  1243.  
  1244.     PP_TRACE (("do_quit (%s)", cb_print (cb) ));
  1245.     if (cb -> cb_type == cb_channel)
  1246.         cb -> cb_clp -> chan_update = 1;
  1247.  
  1248.     PP_NOTICE (("Closing channel %s", cb_print (cb)));
  1249.  
  1250.     return DONE;
  1251. }
  1252.  
  1253. /* connection block functions */
  1254.  
  1255. Connblk *newcblk (type)
  1256. enum    cb_type type;
  1257. {
  1258.     Connblk *cb;
  1259.  
  1260.     PP_TRACE (("newcblk () cnt=%d", cb_count));
  1261.  
  1262.     cb = (Connblk *) calloc (1, sizeof *cb);
  1263.     if (cb == NULLCB)
  1264.         return cb;
  1265.  
  1266.     cb -> cb_fd = NOTOK;
  1267.     cb -> cb_type = type;
  1268.  
  1269.     if (cb_once_only == 0) {
  1270.         CHead -> cb_forw = CHead -> cb_back = CHead;
  1271.         cb_once_only ++;
  1272.     }
  1273.     insque (cb, CHead -> cb_back);
  1274.     cb_count ++;
  1275.     return cb;
  1276. }
  1277.  
  1278. void freecblk (cb)
  1279. Connblk *cb;
  1280. {
  1281.     struct AcSAPindication    acis;
  1282.     struct RoSAPindication    rois;
  1283.     register struct RoSAPindication *roi = &rois;
  1284.     
  1285.     if (cb == NULLCB)
  1286.         return;
  1287.  
  1288.     PP_DBG (("freecblk (%s) cnt=%d",
  1289.         cb_print(cb), cb_count));
  1290.  
  1291.     if (cb -> cb_fd != NOTOK) {
  1292.         PP_NOTICE (("Killing channel %s",
  1293.                 cb_print (cb)));
  1294.         (void) AcUAbortRequest (cb -> cb_fd, NULLPEP, 0, &acis);
  1295.         (void) RyLose (cb -> cb_fd, roi);
  1296.         FD_CLR (cb -> cb_fd, &perf_wfds);
  1297.         FD_CLR (cb -> cb_fd, &perf_rfds);
  1298.     }
  1299.     remque (cb);
  1300.  
  1301.     cb_count --;
  1302.     if (cb_count < 0)
  1303.         PP_LOG (LLOG_EXCEPTIONS, ("Internal error cb_count = %d",
  1304.                       cb_count));
  1305.     free ((char *) cb);
  1306. }
  1307.  
  1308. Connblk *findcblk (fd)
  1309. int    fd;
  1310. {
  1311.     Connblk *cb;
  1312.  
  1313.     PP_DBG (("findcblk (%d) cnt=%d", fd, cb_count));
  1314.  
  1315.     if (cb_once_only == 0)
  1316.         return NULLCB;
  1317.  
  1318.     for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw)
  1319.         if (cb -> cb_fd == fd)
  1320.             return cb;
  1321.  
  1322.     PP_TRACE (("Can't locate block with %d (%d alloc'd)",
  1323.         fd, cb_count));
  1324.     return NULLCB;
  1325. }
  1326.  
  1327. static char    *cb2name (cb)
  1328. Connblk *cb;
  1329. {
  1330.     switch (cb -> cb_type) {
  1331.         case cb_channel:
  1332.         case cb_special:
  1333.         if (cb -> cb_clp && cb -> cb_clp -> channame)
  1334.             return cb -> cb_clp -> channame;
  1335.         else    return "unknown channel";
  1336.  
  1337.         case cb_timer:
  1338.         return "timer";
  1339.  
  1340.         case cb_responder:
  1341.         return "responder";
  1342.  
  1343.         default:
  1344.         break;
  1345.     }
  1346.     return "something";
  1347. }
  1348.  
  1349. static char    *cb_print (cb)
  1350. Connblk *cb;
  1351. {
  1352.     static char buf[1024];
  1353.  
  1354.     switch (cb -> cb_type) {
  1355.         case cb_channel:
  1356.         case cb_special:
  1357.         (void) sprintf (buf, "<fd=%d chan=%s mta=%s msg=%s>",
  1358.                 cb -> cb_fd,
  1359.                 cb2name(cb),
  1360.                 cb -> cb_mlp ? cb -> cb_mlp -> mtaname: "none",
  1361.                 (cb -> cb_ml && cb -> cb_ml -> ms ) ?
  1362.                 cb -> cb_ml -> ms -> queid : "none");
  1363.         break;
  1364.         case cb_timer:
  1365.         (void) sprintf (buf, "<timer proc=0x%x reload=%d>",
  1366.                 cb -> cb_proc, cb -> cb_reload);
  1367.         break;
  1368.  
  1369.         case cb_responder:
  1370.         (void) sprintf (buf, "<fd=%d responder auth=%d>",
  1371.                 cb -> cb_fd, cb -> cb_authenticated);
  1372.         break;
  1373.     }
  1374.  
  1375.     return buf;
  1376. }
  1377.  
  1378. static void pcblock_error (cb)
  1379. Connblk *cb;
  1380. {
  1381.     PP_OPER (NULLCP,
  1382.          ("Channel %s in state %d taking too long - aborting rfd=%s wfd=%s nfds=%d",
  1383.           cb_print (cb), cb -> cb_state,
  1384.           FD_ISSET (cb->cb_fd, &perf_rfds) ? "set" : "unset",
  1385.           FD_ISSET (cb->cb_fd, &perf_wfds) ? "set" : "unset",
  1386.           perf_nfds
  1387.          ));
  1388. }
  1389.     
  1390. static struct eventqueue *newevblk (clp, cb, type, when)
  1391. Chanlist *clp;
  1392. Connblk *cb;
  1393. time_t    when;
  1394. enum cb_type type;
  1395. {
  1396.     struct eventqueue *ev, *pev;
  1397.  
  1398.     PP_TRACE (("newevblk (clp, cb, %d, %d) cnt=%d",
  1399.            type, when, ev_count));
  1400.  
  1401.     ev = (struct eventqueue *) calloc (1, sizeof *ev);
  1402.     if (ev == NULLEV) 
  1403.         return ev;
  1404.  
  1405.     ev_count ++;
  1406.     ev -> ev_clp = clp;
  1407.     ev -> ev_conn = cb;
  1408.     ev -> ev_type = type;
  1409.  
  1410.     for (pev = EHead -> ev_forw; pev != EHead; pev = pev -> ev_forw) {
  1411.         if (pev -> ev_time > when) {
  1412.             insque (ev, pev -> ev_back);
  1413.             break;
  1414.         }
  1415.         else
  1416.             when -= pev -> ev_time;
  1417.     }
  1418.     if (pev == EHead)
  1419.         insque (ev, EHead -> ev_back);
  1420.     for (; pev != EHead; pev = pev -> ev_forw)
  1421.         pev -> ev_time -= when;
  1422.  
  1423.     ev -> ev_time = when;
  1424.  
  1425.     return ev;
  1426. }
  1427.  
  1428. static void freevblk (ev)
  1429. struct eventqueue *ev;
  1430. {
  1431.     PP_DBG (("freevblk () cnt=%d", ev_count));
  1432.  
  1433.     if (ev == NULLEV)
  1434.         return;
  1435.  
  1436.     if (ev -> ev_forw != EHead)
  1437.         ev -> ev_forw -> ev_time += ev -> ev_time;
  1438.             
  1439.     remque (ev);
  1440.  
  1441.     free ((char *) ev);
  1442.     ev_count --;
  1443.     if (ev_count < 0)
  1444.         PP_LOG (LLOG_EXCEPTIONS, ("Internal error - ev_count = %d",
  1445.                       ev_count));
  1446. }
  1447.  
  1448. #ifdef notdef
  1449. static time_t evtimebyclp (clp)
  1450. Chanlist *clp;
  1451. {
  1452.     struct eventqueue *ev;
  1453.     time_t    now = current_time;
  1454.     
  1455.     for (ev = EHead -> ev_forw; ev != EHead; ev = ev -> ev_forw)
  1456.         if (ev -> ev_clp == clp)
  1457.             return now + ev -> ev_time;
  1458.         else now += ev -> ev_time;
  1459.     return now + MAX_SLEEP + 2;
  1460. }
  1461.  
  1462. static struct eventqueue *findevbyclp (clp)
  1463. Chanlist *clp;
  1464. {
  1465.     struct eventqueue *ev;
  1466.  
  1467.     for (ev = EHead -> ev_forw; ev != EHead; ev = ev -> ev_forw)
  1468.         if (ev -> ev_clp == clp)
  1469.             return ev;
  1470.     return NULLEV;
  1471. }
  1472. #endif
  1473.  
  1474. static void init_events ()
  1475. {
  1476.     PP_TRACE (("init_events ()"));
  1477.  
  1478.     EHead -> ev_forw = EHead -> ev_back = EHead;
  1479. }
  1480.  
  1481. sched_delete (rlp, ms, zapped)
  1482. Reciplist *rlp;
  1483. MsgStruct *ms;
  1484. int zapped;
  1485. {
  1486.     rlp -> status = ST_DELETE;
  1487.     if (ms -> count == 0)
  1488.         insertindelchan (ms);
  1489.     stats.n_addr_out ++;
  1490.     if (zapped)
  1491.         stats.n_messages_out ++;
  1492. }
  1493.  
  1494. void inc_channel (cb, number)    /* successful delivery */
  1495. Connblk *cb;
  1496. int    number;
  1497. {
  1498.     MsgStruct *ms;
  1499.     Reciplist *rlp, *rl0;
  1500.     LIST_RCHAN *lcp;
  1501.     int deleted;
  1502.  
  1503.     PP_TRACE (("inc_channel (%s, %d)",
  1504.            cb_print (cb), number));
  1505.  
  1506.     cb -> cb_clp -> lastsuccess = current_time;
  1507.     cb -> cb_clp -> error_count = 0;
  1508.     ms = cb -> cb_ml -> ms;
  1509.     clear_msgcache (cb -> cb_ml);
  1510.  
  1511.     if (number == 0)    /* set real recip number */
  1512.         number = cb -> cb_ml -> recips[0] -> id;
  1513.     deleted = delfromchan (cb -> cb_clp, cb -> cb_mlp,
  1514.                    cb -> cb_ml, number);
  1515.  
  1516.     if (deleted & ZAP_MTA)
  1517.         cb -> cb_mlp = NULLMTALIST;
  1518.  
  1519.     cache_clear (&cb -> cb_clp -> cache);
  1520.  
  1521.     if (cb -> cb_mlp) {
  1522.         cache_clear (&cb -> cb_mlp -> cache);
  1523.         cb -> cb_mlp -> lastsuccess = current_time;
  1524.         cb -> cb_mlp -> error_count = 0;
  1525.     }
  1526.  
  1527.     if (ms == NULL) {
  1528.         PP_LOG (LLOG_EXCEPTIONS, ("Can't locate message!"));
  1529.         return;
  1530.     }
  1531.     for (rlp = ms  -> recips; rlp; rlp = rlp -> rp_next)
  1532.         if (rlp -> id == number)
  1533.             break;
  1534.     if (rlp == NULL) {
  1535.         PP_LOG (LLOG_EXCEPTIONS, ("No recipient %d in list!",
  1536.                       number));
  1537.         return;
  1538.     }
  1539.  
  1540.     switch (rlp -> status) {
  1541.         case ST_WARNING:
  1542.         ms -> numberwarns ++;
  1543.         lcp = rlp -> cchan;
  1544.         rlp -> status = ST_NORMAL;
  1545.         if (lcp) {
  1546.             insertinchan (lcp -> li_chan, ms, rlp,
  1547.                       chan2mta (lcp -> li_chan, rlp));
  1548.             return;
  1549.         }
  1550.         break;
  1551.         case ST_NORMAL:
  1552.         lcp = rlp -> cchan = rlp -> cchan -> li_next;
  1553.         if (lcp) {
  1554.             insertinchan (lcp -> li_chan, ms, rlp,
  1555.                       chan2mta (lcp -> li_chan, rlp));
  1556.             return;
  1557.         }
  1558.         sched_delete (rlp, ms, deleted & ZAP_MSG);
  1559.         break;
  1560.  
  1561.         case ST_DR:
  1562.         for (rl0 = ms -> recips; rl0; rl0 = rl0 -> rp_next)
  1563.             if (rl0 -> id == 0)
  1564.                 break;
  1565.         if ((lcp = rl0 -> cchan) == NULLIST_RCHAN) {
  1566.             PP_LOG (LLOG_EXCEPTIONS, ("no channels left for DR"));
  1567.             break;
  1568.         }
  1569.         if (lcp -> li_next == NULLIST_RCHAN) /* end of list */
  1570.             sched_delete (rlp, ms, deleted & ZAP_MSG);
  1571.         else  {
  1572.             lcp = rl0 -> cchan = lcp -> li_next;
  1573.             insertinchan (lcp -> li_chan, ms, rlp,
  1574.                       chan2mta (lcp -> li_chan, rl0));
  1575.         }
  1576.         break;
  1577.         case ST_DELETE:
  1578.         cb -> cb_ml = NULL;
  1579.         if (ms -> count == 0)
  1580.             zapmsg (ms);
  1581.         break;
  1582.     }
  1583. }
  1584.  
  1585. void delay_host (cb, rno, str, first)
  1586. Connblk *cb;
  1587. int    rno;
  1588. char    *str;
  1589. int first;
  1590. {
  1591.     struct Mtalist *mlp;
  1592.  
  1593.     PP_TRACE (("delay_host (%s, %d)", cb_print (cb), rno));
  1594.  
  1595.     if (cb -> cb_ml)
  1596.         msg_unlock (cb -> cb_ml -> ms);
  1597.  
  1598.     if ((mlp = cb -> cb_mlp) == NULLMTALIST) {
  1599.         PP_LOG (LLOG_EXCEPTIONS, ("Can't locate mta in queue"));
  1600.         return;
  1601.     }
  1602.     if (mlp -> info) {
  1603.         free (mlp -> info);
  1604.         mlp -> info = NULLCP;
  1605.     }
  1606.     if (str)
  1607.         mlp -> info = strdup (str);
  1608.     cache_inc (&mlp -> cache, cache_time*3/2);
  1609.     mlp -> nextevent = mlp -> cache.cachetime;
  1610.     if (cb -> cb_clp) {
  1611.         remque (mlp);
  1612.         insque (mlp, cb -> cb_clp -> mtas -> mta_back);
  1613.         if (first)
  1614.             cb -> cb_clp -> error_count ++;
  1615.     }
  1616. }
  1617.  
  1618. void delay_message (cb, str, first)
  1619. Connblk *cb;
  1620. char    *str;
  1621. int first;
  1622. {
  1623.     Mtalist    *mlp;
  1624.  
  1625.     PP_TRACE (("delay_message (%s)", cb_print (cb)));
  1626.  
  1627.     if (cb -> cb_ml == NULL)
  1628.         return;
  1629.     if ((mlp = cb -> cb_mlp) == NULLMTALIST) {
  1630.         PP_LOG (LLOG_EXCEPTIONS, ("Can't locate mta in queue"));
  1631.         return;
  1632.     }
  1633.     if (cb -> cb_ml -> info) {
  1634.         free (cb -> cb_ml -> info);
  1635.         cb -> cb_ml -> info = NULLCP;
  1636.     }
  1637.     if (str)
  1638.         cb -> cb_ml -> info = strdup (str);
  1639.     if (first)
  1640.         cb -> cb_ml -> ms -> nerrors ++;
  1641.     msg_unlock (cb -> cb_ml -> ms);
  1642.     msgcache_inc (cb -> cb_ml, cache_time*2);
  1643.     mlp -> mta_changed = 1;
  1644.     if (first)
  1645.         mlp -> error_count ++;
  1646.     cb -> cb_clp -> chan_update = 1;
  1647. }
  1648.  
  1649. static void bad_recip (cb, rno)
  1650. Connblk *cb;
  1651. int    rno;
  1652. {
  1653.     Reciplist *rlp;
  1654.     MsgStruct *ms;
  1655.     LIST_RCHAN *lcp;
  1656.     int    del;
  1657.  
  1658.     PP_TRACE (("bad_recip (%s, %d)", cb_print (cb), rno));
  1659.  
  1660.     if (rno == 0)        /* set real recip number */
  1661.         rno = cb -> cb_ml -> recips[0] -> id;
  1662.  
  1663.     ms = cb -> cb_ml -> ms;
  1664.     for (rlp = ms -> recips; rlp; rlp = rlp -> rp_next)
  1665.         if (rlp -> id == rno)
  1666.             break;
  1667.     if (rlp == NULLRL)
  1668.         return;
  1669.  
  1670.     if (rlp -> status == ST_DR) { /* DR being DR'd */
  1671.         Reciplist *rl0;
  1672.  
  1673.         for (rl0 = ms -> recips; rl0; rl0 = rl0 -> rp_next)
  1674.             if (rl0 -> id == 0)
  1675.                 break;
  1676.  
  1677.         if ((lcp = rl0 -> cchan) && lcp -> li_next == NULLIST_RCHAN) {
  1678.             PP_OPER (NULLCP,
  1679.                  ("A DR giving status DR on chan %s - Help!",
  1680.                   lcp -> li_chan ? lcp -> li_chan -> ch_name :
  1681.                   "<unknown>"));
  1682.             msgcache_inc (cb -> cb_ml, cache_time*2);
  1683.             cb -> cb_mlp -> mta_changed = 1;
  1684.             cb -> cb_mlp -> error_count ++;
  1685.             cb -> cb_clp -> chan_update = 1;
  1686.             return;
  1687.         }
  1688.  
  1689.         cb -> cb_clp -> lastsuccess = current_time;
  1690.         cb -> cb_clp -> error_count = 0;
  1691.         if (cb -> cb_mlp) {
  1692.             cb -> cb_mlp -> lastsuccess = current_time;
  1693.             cb -> cb_mlp -> error_count = 0;
  1694.         }
  1695.         del = delfromchan (cb -> cb_clp, cb -> cb_mlp, cb -> cb_ml, rno); 
  1696.         if (del & ZAP_MTA)
  1697.             cb -> cb_mlp = NULLMTALIST;
  1698.  
  1699.         PP_NOTICE (("Skipping over DR refromatting channels"));
  1700.         /* skip over all the bad reformatters */
  1701.         while (lcp -> li_next != NULLIST_RCHAN)
  1702.             lcp = lcp -> li_next;
  1703.         rl0 -> cchan = lcp;
  1704.         insertinchan (lcp -> li_chan, ms,
  1705.                   rlp, chan2mta (lcp -> li_chan, rl0));
  1706.     }
  1707.     else {
  1708.         cb -> cb_clp -> lastsuccess = current_time;
  1709.         cb -> cb_clp -> error_count = 0;
  1710.         if (cb -> cb_mlp) {
  1711.             cb -> cb_mlp -> lastsuccess = current_time;
  1712.             cb -> cb_mlp -> error_count = 0;
  1713.         }
  1714.         del = delfromchan (cb -> cb_clp, cb -> cb_mlp, cb -> cb_ml, rno); 
  1715.         if (del & ZAP_MTA)
  1716.             cb -> cb_mlp = NULLMTALIST;
  1717.         insertindrchan (cb -> cb_ml -> ms, rlp);
  1718.     }
  1719. }
  1720.  
  1721. static void sharedDR (cb, rno)
  1722. Connblk *cb;
  1723. int    rno;
  1724. {
  1725.     int del;
  1726.  
  1727.     PP_TRACE (("sharedDR (%s, %d)", cb_print (cb), rno));
  1728.  
  1729.     if (rno == 0)    /* set real recip number */
  1730.         rno = cb -> cb_ml -> recips[0] -> id;
  1731.  
  1732.     cb -> cb_clp -> lastsuccess = current_time;
  1733.     if (cb -> cb_mlp)
  1734.         cb -> cb_mlp -> lastsuccess = current_time;
  1735.     del = delfromchan (cb -> cb_clp, cb -> cb_mlp, cb -> cb_ml, rno);
  1736.     if (del & ZAP_MTA)
  1737.         cb -> cb_mlp = NULLMTALIST;
  1738. }
  1739.  
  1740. /* ARGSUSED */
  1741.  
  1742. static int do_processmessage (sd, ds, args, arg)
  1743. int    sd;
  1744. struct client_dispatch *ds;
  1745. char    **args;
  1746. struct type_Qmgr_ProcMsg **arg;
  1747. {
  1748.     Reciplist *rl0;
  1749.     Connblk *cb;
  1750.     Mlist *ml;
  1751.     register struct type_Qmgr_ProcMsg *pm;
  1752.     struct type_Qmgr_UserList *lusers ();
  1753.     char    *p;
  1754.  
  1755.     PP_TRACE (("do_processmessage (%d)", sd));
  1756.  
  1757.     if ((cb = findcblk (sd)) == NULLCB)
  1758.         return NOTOK;
  1759.     if (cb -> cb_state != cb_active)
  1760.         return NOTOK;
  1761.     ml = cb -> cb_ml;
  1762.     if (ml == NULL) {
  1763.         PP_LOG (LLOG_EXCEPTIONS, ("Missing ml structure"));
  1764.         return NOTOK;
  1765.     }
  1766.     *arg = pm = (struct type_Qmgr_ProcMsg *) malloc (sizeof (**arg));
  1767.     
  1768.     pm -> channel = str2qb (cb -> cb_clp -> channame,
  1769.                 strlen (cb -> cb_clp -> channame), 1);
  1770.  
  1771.     p = ml -> ms -> queid;
  1772.     pm -> qid = str2qb (p, strlen (p), 1);
  1773.     for (rl0 = ml -> ms -> recips; rl0; rl0 = rl0 -> rp_next)
  1774.         if (rl0 -> id == 0)
  1775.             break;
  1776.     if (ml -> recips[0] -> status == ST_DR &&
  1777.         rl0 && rl0 -> cchan && rl0 -> cchan -> li_next != NULL) {
  1778.         pm -> users = (struct type_Qmgr_UserList *)
  1779.             smalloc (sizeof *pm -> users);
  1780.         pm -> users -> next = NULL;
  1781.         pm -> users -> RecipientId = (struct type_Qmgr_RecipientId *)
  1782.             smalloc (sizeof *pm -> users -> RecipientId);
  1783.         pm -> users -> RecipientId -> parm = 0;
  1784.     }
  1785.     else
  1786.         pm -> users = lusers (ml, 0);
  1787.     if (pm -> users == NULL)
  1788.         PP_LOG (LLOG_EXCEPTIONS, ("Empty user list!"));
  1789.     cb -> cb_ttl = current_time + CHAN_TIMEOUT + (ml -> ms -> size /20);
  1790.     cb -> cb_state = cb_proc_sent;
  1791.     if (cb -> cb_mlp)
  1792.         cb -> cb_mlp -> lastattempt = current_time;
  1793.     return OK;
  1794. }
  1795.  
  1796. /* ARGSUSED */
  1797. static int do_readqueue (sd, ds, args, arg)
  1798. int    sd;
  1799. struct client_dispatch *ds;
  1800. char    **args;
  1801. struct type_Qmgr_Pseudo__readqueue **arg;
  1802. {
  1803.     Connblk *cb;
  1804.  
  1805.     PP_TRACE (("do_readqueue (%d)", sd));
  1806.     if ((cb = findcblk (sd)) == NULLCB)
  1807.         return NOTOK;
  1808.     cb -> cb_state = cb_proc_sent;
  1809.     return OK;
  1810. }
  1811.  
  1812.  
  1813.  
  1814. /* ARGSUSED */
  1815. static int do_initchannel (sd, ds, args, arg)
  1816. int    sd;
  1817. struct client_dispatch *ds;
  1818. char **args;
  1819. struct type_Qmgr_Channel **arg;
  1820. {
  1821.     Connblk *cb;
  1822.  
  1823.     if ((cb = findcblk (sd)) == NULLCB)
  1824.         return NOTOK;
  1825.  
  1826.     PP_TRACE (("do_initchannel (%s)", cb_print (cb) ));
  1827.  
  1828.     *arg = str2qb (*args, strlen (*args), 1);
  1829.     cb -> cb_state = cb_init_sent;
  1830.     if (cb -> cb_clp -> chan_special)
  1831.         cb -> cb_ttl = current_time + CHAN_TIMEOUT;
  1832.     else    cb -> cb_ttl = current_time + 60;
  1833.     return OK;
  1834. }
  1835.  
  1836. /* ARGSUSED */
  1837. static int processmessage_result (sd, id, dummy, result, roi)
  1838. int    sd,
  1839.     id,
  1840.     dummy;
  1841. register struct type_Qmgr_DeliveryStatus *result;
  1842. struct RoSAPindication *roi;
  1843. {
  1844.     Connblk *cb;
  1845.     struct type_Qmgr_DeliveryStatus *ds;
  1846.     int    rno;
  1847.     char    buf[LINESIZE];
  1848.     char    *info;
  1849.     int first_mta = 1, first_msg = 1;
  1850.  
  1851.     PP_TRACE (("processmessage_result (%d)", sd));
  1852.  
  1853.     if ((cb = findcblk (sd)) == NULLCB) {
  1854.         PP_LOG (LLOG_EXCEPTIONS, ("No connection block for %d",
  1855.                       sd));
  1856.         return NOTOK;
  1857.     }
  1858.     PP_TRACE (("processmessage_result (%s)", cb_print (cb)));
  1859.     for (ds = result; ds; ds = ds -> next) {
  1860.         rno = ds -> IndividualDeliveryStatus -> recipient -> parm;
  1861.         (void) sprintf (buf, "Recipient ID %d %s:", rno,
  1862.                 cb_print (cb));
  1863.         if (ds -> IndividualDeliveryStatus -> info)
  1864.             info = qb2str(ds -> IndividualDeliveryStatus -> info);
  1865.         else    info = NULLCP;
  1866.         switch (ds -> IndividualDeliveryStatus -> status) {
  1867.             case int_Qmgr_status_success:
  1868.             PP_NOTICE (("%s success", buf));
  1869.             inc_channel (cb, rno);
  1870.             break;
  1871.  
  1872.             case int_Qmgr_status_negativeDR:
  1873.             case int_Qmgr_status_positiveDR:
  1874.             PP_NOTICE (("%s %s", buf,
  1875.                     ds -> IndividualDeliveryStatus -> status
  1876.                     == int_Qmgr_status_positiveDR ?
  1877.                     "positiveDR" : "negativeDR"));
  1878.             bad_recip (cb, rno);
  1879.             break;
  1880.  
  1881.             case int_Qmgr_status_successSharedDR:
  1882.             case int_Qmgr_status_failureSharedDR:
  1883.             PP_NOTICE (("%s %s", buf,
  1884.                     ds -> IndividualDeliveryStatus -> status
  1885.                     == int_Qmgr_status_failureSharedDR ?
  1886.                     "failureSharedDR" : "sucessSharedDR"));
  1887.             sharedDR(cb, rno);
  1888.             break;
  1889.  
  1890.             case int_Qmgr_status_messageFailure:
  1891.             PP_NOTICE (("%s messageFailure (%s)", buf,
  1892.                     info ? info : ""));
  1893.             delay_message (cb, info, first_msg);
  1894.             first_msg = 0;
  1895.             break;
  1896.  
  1897.             case int_Qmgr_status_mtaFailure:
  1898.             PP_NOTICE (("%s mtaFailure (%s)", buf,
  1899.                     info ? info : ""));
  1900.             delay_host (cb, rno, info, first_mta);
  1901.             first_mta = 9;
  1902.             break;
  1903.  
  1904.             case int_Qmgr_status_mtaAndMessageFailure:
  1905.             PP_NOTICE (("%s mtaAndMessageFailure (%s)", buf,
  1906.                     info ? info : ""));
  1907.             delay_message (cb, info, first_msg);
  1908.             delay_host (cb, rno, info, first_mta);
  1909.             first_mta = first_msg = 0;
  1910.             break;
  1911.  
  1912.             default:
  1913.             PP_NOTICE (("%s Unknown response", buf));
  1914.             break;
  1915.         }
  1916.         if (info)
  1917.             free (info);
  1918.     }
  1919.     cb -> cb_state = cb_active;
  1920.     /* quick attempt to fire something off now - if its easy
  1921.      * otherwise - do it in the main loop and do more clever scheduling.
  1922.      */
  1923.     if (cb -> cb_clp -> chan_enabled == 0 || opmode ||
  1924.         (cb -> cb_ml = nextmsg (cb -> cb_clp, &cb -> cb_mlp, 1, 1))== NULL )
  1925.             cb -> cb_ml = NULL;
  1926.     else
  1927.         (void) chan_invoke (cb, DIS_PROC, (caddr_t *)0);
  1928.         
  1929.     return OK;
  1930. }
  1931.  
  1932. /*   RPC procedures... */
  1933.  
  1934. /* ARGSUSED */
  1935. static int readqueue_result (sd, id, dummy, result, roi)
  1936. int    sd,
  1937.     id,
  1938.     dummy;
  1939. register struct type_Qmgr_MsgList *result;
  1940. struct RoSAPindication *roi;
  1941. {
  1942.     struct type_Qmgr_MsgStructList *ml;
  1943.     MsgStruct *ms, *oldms;
  1944.     char    *p;
  1945.     int    count;
  1946.     Connblk *cb;
  1947.     struct type_Qmgr_QidList *ql;
  1948.  
  1949.     PP_TRACE (("readqueue_result (%d, %d)", sd, id));
  1950.  
  1951.     if ((cb = findcblk (sd)) == NULLCB)
  1952.         return NOTOK;
  1953.     cb -> cb_state = cb_active;
  1954.     for (count = 0,ml = result -> msgs; ml; ml = ml -> next) {
  1955.         ms = newmsgstruct (ml -> MsgStruct);
  1956.         if ((oldms = find_msg (ms -> queid)) != NULLMS) {
  1957.             (void) updatemessage (oldms, ms);
  1958.             freems (ms);
  1959.         }
  1960.         else
  1961.             (void) insertmessage (ms);
  1962.         count ++;
  1963.     }
  1964.     for (ql = result -> deleted; ql; ql = ql -> next) {
  1965.         p = qb2str (ql -> QID);
  1966.         if ((ms = find_msg (p)) != NULLMS)
  1967.             kill_msg (ms);
  1968.         free (p);
  1969.     }
  1970.     if (count == 0)
  1971.         cb -> cb_state = cb_finished;
  1972.     cb -> cb_ttl = current_time + CHAN_TIMEOUT;
  1973.     cb -> cb_clp -> lastsuccess = current_time;
  1974.     return OK;
  1975. }
  1976.  
  1977. /* ARGSUSED */
  1978. static int channelinit_result (sd, id, dummy, result, roi)
  1979. int    sd,
  1980.     id,
  1981.     dummy;
  1982. struct type_Qmgr_Pseudo__channelInitialise *result;
  1983. struct RoSAPindication *roi;
  1984. {
  1985.     PP_TRACE (("channelinit_result (%d, %d)", sd, id));
  1986.  
  1987.     return OK;
  1988. }
  1989.     
  1990. /* ARGSUSED */
  1991. static int general_error (sd, id, error, parameter, roi)
  1992. int    sd,
  1993.     id,
  1994.     error;
  1995. caddr_t    parameter;
  1996. struct RoSAPindication *roi;
  1997. {
  1998.     register struct RyError *rye;
  1999.     Connblk *cb;
  2000.  
  2001.     if ((cb = findcblk (sd)) == NULLCB)
  2002.         return NOTOK;
  2003.     PP_TRACE (("general_error (%s)", cb_print(cb)));
  2004.     if (cb -> cb_type != cb_channel) {
  2005.         PP_LOG (LLOG_EXCEPTIONS, ("Not a channel!!!"));
  2006.         return NOTOK;
  2007.     }
  2008.  
  2009.     cb -> cb_state = cb_error;
  2010.     if (error == RY_REJECT) {
  2011.         PP_LOG (LLOG_EXCEPTIONS,
  2012.             ("%s", RoErrString ((int) parameter)));
  2013.         delay_channel (cb);
  2014.         return NOTOK;
  2015.     }
  2016.  
  2017.     if ((rye = finderrbyerr (table_Qmgr_Errors, error)) != NULL)
  2018.         PP_LOG (LLOG_EXCEPTIONS, ("%s", rye -> rye_name));
  2019.     else
  2020.         PP_LOG (LLOG_EXCEPTIONS, ("Error %d", error));
  2021.  
  2022.     delay_channel (cb);
  2023.     return NOTOK;
  2024. }
  2025.  
  2026. /* ARGSUSED */
  2027. static int processmessage_error (sd, id, error, parameter, roi)
  2028. int    sd,
  2029.     id,
  2030.     error;
  2031. caddr_t    parameter;
  2032. struct RoSAPindication *roi;
  2033. {
  2034.     register struct RyError *rye;
  2035.     Connblk *cb;
  2036.     char    tbuf[128];
  2037.  
  2038.     if ((cb = findcblk (sd)) == NULLCB)
  2039.         return NOTOK;
  2040.     PP_TRACE (("processmessage_error (%s)", cb_print(cb)));
  2041.  
  2042.     if (cb -> cb_type != cb_channel) {
  2043.         PP_LOG (LLOG_EXCEPTIONS, ("Not a channel!!!"));
  2044.         return NOTOK;
  2045.     }
  2046.  
  2047.     if (error == RY_REJECT) {
  2048.         PP_LOG (LLOG_EXCEPTIONS,
  2049.             ("%s", RoErrString ((int) parameter)));
  2050.         delay_message (cb, RoErrString ((int) parameter), 1);
  2051.         return OK;
  2052.     }
  2053.  
  2054.     if ((rye = finderrbyerr (table_Qmgr_Errors, error)) != NULL)
  2055.         (void) sprintf (tbuf, "process message error %s",
  2056.                 rye -> rye_name);
  2057.     else
  2058.         (void) sprintf (tbuf, "process message error %d", error);
  2059.     PP_LOG (LLOG_EXCEPTIONS, ("%s", tbuf));
  2060.  
  2061.     delay_message (cb, tbuf, 1);
  2062.     return OK;
  2063. }
  2064.  
  2065. void delay_channel (cb)
  2066. Connblk *cb;
  2067. {
  2068.     PP_TRACE (("delay_channel (%s)", cb_print (cb) ));
  2069.  
  2070.     if (cb -> cb_type == cb_responder || cb -> cb_type == cb_timer)
  2071.         return;
  2072.     if (cb -> cb_clp == NULL)
  2073.         return;
  2074.     cache_inc (&(cb -> cb_clp -> cache), cache_time);
  2075.     cb -> cb_clp -> nextevent = cb -> cb_clp -> cache.cachetime;
  2076. }
  2077.  
  2078. void investigate_chan (clp, now)
  2079. Chanlist *clp;
  2080. time_t    now;
  2081. {
  2082.     Mtalist *mlp;
  2083.     int nmtas = clp -> nmtas;
  2084.  
  2085.     PP_TRACE (("investigate_chan (%s)", clp -> channame));
  2086.  
  2087.     clp -> nextevent = now + MAX_SLEEP;
  2088.     clp -> oldest = now;
  2089.     clp -> nmtas = 0;
  2090.  
  2091.     for (mlp = clp -> mtas -> mta_forw; mlp != clp -> mtas;
  2092.          mlp = mlp -> mta_forw) {
  2093.         if (mlp -> mta_changed || mlp -> nextevent < current_time) {
  2094.             investigate_mta (mlp, now);
  2095.             mlp -> mta_changed = 0;
  2096.         }
  2097.         if (mlp -> oldest < clp -> oldest)
  2098.             clp -> oldest = mlp -> oldest;
  2099.         clp -> nmtas ++;
  2100.         if (!mtaready (mlp))
  2101.             continue;
  2102.         if (mlp -> nextevent < clp -> nextevent)
  2103.             clp -> nextevent = mlp -> nextevent;
  2104.     }
  2105.     if (clp -> chan_special)
  2106.         clp -> nextevent = now;
  2107.     if (clp -> cache.cachetime > now)
  2108.         clp -> nextevent = clp -> cache.cachetime;
  2109.     if (clp -> chan_enabled == 0)
  2110.         clp -> nextevent = now + MAX_SLEEP;
  2111.     if (nmtas != clp -> nmtas)
  2112.         PP_LOG(LLOG_EXCEPTIONS, ("mta count mismatch on %s (%d != %d)",
  2113.                      clp -> channame,
  2114.                      clp -> nmtas, nmtas));
  2115. }
  2116.  
  2117. void investigate_mta (mlp, now)
  2118. Mtalist *mlp;
  2119. time_t    now;
  2120. {
  2121.     Mlist    *ml;
  2122.     time_t cachet;
  2123.  
  2124.     PP_TRACE (("investigate_mta (%s)", mlp -> mtaname));
  2125.  
  2126.     mlp -> nextevent = now + MAX_SLEEP + 1;
  2127.     mlp -> oldest = now;
  2128.  
  2129.     for (ml = mlp -> msgs -> ml_forw; ml != mlp -> msgs;
  2130.          ml = ml -> ml_forw) {
  2131.         /* this calc has to be done  - or we could optimise */
  2132.         if (ml -> ms -> age < mlp -> oldest)
  2133.             mlp -> oldest = ml -> ms -> age;
  2134.  
  2135.         if (!msgready (ml))
  2136.             continue;
  2137.  
  2138.         if ((cachet = msgmincache (ml)) > now) {
  2139.             if (cachet < mlp -> nextevent)
  2140.                 mlp -> nextevent = cachet;
  2141.         }
  2142.         else if (ml -> ms -> defferedtime &&
  2143.              ml -> ms -> defferedtime < mlp -> nextevent)
  2144.             mlp -> nextevent = ml -> ms -> defferedtime;
  2145.         else mlp -> nextevent = now;
  2146.     }
  2147.     if (mlp -> cache.cachetime > now)     /* oops - cached */
  2148.         mlp -> nextevent = mlp -> cache.cachetime;
  2149.     if (mlp -> mta_enabled == 0)
  2150.         mlp -> nextevent = now + MAX_SLEEP + 1;
  2151. }
  2152.  
  2153. void cleanup_conn (cb)
  2154. Connblk *cb;
  2155. {
  2156.     PP_TRACE (("cleanup_conn(%s)", cb_print (cb)));
  2157.  
  2158.     switch (cb -> cb_type) {
  2159.         case cb_responder:
  2160.         case cb_timer:
  2161.         break;
  2162.         default:
  2163.         if (cb -> cb_clp) {
  2164.             if (cb -> cb_mlp) {
  2165.                 cb -> cb_mlp -> nactive --;
  2166.                 if (cb -> cb_state == cb_idle ||
  2167.                     cb -> cb_state == cb_proc_sent)
  2168.                     delay_message (cb, "Connection broke", 1);
  2169.             }
  2170.             if (cb -> cb_state != cb_idle) {
  2171.                 cb -> cb_clp -> nactive --;
  2172.                 nchansrunning --;
  2173.             }
  2174.             if (cb -> cb_clp -> chan_special) {
  2175.                 nspecials --;
  2176.                 if (cb -> cb_clp -> chan -> ch_chan_type ==
  2177.                     CH_QMGR_LOAD)
  2178.                     if (delete_chan -> chan_enabled == 0 &&
  2179.                         delete_chan -> chan_syssusp == 1) {
  2180.                         delete_chan -> chan_enabled = 1;
  2181.                         delete_chan -> chan_syssusp = 0;
  2182.                         delete_chan -> chan_update = 1;
  2183.                     }
  2184.  
  2185.             }
  2186.         }
  2187.         if (cb -> cb_ml)
  2188.             msg_unlock (cb -> cb_ml -> ms);
  2189.             
  2190.         break;
  2191.     }
  2192.     freecblk (cb);
  2193. }
  2194.  
  2195. static int timeout_proc ()
  2196. {
  2197.     MsgStruct **msp, *ms;
  2198.     time_t warn_t;
  2199.  
  2200.     PP_TRACE (("timeout_proc"));
  2201.     for (msp = msg_hash; msp < &msg_hash[QUE_HASHSIZE]; ) {
  2202.         for (ms = *msp; ms; ms = ms -> ms_forw) {
  2203.             if (ms -> m_locked)
  2204.                 continue;
  2205.             if (ms -> expirytime < current_time &&
  2206.                 expiremsg (ms) != NOTOK)
  2207.                 continue;
  2208.  
  2209.             if (ms -> numberwarns >= warn_number)
  2210.                 continue;
  2211.  
  2212.             warn_t = ms -> age + (ms -> warninterval *
  2213.                           (ms -> numberwarns+1));
  2214.             if (warn_t < current_time)
  2215.                 (void) warnmsg (ms);
  2216.         }
  2217.         if (!ms)
  2218.             msp ++;
  2219.     }
  2220. }
  2221.  
  2222. static int expiremsg (ms)
  2223. MsgStruct *ms;
  2224. {
  2225.     int first = 1;
  2226.     Reciplist *rlp;
  2227.     Chanlist *clp;
  2228.     Mtalist *mlp;
  2229.     Mlist *ml;
  2230.  
  2231.     PP_TRACE (("expiremsg (%s)", ms -> queid));
  2232.     if (timeout_chan == NULLCHANLIST  || timeout_chan -> chan_enabled == 0)
  2233.         return NOTOK;
  2234.  
  2235.     for (rlp = ms -> recips; rlp; rlp = rlp -> rp_next) {
  2236.         if (rlp -> id == 0)
  2237.             continue;
  2238.         switch (rlp -> status) {
  2239.             case ST_DR:
  2240.             case ST_DELETE:
  2241.             case ST_TIMEOUT:
  2242.             case ST_WARNING:
  2243.             continue; /* leave these alone! */
  2244.  
  2245.             case ST_NORMAL:
  2246.             if ((ml = rlp -> ml) == NULLMLIST) {
  2247.                 PP_LOG (LLOG_EXCEPTIONS,
  2248.                     ("Recipient not on msg q"));
  2249.                 continue;
  2250.             }
  2251.             if ((mlp = ml -> mlp) == NULLMTALIST) {
  2252.                 PP_LOG (LLOG_EXCEPTIONS,
  2253.                     ("msg q not on mta"));
  2254.                 continue;
  2255.             }
  2256.             if ((clp = mlp -> clp) == NULLCHANLIST) {
  2257.                 PP_LOG (LLOG_EXCEPTIONS,
  2258.                     ("mta not on chan"));
  2259.                 continue;
  2260.             }
  2261.             (void) delfromchan (clp, mlp, ml, rlp -> id);
  2262.             rlp -> status = ST_TIMEOUT;
  2263.             if (first) {
  2264.                 insertinchan (timeout_chan -> chan, ms, rlp,
  2265.                           chan2mta(timeout_chan -> chan,
  2266.                                rlp));
  2267.                 first = 0;
  2268.             }
  2269.         }
  2270.     }
  2271.     return first == 1 ? OK : DONE;
  2272. }
  2273.  
  2274. static int warnmsg (ms)
  2275. MsgStruct *ms;
  2276. {
  2277.     int first = 1;
  2278.     Reciplist *rlp;
  2279.     Chanlist *clp;
  2280.     Mtalist *mlp;
  2281.     Mlist *ml;
  2282.  
  2283.     PP_TRACE (("warnmsg (%s)", ms -> queid));
  2284.  
  2285.     if (warn_chan == NULLCHANLIST || warn_chan -> chan_enabled == 0)
  2286.         return NOTOK;
  2287.     for (rlp = ms -> recips; rlp; rlp = rlp -> rp_next) {
  2288.         if (rlp -> id == 0)
  2289.             continue;
  2290.         switch (rlp -> status) {
  2291.             case ST_DR:
  2292.             case ST_DELETE:
  2293.             case ST_TIMEOUT:
  2294.             case ST_WARNING:
  2295.             continue; /* leave these alone! */
  2296.  
  2297.             case ST_NORMAL:
  2298.             if ((ml = rlp -> ml) == NULLMLIST) {
  2299.                 PP_LOG (LLOG_EXCEPTIONS,
  2300.                     ("Recipient not on msg q"));
  2301.                 continue;
  2302.             }
  2303.             if ((mlp = ml -> mlp) == NULLMTALIST) {
  2304.                 PP_LOG (LLOG_EXCEPTIONS,
  2305.                     ("msg q not on mta"));
  2306.                 continue;
  2307.             }
  2308.             if ((clp = mlp -> clp) == NULLCHANLIST) {
  2309.                 PP_LOG (LLOG_EXCEPTIONS,
  2310.                     ("mta not on chan"));
  2311.                 continue;
  2312.             }
  2313.             (void) delfromchan (clp, mlp, ml, rlp -> id);
  2314.             rlp -> status = ST_WARNING;
  2315.             insertinchan (warn_chan -> chan, ms, rlp,
  2316.                       chan2mta(warn_chan -> chan,
  2317.                            rlp));
  2318.             first = 1;
  2319.         }
  2320.     }
  2321.     return first == 1 ? OK : DONE;
  2322. }
  2323.  
  2324. static int channel_ttl ()
  2325. {
  2326.     struct AcSAPindication    acis;
  2327.     struct RoSAPindication    rois;
  2328.     register struct RoSAPindication *roi = &rois;
  2329.     Connblk *cb;
  2330.  
  2331.     PP_TRACE (("channel_ttl ()"));
  2332.  
  2333.     if (cb_once_only == 0)
  2334.         return OK;
  2335.  
  2336.     for (cb = CHead -> cb_forw; cb != CHead; cb = cb -> cb_forw) {
  2337.         if (cb -> cb_type == cb_channel && cb -> cb_ttl &&
  2338.             cb -> cb_ttl < current_time) {
  2339.             pcblock_error (cb);
  2340.             (void) AcUAbortRequest (cb -> cb_fd, NULLPEP,
  2341.                         0, &acis);
  2342.             (void) RyLose (cb -> cb_fd, roi);
  2343.             (void) chan_lose (cb -> cb_fd,
  2344.                       (struct AcSAPfinish *)0);
  2345.         }
  2346.     }
  2347.     timer_running = 0;
  2348.     if (nchansrunning > 0)
  2349.         start_timer ();
  2350.     return OK;            
  2351. }
  2352.  
  2353. static void start_timer ()
  2354. {
  2355.     Connblk *cb;
  2356.  
  2357.     if (timer_running)
  2358.         return;
  2359.  
  2360.     cb = newcblk (cb_timer);
  2361.     cb -> cb_proc = channel_ttl;
  2362.     cb -> cb_reload = 0;
  2363.     (void) newevblk (NULLCHANLIST, cb, cb_timer, CHAN_TIMEOUT);
  2364.     timer_running = 1;
  2365. }
  2366.