On 10.6.2013 19:28, Patrik Rak wrote:

attached is my first take on the problem we have discussed few weeks ago -
limiting amount of deferred messages in the active queue and limiting
amount of delivery agents used by presumably "slow" deferred messages.

Here is an updated version. I have added the possibility to mark too large messages as possibly slow, and made both the size and deferred time limits configurable per transport at the same time...

With this in place, qmgr shall be much more resistant when it comes to congestion.

Patrik

diff --git a/src/global/mail_params.h b/src/global/mail_params.h
index 1e368f8..e6d3d71 100644
--- a/src/global/mail_params.h
+++ b/src/global/mail_params.h
@@ -721,6 +721,10 @@ extern int var_delay_warn_time;
 #define DEF_QMGR_ACT_LIMIT     20000
 extern int var_qmgr_active_limit;
 
+#define VAR_QMGR_DFR_LIMIT     "qmgr_message_deferred_limit"
+#define DEF_QMGR_DFR_LIMIT     1000
+extern int var_qmgr_deferred_limit;
+
 #define VAR_QMGR_RCPT_LIMIT    "qmgr_message_recipient_limit"
 #define DEF_QMGR_RCPT_LIMIT    20000
 extern int var_qmgr_rcpt_limit;
@@ -806,6 +810,24 @@ extern int var_dest_rcpt_limit;
 extern int var_local_rcpt_lim;
 
  /*
+  * Queue manager: default limits for "slow" messages.
+  */
+#define VAR_SLOW_XPORT_LIMIT   "default_slow_transport_limit"
+#define _SLOW_XPORT_LIMIT      "_slow_transport_limit"
+#define DEF_SLOW_XPORT_LIMIT   (DEF_PROC_LIMIT - DEF_PROC_LIMIT / 10)
+extern int var_slow_xport_limit;
+
+#define VAR_SLOW_MSG_TIME      "default_slow_message_time_limit"
+#define _SLOW_MSG_TIME         "_slow_message_time_limit"
+#define DEF_SLOW_MSG_TIME      "600s"
+extern int var_slow_msg_time;
+
+#define VAR_SLOW_MSG_SIZE      "default_slow_message_size_limit"
+#define _SLOW_MSG_SIZE         "_slow_message_size_limit"
+#define DEF_SLOW_MSG_SIZE      1024000
+extern int var_slow_msg_size;
+
+ /*
   * Queue manager: default delay before retrying a dead transport.
   */
 #define VAR_XPORT_RETRY_TIME   "transport_retry_time"
diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index fbf7dab..3a9e1e2 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -202,6 +202,10 @@
 /*     The default per-transport maximum delay between recipients refills.
 /* .IP "\fItransport\fB_recipient_refill_delay 
($default_recipient_refill_delay)\fR"
 /*     Idem, for delivery via the named message \fItransport\fR.
+/* .PP
+/*     Available in Postfix version 2.11 and later:
+/* .IP "\fBqmgr_message_deferred_limit (1000)\fR"
+/*     How much of the active queue must be left available for messages from 
the incoming queue.
 /* DELIVERY CONCURRENCY CONTROLS
 /* .ad
 /* .fi
@@ -239,6 +243,24 @@
 /* .IP "\fBdestination_concurrency_feedback_debug (no)\fR"
 /*     Make the queue manager's feedback algorithm verbose for performance
 /*     analysis purposes.
+/* .PP
+/*     Available in Postfix version 2.11 and later:
+/* .IP "\fBdefault_slow_transport_limit (90)\fR"
+/*     Deferred messages older than \fItransport\fB_slow_message_time_limit\fR
+/*     and messages larger than \fItransport\fB_slow_message_size_limit\fR
+/*     are prevented from using more than this many delivery agents. 0 means 
no limit.
+/* .IP "\fItransport\fB_slow_transport_limit 
($default_slow_transport_limit)\fR"
+/*     Idem, for delivery via the named message \fItransport\fR.
+/* .IP "\fBdefault_slow_message_time_limit (600s)\fR"
+/*     Deferred messages older than this are considered "slow"
+/*     and are subject to \fItransport\fB_slow_transport_limit\fR limiting.
+/* .IP "\fItransport\fB_slow_message_time_limit 
($default_slow_message_time_limit)\fR"
+/*     Idem, for delivery via the named message \fItransport\fR.
+/* .IP "\fBdefault_slow_message_size_limit (1024000)\fR"
+/*     Messages larger than this are considered "slow"
+/*     and are subject to \fItransport\fB_slow_transport_limit\fR limiting. 0 
means no limit.
+/* .IP "\fItransport\fB_slow_message_size_limit 
($default_slow_message_size_limit)\fR"
+/*     Idem, for delivery via the named message \fItransport\fR.
 /* RECIPIENT SCHEDULING CONTROLS
 /* .ad
 /* .fi
@@ -416,6 +438,7 @@ int     var_max_backoff_time;
 int     var_max_queue_time;
 int     var_dsn_queue_time;
 int     var_qmgr_active_limit;
+int     var_qmgr_deferred_limit;
 int     var_qmgr_rcpt_limit;
 int     var_qmgr_msg_rcpt_limit;
 int     var_xport_rcpt_limit;
@@ -426,6 +449,9 @@ int     var_delivery_slot_cost;
 int     var_delivery_slot_loan;
 int     var_delivery_slot_discount;
 int     var_min_delivery_slots;
+int     var_slow_xport_limit;
+int     var_slow_msg_time;
+int     var_slow_msg_size;
 int     var_init_dest_concurrency;
 int     var_transport_retry_time;
 int     var_dest_con_limit;
@@ -533,8 +559,7 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     int     token_count;
     int     feed = 0;
     int     scan_idx;                  /* Priority order scan index */
-    static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
-    int     last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
+    static int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
     int     delay;
 
     /*
@@ -556,7 +581,8 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
 
     /*
      * Let some new blood into the active queue when the queue size is
-     * smaller than some configurable limit.
+     * smaller than some configurable limit. We round-robin the queue scans,
+     * but eventually prevent the deferred queue from using all resources.
      * 
      * We import one message per interrupt, to optimally tune the input count
      * for the number of delivery agent protocol wait states, as explained in
@@ -565,7 +591,10 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     delay = WAIT_FOR_EVENT;
     for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
         && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
-       last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
+       last_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
+       if (last_scan_idx == QMGR_SCAN_IDX_DEFERRED &&
+           qmgr_deferred_message_count >= var_qmgr_deferred_limit)
+           continue;
        if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
            delay = DONT_WAIT;
            if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
@@ -574,16 +603,6 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     }
 
     /*
-     * Round-robin the queue scans. When the active queue becomes full,
-     * prefer new mail over deferred mail.
-     */
-    if (qmgr_message_count < var_qmgr_active_limit) {
-       first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
-    } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
-       first_scan_idx = QMGR_SCAN_IDX_INCOMING;
-    }
-
-    /*
      * Global flow control. If enabled, slow down receiving processes that
      * get ahead of the queue manager, but don't block them completely.
      */
@@ -638,6 +657,12 @@ static void qmgr_post_init(char *name, char **unused_argv)
     /*
      * Sanity check.
      */
+    var_qmgr_deferred_limit = var_qmgr_active_limit - var_qmgr_deferred_limit;
+    if (var_qmgr_deferred_limit <= var_qmgr_active_limit / 4) {
+       msg_warn("%s is too large for %s - adjusting %s",
+             VAR_QMGR_DFR_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_DFR_LIMIT);
+       var_qmgr_deferred_limit = var_qmgr_active_limit - var_qmgr_active_limit 
/ 10;
+    }
     if (var_qmgr_rcpt_limit < var_qmgr_active_limit) {
        msg_warn("%s is smaller than %s - adjusting %s",
              VAR_QMGR_RCPT_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_RCPT_LIMIT);
@@ -692,6 +717,7 @@ int     main(int argc, char **argv)
        VAR_MAX_BACKOFF_TIME, DEF_MAX_BACKOFF_TIME, &var_max_backoff_time, 1, 0,
        VAR_MAX_QUEUE_TIME, DEF_MAX_QUEUE_TIME, &var_max_queue_time, 0, 8640000,
        VAR_DSN_QUEUE_TIME, DEF_DSN_QUEUE_TIME, &var_dsn_queue_time, 0, 8640000,
+       VAR_SLOW_MSG_TIME, DEF_SLOW_MSG_TIME, &var_slow_msg_time, 0, 0,
        VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 
1, 0,
        VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, 
&var_qmgr_clog_warn_time, 0, 0,
        VAR_XPORT_REFILL_DELAY, DEF_XPORT_REFILL_DELAY, 
&var_xport_refill_delay, 1, 0,
@@ -702,6 +728,7 @@ int     main(int argc, char **argv)
     };
     static const CONFIG_INT_TABLE int_table[] = {
        VAR_QMGR_ACT_LIMIT, DEF_QMGR_ACT_LIMIT, &var_qmgr_active_limit, 1, 0,
+       VAR_QMGR_DFR_LIMIT, DEF_QMGR_DFR_LIMIT, &var_qmgr_deferred_limit, 0, 0,
        VAR_QMGR_RCPT_LIMIT, DEF_QMGR_RCPT_LIMIT, &var_qmgr_rcpt_limit, 1, 0,
        VAR_QMGR_MSG_RCPT_LIMIT, DEF_QMGR_MSG_RCPT_LIMIT, 
&var_qmgr_msg_rcpt_limit, 1, 0,
        VAR_XPORT_RCPT_LIMIT, DEF_XPORT_RCPT_LIMIT, &var_xport_rcpt_limit, 0, 0,
@@ -711,6 +738,8 @@ int     main(int argc, char **argv)
        VAR_DELIVERY_SLOT_LOAN, DEF_DELIVERY_SLOT_LOAN, 
&var_delivery_slot_loan, 0, 0,
        VAR_DELIVERY_SLOT_DISCOUNT, DEF_DELIVERY_SLOT_DISCOUNT, 
&var_delivery_slot_discount, 0, 100,
        VAR_MIN_DELIVERY_SLOTS, DEF_MIN_DELIVERY_SLOTS, 
&var_min_delivery_slots, 0, 0,
+       VAR_SLOW_XPORT_LIMIT, DEF_SLOW_XPORT_LIMIT, &var_slow_xport_limit, 0, 0,
+       VAR_SLOW_MSG_SIZE, DEF_SLOW_MSG_SIZE, &var_slow_msg_size, 0, 0,
        VAR_INIT_DEST_CON, DEF_INIT_DEST_CON, &var_init_dest_concurrency, 1, 0,
        VAR_DEST_CON_LIMIT, DEF_DEST_CON_LIMIT, &var_dest_con_limit, 0, 0,
        VAR_DEST_RCPT_LIMIT, DEF_DEST_RCPT_LIMIT, &var_dest_rcpt_limit, 0, 0,
diff --git a/src/qmgr/qmgr.h b/src/qmgr/qmgr.h
index 6737e42..861c738 100644
--- a/src/qmgr/qmgr.h
+++ b/src/qmgr/qmgr.h
@@ -169,6 +169,11 @@ struct QMGR_JOB_LIST {
 struct QMGR_TRANSPORT {
     int     flags;                     /* blocked, etc. */
     int     pending;                   /* incomplete DA connections */
+    int     active;                    /* active DA connections */
+    int     slow_active;               /* active slow DA connections */
+    int     slow_limit;                        /* limit for slow DA 
connections */
+    int     slow_msg_time;             /* how old deferred messages are "slow" 
*/
+    int     slow_msg_size;             /* how large messages are "slow" */
     char   *name;                      /* transport name */
     int     dest_concurrency_limit;    /* concurrency per domain */
     int     init_dest_concurrency;     /* init. per-domain concurrency */
@@ -338,6 +343,7 @@ struct QMGR_MESSAGE {
     time_t  warn_time;                 /* time next warning to be sent */
     long    data_offset;               /* data seek offset */
     char   *queue_name;                        /* queue name */
+    char   *orig_queue_name;           /* from which queue this came from */
     char   *queue_id;                  /* queue file */
     char   *encoding;                  /* content encoding */
     char   *sender;                    /* complete address */
@@ -376,12 +382,14 @@ struct QMGR_MESSAGE {
 #define QMGR_MESSAGE_LOCKED    ((QMGR_MESSAGE *) 1)
 
 extern int qmgr_message_count;
+extern int qmgr_deferred_message_count;
 extern int qmgr_recipient_count;
 
 extern void qmgr_message_free(QMGR_MESSAGE *);
 extern void qmgr_message_update_warn(QMGR_MESSAGE *);
 extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
-extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, 
mode_t);
+extern int qmgr_message_is_slow(QMGR_MESSAGE *, QMGR_TRANSPORT *);
+extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, const char 
*, int, mode_t);
 extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);
 
 #define QMGR_MSG_STATS(stats, message) \
@@ -410,6 +418,7 @@ struct QMGR_JOB {
     QMGR_JOB *stack_parent;            /* stack parent */
     QMGR_JOB_LIST stack_children;      /* all stack children */
     QMGR_JOB_LIST stack_siblings;      /* stack children linkage */
+    int     flags;                     /* various flags */
     int     stack_level;               /* job stack nesting level (-1 means
                                         * it's not on the lists at all) */
     int     blocker_tag;               /* tagged if blocks the job list */
@@ -435,6 +444,10 @@ struct QMGR_PEER {
     QMGR_PEER_LIST peers;              /* neighbor linkage */
 };
 
+#define QMGR_JOB_SLOW_FLAG     (1<<0)  /* job is marked as slow */
+
+#define QMGR_JOB_SLOW(job)      (((job)->flags & QMGR_JOB_SLOW_FLAG) != 0)
+
 extern QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *);
 extern QMGR_PEER *qmgr_peer_select(QMGR_JOB *);
 extern void qmgr_job_blocker_update(QMGR_QUEUE *);
diff --git a/src/qmgr/qmgr_active.c b/src/qmgr/qmgr_active.c
index d93a2cd..302c0a3 100644
--- a/src/qmgr/qmgr_active.c
+++ b/src/qmgr/qmgr_active.c
@@ -229,7 +229,7 @@ int     qmgr_active_feed(QMGR_SCAN *scan_info, const char 
*queue_id)
      */
 #define QMGR_FLUSH_AFTER       (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
 
-    if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
+    if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, scan_info->queue, 
queue_id,
                                 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
                                      scan_info->flags | QMGR_FLUSH_AFTER :
                                      scan_info->flags,
diff --git a/src/qmgr/qmgr_deliver.c b/src/qmgr/qmgr_deliver.c
index 2fbb049..6f349f6 100644
--- a/src/qmgr/qmgr_deliver.c
+++ b/src/qmgr/qmgr_deliver.c
@@ -223,6 +223,7 @@ static void qmgr_deliver_update(int unused_event, char 
*context)
     QMGR_QUEUE *queue = entry->queue;
     QMGR_TRANSPORT *transport = queue->transport;
     QMGR_MESSAGE *message = entry->message;
+    QMGR_JOB *job = entry->peer->job;
     static DSN_BUF *dsb;
     int     status;
 
@@ -234,12 +235,22 @@ static void qmgr_deliver_update(int unused_event, char 
*context)
        (void) vstream_fclose(entry->stream); \
        entry->stream = 0; \
        qmgr_deliver_concurrency--; \
+       transport->active--; \
+       if (QMGR_JOB_SLOW(job)) \
+           transport->slow_active--; \
     } while (0)
 
     if (dsb == 0)
        dsb = dsb_create();
 
     /*
+     * If this was one of the slow messages while the slow delivery agents
+     * were maxed out, let the transport know that it can now try sending more.
+     */
+    if (QMGR_JOB_SLOW(job) && transport->slow_active == transport->slow_limit)
+       transport->job_current = transport->job_list.next;
+
+    /*
      * The message transport has responded. Stop the watchdog timer.
      */
     event_cancel_timer(qmgr_deliver_abort, context);
@@ -418,11 +429,26 @@ void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM 
*stream)
      * If we get this far, go wait for the delivery status report.
      */
     qmgr_deliver_concurrency++;
+    transport->active++;
+    if (QMGR_JOB_SLOW(entry->peer->job))
+       transport->slow_active++;
     entry->stream = stream;
     event_enable_read(vstream_fileno(stream),
                      qmgr_deliver_update, (char *) entry);
 
     /*
+     * Warn about misconfigured systems.
+     *
+     * Unfortunately, we do not know what the transport delivery agent limit 
is,
+     * so we can only keep checking this as the delivery agents get maxed out.
+     */
+    if (transport->slow_limit > 0 && transport->slow_limit < transport->active 
/ 2) {
+       msg_warn("%s%s is too small for %s transport limit - adjusting %s%s",
+             transport->name, _SLOW_XPORT_LIMIT, transport->name, 
transport->name, _SLOW_XPORT_LIMIT);
+       transport->slow_limit = transport->active - transport->active / 10;
+    }
+
+    /*
      * Guard against broken systems.
      */
     event_request_timer(qmgr_deliver_abort, (char *) entry, 
var_daemon_timeout);
diff --git a/src/qmgr/qmgr_job.c b/src/qmgr/qmgr_job.c
index 7de70f2..2864954 100644
--- a/src/qmgr/qmgr_job.c
+++ b/src/qmgr/qmgr_job.c
@@ -103,6 +103,9 @@ static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, 
QMGR_TRANSPORT *transpor
 
     job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
     job->message = message;
+    job->flags = 0;
+    if (qmgr_message_is_slow(message, transport))
+       job->flags |= QMGR_JOB_SLOW_FLAG;
     QMGR_LIST_APPEND(message->job_list, job, message_peers);
     htable_enter(transport->job_byname, message->queue_id, (char *) job);
     job->transport = transport;
@@ -873,6 +876,13 @@ QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT 
*transport)
        if (IS_BLOCKER(job, transport))
            continue;
 
+       /*
+        * Skip any slow jobs once we have reached the transport limit for slow 
jobs.
+        */
+       if (QMGR_JOB_SLOW(job) &&
+           (transport->slow_limit > 0 && transport->slow_active >= 
transport->slow_limit))
+           continue;
+
        if ((peer = qmgr_job_peer_select(job)) != 0) {
 
            /*
@@ -939,7 +949,8 @@ QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
     /*
      * We have not found any entry we could use for delivery. Well, things
      * must have changed since this transport was selected for asynchronous
-     * allocation. Never mind. Clear the current job pointer and reluctantly
+     * allocation. Or there are only slow jobs and we have hit the limit.
+     * Never mind. Clear the current job pointer and reluctantly
      * report back that we have failed in our task.
      */
     transport->job_current = 0;
diff --git a/src/qmgr/qmgr_message.c b/src/qmgr/qmgr_message.c
index 576fb2d..bd7b229 100644
--- a/src/qmgr/qmgr_message.c
+++ b/src/qmgr/qmgr_message.c
@@ -27,6 +27,10 @@
 /*     void    qmgr_message_kill_record(message, offset)
 /*     QMGR_MESSAGE *message;
 /*     long    offset;
+/*
+/*     int     qmgr_message_is_slow(message, transport)
+/*     QMGR_MESSAGE *message;
+/*     QMGR_TRANSPORT *transport;
 /* DESCRIPTION
 /*     This module performs en-gross operations on queue messages.
 /*
@@ -71,6 +75,10 @@
 /*     the record type at the given offset to "killed", and closes the file.
 /*     A killed envelope record is ignored. Killed records are not allowed
 /*     inside the message content.
+/*
+/*     qmgr_message_is_slow() tests if given message is considered slow
+/*     in context of given transport. Currently, too old messages coming from
+/*     deferred queue and too large messages may be considered slow.
 /* DIAGNOSTICS
 /*     Warnings: malformed message file. Fatal errors: out of memory.
 /* SEE ALSO
@@ -148,12 +156,13 @@
 #include "qmgr.h"
 
 int     qmgr_message_count;
+int     qmgr_deferred_message_count;
 int     qmgr_recipient_count;
 
 /* qmgr_message_create - create in-core message structure */
 
 static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
-                                          const char *queue_id, int qflags)
+                                          const char *orig_queue_name, const 
char *queue_id, int qflags)
 {
     QMGR_MESSAGE *message;
 
@@ -175,6 +184,9 @@ static QMGR_MESSAGE *qmgr_message_create(const char 
*queue_name,
     message->data_offset = 0;
     message->queue_id = mystrdup(queue_id);
     message->queue_name = mystrdup(queue_name);
+    message->orig_queue_name = mystrdup(orig_queue_name);
+    if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0)
+       qmgr_deferred_message_count++;
     message->encoding = 0;
     message->sender = 0;
     message->dsn_envid = 0;
@@ -214,6 +226,18 @@ static void qmgr_message_close(QMGR_MESSAGE *message)
     message->fp = 0;
 }
 
+/* qmgr_message_is_slow - recognize messages slow for given transport*/
+
+int qmgr_message_is_slow(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
+{
+    if (transport->slow_msg_size > 0 && message->data_size >= 
transport->slow_msg_size)
+       return (1);
+    if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0 &&
+       message->queued_time - message->create_time >= transport->slow_msg_time)
+       return (1);
+    return (0);
+}
+
 /* qmgr_message_open - open queue file */
 
 static int qmgr_message_open(QMGR_MESSAGE *message)
@@ -1385,8 +1409,11 @@ void    qmgr_message_free(QMGR_MESSAGE *message)
        msg_panic("qmgr_message_free: queue file is open");
     while ((job = message->job_list.next) != 0)
        qmgr_job_free(job);
+    if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0)
+       qmgr_deferred_message_count--;
     myfree(message->queue_id);
     myfree(message->queue_name);
+    myfree(message->orig_queue_name);
     if (message->dsn_envid)
        myfree(message->dsn_envid);
     if (message->encoding)
@@ -1428,19 +1455,19 @@ void    qmgr_message_free(QMGR_MESSAGE *message)
 
 /* qmgr_message_alloc - create in-core message structure */
 
-QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id,
-                                        int qflags, mode_t mode)
+QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char 
*orig_queue_name,
+                                        const char *queue_id, int qflags, 
mode_t mode)
 {
     const char *myname = "qmgr_message_alloc";
     QMGR_MESSAGE *message;
 
     if (msg_verbose)
-       msg_info("%s: %s %s", myname, queue_name, queue_id);
+       msg_info("%s: %s %s (from %s)", myname, queue_name, queue_id, 
orig_queue_name);
 
     /*
      * Create an in-core message structure.
      */
-    message = qmgr_message_create(queue_name, queue_id, qflags);
+    message = qmgr_message_create(queue_name, orig_queue_name, queue_id, 
qflags);
 
     /*
      * Extract message envelope information: time of arrival, sender address,
diff --git a/src/qmgr/qmgr_transport.c b/src/qmgr/qmgr_transport.c
index 434d75e..8fa115e 100644
--- a/src/qmgr/qmgr_transport.c
+++ b/src/qmgr/qmgr_transport.c
@@ -282,11 +282,18 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
      * the number of pending delivery agent connections, until all delivery
      * agent concurrency windows are maxed out, or until we run out of "todo"
      * queue entries.
+     *
+     * XXX Rather than duplicating the window testing logic below for slow 
jobs,
+     * we allow the transport to give it a try first and only back off after it
+     * finds out it has no but slow jobs available. As it only happens when 
the slow
+     * messages max out all delivery agents without anything else to be 
delivered,
+     * it's not critical.
      */
 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
 
     for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
        if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+           || xport->job_current == 0
            || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
            continue;
        need = xport->pending + 1;
@@ -378,11 +385,22 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
     transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
     transport->flags = 0;
     transport->pending = 0;
+    transport->active = 0;
+    transport->slow_active = 0;
     transport->name = mystrdup(name);
 
     /*
      * Use global configuration settings or transport-specific settings.
      */
+    transport->slow_limit =
+       get_mail_conf_int2(name, _SLOW_XPORT_LIMIT,
+                          var_slow_xport_limit, 0, 0);
+    transport->slow_msg_size =
+       get_mail_conf_int2(name, _SLOW_MSG_SIZE,
+                          var_slow_msg_size, 0, 0);
+    transport->slow_msg_time =
+       get_mail_conf_time2(name, _SLOW_MSG_TIME,
+                           var_slow_msg_time, 's', 0, 0);
     transport->dest_concurrency_limit =
        get_mail_conf_int2(name, _DEST_CON_LIMIT,
                           var_dest_con_limit, 0, 0);

Reply via email to