Hello,

Attached is my first take on the problem we discussed a few weeks ago:
limiting the number of deferred messages in the active queue, and limiting
the number of delivery agents used by presumably "slow" deferred messages.

The patch consists of several incremental commits, which show how I
developed it and make it easier to read at the same time.

The first half of the patch implements the active queue limit. A few
comments:

- qmgr_loop() now always round-robins the queues. The original version
stopped doing that when the active queue was full and incoming mail kept
flowing in. I understand this was done to prevent the deferred queue from
dominating the active queue. However, the downside was that it could also
starve the deferred queue indefinitely. IMO the new mechanism, which caps
the number of deferred messages in the active queue instead, is a much
better way to keep the active queue from stalling, and it remains fair at
the same time.

- As a side effect, this change can result in more back pressure on the
incoming mail flow when the active queue becomes full, because the deferred
queue no longer gets ignored entirely. So this seems like an improvement,
not the regression Viktor was afraid of.

- Internally, var_qmgr_deferred_limit limits the number of deferred
messages in the active queue. However, I later realized that from the
user's point of view it is perhaps easier to specify how much of the active
queue has to be left available for the incoming queue. This has the
advantage that users don't have to adjust it when they increase the active
queue size, so the default works better, too. But I am not entirely sure
whether (and how) the name of the config variable
(qmgr_message_deferred_limit) should change as well...
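
For illustration only, here is a minimal stand-alone C sketch of the intended feeding behaviour. It is not the actual qmgr code. It assumes two hypothetical scan indices (SCAN_INCOMING, SCAN_DEFERRED), a hypothetical pending[] count of queue files per queue, and two hypothetical helpers, deferred_cap() and feed_one(). The reserve-to-cap conversion mirrors the sanity check added to qmgr_post_init(), and the loop mirrors the patched qmgr_loop().

```c
#include <assert.h>

/* Hypothetical stand-ins for the qmgr scan indices; incoming is scanned first. */
enum { SCAN_INCOMING, SCAN_DEFERRED, SCAN_COUNT };

/*
 * Turn the user-facing reserve (active queue slots kept free for incoming
 * mail) into the internal cap on deferred messages. Like the patch's
 * qmgr_post_init() check, fall back to 90% of the active limit when the
 * cap would be a quarter of the active limit or less.
 */
static int deferred_cap(int active_limit, int reserve)
{
    int     cap;

    assert(active_limit > 0 && reserve >= 0);
    cap = active_limit - reserve;
    if (cap <= active_limit / 4)
        cap = active_limit - active_limit / 10;
    return cap;
}

/*
 * One round-robin feed attempt: continue from the queue that was scanned
 * last, skip the deferred queue once its cap is reached, and import at most
 * one message. pending[] is a hypothetical count of queue files waiting in
 * each queue. Returns the index of the queue fed, or -1 if nothing was fed.
 */
static int feed_one(int *last_idx, int pending[SCAN_COUNT],
                    int *active_count, int *deferred_count,
                    int active_limit, int cap)
{
    for (int n = 0; *active_count < active_limit && n < SCAN_COUNT; n++) {
        *last_idx = (*last_idx + 1) % SCAN_COUNT;
        if (*last_idx == SCAN_DEFERRED && *deferred_count >= cap)
            continue;                   /* deferred share used up */
        if (pending[*last_idx] > 0) {
            pending[*last_idx]--;
            (*active_count)++;
            if (*last_idx == SCAN_DEFERRED)
                (*deferred_count)++;
            return *last_idx;
        }
    }
    return -1;
}
```

Usage example: start with last_idx = SCAN_COUNT - 1, like the static last_scan_idx in the patch. Use an active limit of 10, a reserve of 3 (so the cap is 7), 2 incoming files and 20 deferred files. Repeated feed_one() calls alternate between the two queues until incoming runs dry. They then stop at 9 active messages (7 of them deferred), which leaves a slot for the next incoming message.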

The second half implements the delivery agent limit. A few comments:

- Skipping the slow jobs in qmgr_job_entry_select() turned out to be
fairly easy. The tricky part was, of course, verifying all the implications
of this change and coming up with the relevant adjustments. The trickiest
part was qmgr_transport_select(), which decides whether a transport has any
recipient entries ready for delivery. Rather than duplicating all the
window counting logic for slow jobs, I decided to rely on the fact that
qmgr can change its mind after contacting the delivery agent. This already
happens from time to time, so it seemed like a reasonable solution for a
situation that is not time critical.

- The fact that qmgr doesn't know the transport limit makes some things a
bit difficult. For example, I could not use the "how much has to be left
unused" approach for the slow delivery agent limit, so people have to
adjust this limit when they increase the transport or process limit. I
have also added a warning to make sure people don't set this limit too
low. It fires when the slow limit is at most half the number of busy
delivery agents, so it can only trigger after the delivery agents start
getting maxed out. It doesn't detect the case when the limit is set too
high, either. I don't know how much of an issue all this is, but it might
be worth letting master pass these limits to qmgr on startup.
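
Similarly, here is a minimal stand-alone sketch of the slow-job handling. It uses hypothetical stripped-down structures (struct job, struct transport) and helper names (job_is_slow(), select_job(), agent_start()) rather than the real QMGR_JOB and QMGR_TRANSPORT. It follows the classification in qmgr_message_is_slow(), the skip added to qmgr_job_entry_select(), and the "too low" adjustment done in qmgr_deliver(). The has_entries flag is a simplification that stands in for qmgr_job_peer_select() finding a usable peer.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <time.h>

/* Hypothetical, stripped-down stand-ins for the qmgr structures. */
struct job {
    const char *orig_queue;     /* queue the message came from */
    time_t  create_time;        /* queue file creation time */
    time_t  queued_time;        /* time the message entered the active queue */
    int     has_entries;        /* simplification: job has a deliverable entry */
};

struct transport {
    int     active;             /* delivery agents in use */
    int     slow_active;        /* of which are busy with slow jobs */
    int     slow_limit;         /* 0 means no limit */
};

/* Same test as qmgr_message_is_slow(): old enough and from the deferred queue. */
static int job_is_slow(const struct job *j, int slow_msg_time)
{
    assert(j != NULL && j->orig_queue != NULL);
    if (strcmp(j->orig_queue, "deferred") != 0)
        return 0;
    return j->queued_time - j->create_time >= slow_msg_time;
}

/*
 * Pick the first job that can use a delivery agent, skipping slow jobs once
 * the transport's slow limit is reached. Returns a job index, or -1 when
 * nothing is usable (the point where qmgr has to change its mind).
 */
static int select_job(const struct transport *t, const struct job *jobs,
                      size_t njobs, int slow_msg_time)
{
    for (size_t i = 0; i < njobs; i++) {
        if (!jobs[i].has_entries)
            continue;
        if (job_is_slow(&jobs[i], slow_msg_time)
            && t->slow_limit > 0 && t->slow_active >= t->slow_limit)
            continue;
        return (int) i;
    }
    return -1;
}

/* Agent bookkeeping, then the patch's heuristic for a too-low slow limit. */
static void agent_start(struct transport *t, int slow)
{
    t->active++;
    if (slow)
        t->slow_active++;
    if (t->slow_limit > 0 && t->slow_limit <= t->active / 2)
        t->slow_limit = t->active - t->active / 10;
}
```

Usage example: with slow_limit = 2, two busy slow agents make select_job() pass over a slow job in favour of an incoming one. If only slow jobs are left, select_job() returns -1. Conversely, starting four non-slow agents with slow_limit = 2 trips the heuristic and raises the limit to 4.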

And a few overall comments:

- I have only added basic docs to qmgr(8). I didn't regenerate the man
pages, and I didn't touch the postconf docs or any other docs such as the
Postfix tuning guide. IMO all this can wait until the names and meaning of
the new variables are finalized.

- Much as I would have liked to, I had neither the time nor a suitable
testing environment to actually test this patch. I did my best to make sure
it is correct, though, and I hope someone here will be kind enough to put
it to the test.

Thanks,

Patrik
commit 5f0aac23479274448e722bede9102f1b3decddd4
Author: Patrik Rak <pat...@raxoft.cz>
Date:   Thu May 30 15:52:12 2013 +0200

    Keep track of orig queue name and number of originally deferred messages.

diff --git a/src/qmgr/qmgr.h b/src/qmgr/qmgr.h
index 6737e42..4db27c5 100644
--- a/src/qmgr/qmgr.h
+++ b/src/qmgr/qmgr.h
@@ -338,6 +338,7 @@ struct QMGR_MESSAGE {
     time_t  warn_time;			/* time next warning to be sent */
     long    data_offset;		/* data seek offset */
     char   *queue_name;			/* queue name */
+    char   *orig_queue_name;		/* queue this message came from */
     char   *queue_id;			/* queue file */
     char   *encoding;			/* content encoding */
     char   *sender;			/* complete address */
@@ -376,12 +377,13 @@ struct QMGR_MESSAGE {
 #define QMGR_MESSAGE_LOCKED	((QMGR_MESSAGE *) 1)
 
 extern int qmgr_message_count;
+extern int qmgr_deferred_message_count;
 extern int qmgr_recipient_count;
 
 extern void qmgr_message_free(QMGR_MESSAGE *);
 extern void qmgr_message_update_warn(QMGR_MESSAGE *);
 extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
-extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t);
+extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, const char *, int, mode_t);
 extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);
 
 #define QMGR_MSG_STATS(stats, message) \
diff --git a/src/qmgr/qmgr_active.c b/src/qmgr/qmgr_active.c
index d93a2cd..302c0a3 100644
--- a/src/qmgr/qmgr_active.c
+++ b/src/qmgr/qmgr_active.c
@@ -229,7 +229,7 @@ int     qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
      */
 #define QMGR_FLUSH_AFTER	(QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
 
-    if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
+    if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, scan_info->queue, queue_id,
 				 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
 				      scan_info->flags | QMGR_FLUSH_AFTER :
 				      scan_info->flags,
diff --git a/src/qmgr/qmgr_message.c b/src/qmgr/qmgr_message.c
index 576fb2d..a1fcdc9 100644
--- a/src/qmgr/qmgr_message.c
+++ b/src/qmgr/qmgr_message.c
@@ -148,12 +148,13 @@
 #include "qmgr.h"
 
 int     qmgr_message_count;
+int     qmgr_deferred_message_count;
 int     qmgr_recipient_count;
 
 /* qmgr_message_create - create in-core message structure */
 
 static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
-				           const char *queue_id, int qflags)
+				           const char *orig_queue_name, const char *queue_id, int qflags)
 {
     QMGR_MESSAGE *message;
 
@@ -175,6 +176,9 @@ static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
     message->data_offset = 0;
     message->queue_id = mystrdup(queue_id);
     message->queue_name = mystrdup(queue_name);
+    message->orig_queue_name = mystrdup(orig_queue_name);
+    if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0)
+	qmgr_deferred_message_count++;
     message->encoding = 0;
     message->sender = 0;
     message->dsn_envid = 0;
@@ -1385,8 +1389,11 @@ void    qmgr_message_free(QMGR_MESSAGE *message)
 	msg_panic("qmgr_message_free: queue file is open");
     while ((job = message->job_list.next) != 0)
 	qmgr_job_free(job);
+    if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0)
+	qmgr_deferred_message_count--;
     myfree(message->queue_id);
     myfree(message->queue_name);
+    myfree(message->orig_queue_name);
     if (message->dsn_envid)
 	myfree(message->dsn_envid);
     if (message->encoding)
@@ -1428,19 +1435,19 @@ void    qmgr_message_free(QMGR_MESSAGE *message)
 
 /* qmgr_message_alloc - create in-core message structure */
 
-QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id,
-				         int qflags, mode_t mode)
+QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *orig_queue_name,
+				         const char *queue_id, int qflags, mode_t mode)
 {
     const char *myname = "qmgr_message_alloc";
     QMGR_MESSAGE *message;
 
     if (msg_verbose)
-	msg_info("%s: %s %s", myname, queue_name, queue_id);
+	msg_info("%s: %s %s (from %s)", myname, queue_name, queue_id, orig_queue_name);
 
     /*
      * Create an in-core message structure.
      */
-    message = qmgr_message_create(queue_name, queue_id, qflags);
+    message = qmgr_message_create(queue_name, orig_queue_name, queue_id, qflags);
 
     /*
      * Extract message envelope information: time of arrival, sender address,

commit 1478f76d6e2b76757e41486d534108a842a36da7
Author: Patrik Rak <pat...@raxoft.cz>
Date:   Thu May 30 16:10:06 2013 +0200

    Limit the number of deferred messages in the active queue.

diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index fbf7dab..a2cea03 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -416,6 +416,7 @@ int     var_max_backoff_time;
 int     var_max_queue_time;
 int     var_dsn_queue_time;
 int     var_qmgr_active_limit;
+int     var_qmgr_deferred_limit;
 int     var_qmgr_rcpt_limit;
 int     var_qmgr_msg_rcpt_limit;
 int     var_xport_rcpt_limit;
@@ -533,8 +534,7 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     int     token_count;
     int     feed = 0;
     int     scan_idx;			/* Priority order scan index */
-    static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
-    int     last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
+    static int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
     int     delay;
 
     /*
@@ -556,7 +556,8 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
 
     /*
      * Let some new blood into the active queue when the queue size is
-     * smaller than some configurable limit.
+     * smaller than some configurable limit. We round-robin the queue scans,
+     * but eventually prevent the deferred queue from using all resources.
      * 
      * We import one message per interrupt, to optimally tune the input count
      * for the number of delivery agent protocol wait states, as explained in
@@ -565,7 +566,10 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     delay = WAIT_FOR_EVENT;
     for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
 	 && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
-	last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
+	last_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
+	if (last_scan_idx == QMGR_SCAN_IDX_DEFERRED &&
+	    qmgr_deferred_message_count >= var_qmgr_deferred_limit)
+	    continue;
 	if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
 	    delay = DONT_WAIT;
 	    if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
@@ -574,16 +578,6 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     }
 
     /*
-     * Round-robin the queue scans. When the active queue becomes full,
-     * prefer new mail over deferred mail.
-     */
-    if (qmgr_message_count < var_qmgr_active_limit) {
-	first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
-    } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
-	first_scan_idx = QMGR_SCAN_IDX_INCOMING;
-    }
-
-    /*
      * Global flow control. If enabled, slow down receiving processes that
      * get ahead of the queue manager, but don't block them completely.
      */

commit 66c699c5ef543044a6ea1d753347cfad915acdfa
Author: Patrik Rak <pat...@raxoft.cz>
Date:   Thu May 30 16:50:52 2013 +0200

    Implement setting of the newly introduced limit.

diff --git a/src/global/mail_params.h b/src/global/mail_params.h
index 1e368f8..cbcf826 100644
--- a/src/global/mail_params.h
+++ b/src/global/mail_params.h
@@ -721,6 +721,10 @@ extern int var_delay_warn_time;
 #define DEF_QMGR_ACT_LIMIT	20000
 extern int var_qmgr_active_limit;
 
+#define VAR_QMGR_DFR_LIMIT	"qmgr_message_deferred_limit"
+#define DEF_QMGR_DFR_LIMIT	1000
+extern int var_qmgr_deferred_limit;
+
 #define VAR_QMGR_RCPT_LIMIT	"qmgr_message_recipient_limit"
 #define DEF_QMGR_RCPT_LIMIT	20000
 extern int var_qmgr_rcpt_limit;
diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index a2cea03..f008867 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -175,6 +175,8 @@
 /*	clogging up the Postfix active queue.
 /* .IP "\fBqmgr_message_active_limit (20000)\fR"
 /*	The maximal number of messages in the active queue.
+/* .IP "\fBqmgr_message_deferred_limit (1000)\fR"
+/*	How much of the active queue must be left reserved for messages from the incoming queue.
 /* .IP "\fBqmgr_message_recipient_limit (20000)\fR"
 /*	The maximal number of recipients held in memory by the Postfix
 /*	queue manager, and the maximal size of the short-term,
@@ -632,6 +634,12 @@ static void qmgr_post_init(char *name, char **unused_argv)
     /*
      * Sanity check.
      */
+    var_qmgr_deferred_limit = var_qmgr_active_limit - var_qmgr_deferred_limit;
+    if (var_qmgr_deferred_limit <= var_qmgr_active_limit / 4) {
+	msg_warn("%s is too large for %s - adjusting %s",
+	      VAR_QMGR_DFR_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_DFR_LIMIT);
+	var_qmgr_deferred_limit = var_qmgr_active_limit - var_qmgr_active_limit / 10;
+    }
     if (var_qmgr_rcpt_limit < var_qmgr_active_limit) {
 	msg_warn("%s is smaller than %s - adjusting %s",
 	      VAR_QMGR_RCPT_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_RCPT_LIMIT);
@@ -696,6 +704,7 @@ int     main(int argc, char **argv)
     };
     static const CONFIG_INT_TABLE int_table[] = {
 	VAR_QMGR_ACT_LIMIT, DEF_QMGR_ACT_LIMIT, &var_qmgr_active_limit, 1, 0,
+	VAR_QMGR_DFR_LIMIT, DEF_QMGR_DFR_LIMIT, &var_qmgr_deferred_limit, 0, 0,
 	VAR_QMGR_RCPT_LIMIT, DEF_QMGR_RCPT_LIMIT, &var_qmgr_rcpt_limit, 1, 0,
 	VAR_QMGR_MSG_RCPT_LIMIT, DEF_QMGR_MSG_RCPT_LIMIT, &var_qmgr_msg_rcpt_limit, 1, 0,
 	VAR_XPORT_RCPT_LIMIT, DEF_XPORT_RCPT_LIMIT, &var_xport_rcpt_limit, 0, 0,

commit 155cf58c5d679a732d7df2b90e4eb5d064f2fcab
Author: Patrik Rak <pat...@raxoft.cz>
Date:   Mon Jun 10 13:16:30 2013 +0200

    Prevent "slow" messages from using all delivery agents.

diff --git a/src/global/mail_params.h b/src/global/mail_params.h
index cbcf826..5ff08b2 100644
--- a/src/global/mail_params.h
+++ b/src/global/mail_params.h
@@ -810,6 +810,18 @@ extern int var_dest_rcpt_limit;
 extern int var_local_rcpt_lim;
 
  /*
+  * Queue manager: default limits for "slow" messages.
+  */
+#define VAR_SLOW_XPORT_LIMIT	"default_slow_transport_limit"
+#define _SLOW_XPORT_LIMIT	"_slow_transport_limit"
+#define DEF_SLOW_XPORT_LIMIT	(DEF_PROC_LIMIT - DEF_PROC_LIMIT / 10)
+extern int var_slow_xport_limit;
+
+#define VAR_SLOW_MSG_TIME	"slow_message_time_limit"
+#define DEF_SLOW_MSG_TIME	"600s"
+extern int var_slow_msg_time;
+
+ /*
   * Queue manager: default delay before retrying a dead transport.
   */
 #define VAR_XPORT_RETRY_TIME	"transport_retry_time"
diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index f008867..ccdb551 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -429,6 +429,8 @@ int     var_delivery_slot_cost;
 int     var_delivery_slot_loan;
 int     var_delivery_slot_discount;
 int     var_min_delivery_slots;
+int     var_slow_xport_limit;
+int     var_slow_msg_time;
 int     var_init_dest_concurrency;
 int     var_transport_retry_time;
 int     var_dest_con_limit;
@@ -694,6 +696,7 @@ int     main(int argc, char **argv)
 	VAR_MAX_BACKOFF_TIME, DEF_MAX_BACKOFF_TIME, &var_max_backoff_time, 1, 0,
 	VAR_MAX_QUEUE_TIME, DEF_MAX_QUEUE_TIME, &var_max_queue_time, 0, 8640000,
 	VAR_DSN_QUEUE_TIME, DEF_DSN_QUEUE_TIME, &var_dsn_queue_time, 0, 8640000,
+	VAR_SLOW_MSG_TIME, DEF_SLOW_MSG_TIME, &var_slow_msg_time, 0, 0,
 	VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 1, 0,
 	VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, &var_qmgr_clog_warn_time, 0, 0,
 	VAR_XPORT_REFILL_DELAY, DEF_XPORT_REFILL_DELAY, &var_xport_refill_delay, 1, 0,
@@ -714,6 +717,7 @@ int     main(int argc, char **argv)
 	VAR_DELIVERY_SLOT_LOAN, DEF_DELIVERY_SLOT_LOAN, &var_delivery_slot_loan, 0, 0,
 	VAR_DELIVERY_SLOT_DISCOUNT, DEF_DELIVERY_SLOT_DISCOUNT, &var_delivery_slot_discount, 0, 100,
 	VAR_MIN_DELIVERY_SLOTS, DEF_MIN_DELIVERY_SLOTS, &var_min_delivery_slots, 0, 0,
+	VAR_SLOW_XPORT_LIMIT, DEF_SLOW_XPORT_LIMIT, &var_slow_xport_limit, 0, 0,
 	VAR_INIT_DEST_CON, DEF_INIT_DEST_CON, &var_init_dest_concurrency, 1, 0,
 	VAR_DEST_CON_LIMIT, DEF_DEST_CON_LIMIT, &var_dest_con_limit, 0, 0,
 	VAR_DEST_RCPT_LIMIT, DEF_DEST_RCPT_LIMIT, &var_dest_rcpt_limit, 0, 0,
diff --git a/src/qmgr/qmgr.h b/src/qmgr/qmgr.h
index 4db27c5..2036c8a 100644
--- a/src/qmgr/qmgr.h
+++ b/src/qmgr/qmgr.h
@@ -169,6 +169,9 @@ struct QMGR_JOB_LIST {
 struct QMGR_TRANSPORT {
     int     flags;			/* blocked, etc. */
     int     pending;			/* incomplete DA connections */
+    int     active;			/* active DA connections */
+    int     slow_active;		/* active slow DA connections */
+    int     slow_limit;			/* limit for slow DA connections */
     char   *name;			/* transport name */
     int     dest_concurrency_limit;	/* concurrency per domain */
     int     init_dest_concurrency;	/* init. per-domain concurrency */
@@ -383,6 +386,7 @@ extern int qmgr_recipient_count;
 extern void qmgr_message_free(QMGR_MESSAGE *);
 extern void qmgr_message_update_warn(QMGR_MESSAGE *);
 extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
+extern int qmgr_message_is_slow(QMGR_MESSAGE *);
 extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, const char *, int, mode_t);
 extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);
 
@@ -412,6 +416,7 @@ struct QMGR_JOB {
     QMGR_JOB *stack_parent;		/* stack parent */
     QMGR_JOB_LIST stack_children;	/* all stack children */
     QMGR_JOB_LIST stack_siblings;	/* stack children linkage */
+    int     flags;			/* various flags */
     int     stack_level;		/* job stack nesting level (-1 means
 					 * it's not on the lists at all) */
     int     blocker_tag;		/* tagged if blocks the job list */
@@ -437,6 +442,10 @@ struct QMGR_PEER {
     QMGR_PEER_LIST peers;		/* neighbor linkage */
 };
 
+#define QMGR_JOB_SLOW_FLAG	(1<<0)	/* job is marked as slow */
+
+#define QMGR_JOB_SLOW(job)      (((job)->flags & QMGR_JOB_SLOW_FLAG) != 0)
+
 extern QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *);
 extern QMGR_PEER *qmgr_peer_select(QMGR_JOB *);
 extern void qmgr_job_blocker_update(QMGR_QUEUE *);
diff --git a/src/qmgr/qmgr_deliver.c b/src/qmgr/qmgr_deliver.c
index 2fbb049..4e1f75c 100644
--- a/src/qmgr/qmgr_deliver.c
+++ b/src/qmgr/qmgr_deliver.c
@@ -223,6 +223,7 @@ static void qmgr_deliver_update(int unused_event, char *context)
     QMGR_QUEUE *queue = entry->queue;
     QMGR_TRANSPORT *transport = queue->transport;
     QMGR_MESSAGE *message = entry->message;
+    QMGR_JOB *job = entry->peer->job;
     static DSN_BUF *dsb;
     int     status;
 
@@ -234,12 +235,22 @@ static void qmgr_deliver_update(int unused_event, char *context)
 	(void) vstream_fclose(entry->stream); \
 	entry->stream = 0; \
 	qmgr_deliver_concurrency--; \
+	transport->active--; \
+	if (QMGR_JOB_SLOW(job)) \
+	    transport->slow_active--; \
     } while (0)
 
     if (dsb == 0)
 	dsb = dsb_create();
 
     /*
+     * If this was one of the slow messages while the slow delivery agents
+     * were maxed out, let the transport know that it can now try sending more.
+     */
+    if (QMGR_JOB_SLOW(job) && transport->slow_active == transport->slow_limit)
+	transport->job_current = transport->job_list.next;
+
+    /*
      * The message transport has responded. Stop the watchdog timer.
      */
     event_cancel_timer(qmgr_deliver_abort, context);
@@ -418,6 +429,9 @@ void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
      * If we get this far, go wait for the delivery status report.
      */
     qmgr_deliver_concurrency++;
+    transport->active++;
+    if (QMGR_JOB_SLOW(entry->peer->job))
+	transport->slow_active++;
     entry->stream = stream;
     event_enable_read(vstream_fileno(stream),
 		      qmgr_deliver_update, (char *) entry);
diff --git a/src/qmgr/qmgr_job.c b/src/qmgr/qmgr_job.c
index 7de70f2..7681986 100644
--- a/src/qmgr/qmgr_job.c
+++ b/src/qmgr/qmgr_job.c
@@ -103,6 +103,9 @@ static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transpor
 
     job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
     job->message = message;
+    job->flags = 0;
+    if (qmgr_message_is_slow(message))
+	job->flags |= QMGR_JOB_SLOW_FLAG;
     QMGR_LIST_APPEND(message->job_list, job, message_peers);
     htable_enter(transport->job_byname, message->queue_id, (char *) job);
     job->transport = transport;
@@ -873,6 +876,13 @@ QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
 	if (IS_BLOCKER(job, transport))
 	    continue;
 
+	/*
+	 * Skip any slow jobs once we have reached the transport limit for slow jobs.
+	 */
+	if (QMGR_JOB_SLOW(job) &&
+	    (transport->slow_limit > 0 && transport->slow_active >= transport->slow_limit))
+	    continue;
+
 	if ((peer = qmgr_job_peer_select(job)) != 0) {
 
 	    /*
@@ -939,7 +949,8 @@ QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
     /*
      * We have not found any entry we could use for delivery. Well, things
      * must have changed since this transport was selected for asynchronous
-     * allocation. Never mind. Clear the current job pointer and reluctantly
+     * allocation. Or there are only slow jobs and we have hit the limit.
+     * Never mind. Clear the current job pointer and reluctantly
      * report back that we have failed in our task.
      */
     transport->job_current = 0;
diff --git a/src/qmgr/qmgr_message.c b/src/qmgr/qmgr_message.c
index a1fcdc9..a4e47d0 100644
--- a/src/qmgr/qmgr_message.c
+++ b/src/qmgr/qmgr_message.c
@@ -27,6 +27,9 @@
 /*	void	qmgr_message_kill_record(message, offset)
 /*	QMGR_MESSAGE *message;
 /*	long	offset;
+/*
+/*	int	qmgr_message_is_slow(message)
+/*	QMGR_MESSAGE *message;
 /* DESCRIPTION
 /*	This module performs en-gross operations on queue messages.
 /*
@@ -71,6 +74,10 @@
 /*	the record type at the given offset to "killed", and closes the file.
 /*	A killed envelope record is ignored. Killed records are not allowed
 /*	inside the message content.
+/*
+/*	qmgr_message_is_slow() tests whether the given message is considered
+/*	slow. Currently, messages which came from the deferred queue and are
+/*	older than \fIslow_message_time_limit\fR are considered slow.
 /* DIAGNOSTICS
 /*	Warnings: malformed message file. Fatal errors: out of memory.
 /* SEE ALSO
@@ -218,6 +225,17 @@ static void qmgr_message_close(QMGR_MESSAGE *message)
     message->fp = 0;
 }
 
+/* qmgr_message_is_slow - recognize slow messages */
+
+int qmgr_message_is_slow(QMGR_MESSAGE *message)
+{
+    if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) != 0)
+	return (0);
+    if (message->queued_time - message->create_time < var_slow_msg_time)
+	return (0);
+    return (1);
+}
+
 /* qmgr_message_open - open queue file */
 
 static int qmgr_message_open(QMGR_MESSAGE *message)
diff --git a/src/qmgr/qmgr_transport.c b/src/qmgr/qmgr_transport.c
index 434d75e..befbf82 100644
--- a/src/qmgr/qmgr_transport.c
+++ b/src/qmgr/qmgr_transport.c
@@ -282,11 +282,18 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
      * the number of pending delivery agent connections, until all delivery
      * agent concurrency windows are maxed out, or until we run out of "todo"
      * queue entries.
+     *
+     * XXX Rather than duplicating the window testing logic below for slow jobs,
+     * we let the transport give it a try first and back off only after it
+     * finds out it has nothing but slow jobs available. As this only happens
+     * when slow messages max out their delivery agent limit while nothing else
+     * is waiting to be delivered, it's not critical.
      */
 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
 
     for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
 	if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+	    || xport->job_current == 0
 	    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
 	    continue;
 	need = xport->pending + 1;
@@ -378,11 +385,16 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
     transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
     transport->flags = 0;
     transport->pending = 0;
+    transport->active = 0;
+    transport->slow_active = 0;
     transport->name = mystrdup(name);
 
     /*
      * Use global configuration settings or transport-specific settings.
      */
+    transport->slow_limit =
+	get_mail_conf_int2(name, _SLOW_XPORT_LIMIT,
+			   var_slow_xport_limit, 0, 0);
     transport->dest_concurrency_limit =
 	get_mail_conf_int2(name, _DEST_CON_LIMIT,
 			   var_dest_con_limit, 0, 0);

commit 8700ffb9c0f4f35136ac297f4ca9563d95846fa8
Author: Patrik Rak <pat...@raxoft.cz>
Date:   Mon Jun 10 13:56:59 2013 +0200

    Added docs.

diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index ccdb551..4f061c6 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -175,8 +175,6 @@
 /*	clogging up the Postfix active queue.
 /* .IP "\fBqmgr_message_active_limit (20000)\fR"
 /*	The maximal number of messages in the active queue.
-/* .IP "\fBqmgr_message_deferred_limit (1000)\fR"
-/*	How much of the active queue must be left reserved for messages from the incoming queue.
 /* .IP "\fBqmgr_message_recipient_limit (20000)\fR"
 /*	The maximal number of recipients held in memory by the Postfix
 /*	queue manager, and the maximal size of the short-term,
@@ -204,6 +202,10 @@
 /*	The default per-transport maximum delay between recipients refills.
 /* .IP "\fItransport\fB_recipient_refill_delay ($default_recipient_refill_delay)\fR"
 /*	Idem, for delivery via the named message \fItransport\fR.
+/* .PP
+/*	Available in Postfix version 2.11 and later:
+/* .IP "\fBqmgr_message_deferred_limit (1000)\fR"
+/*	How much of the active queue must be left available for messages from the incoming queue.
 /* DELIVERY CONCURRENCY CONTROLS
 /* .ad
 /* .fi
@@ -241,6 +243,16 @@
 /* .IP "\fBdestination_concurrency_feedback_debug (no)\fR"
 /*	Make the queue manager's feedback algorithm verbose for performance
 /*	analysis purposes.
+/* .PP
+/*	Available in Postfix version 2.11 and later:
+/* .IP "\fBdefault_slow_transport_limit (90)\fR"
+/*	Deferred messages older than \fBslow_message_time_limit\fR are prevented
+/*	from using more than this many delivery agents. 0 means no limit.
+/* .IP "\fItransport\fB_slow_transport_limit ($default_slow_transport_limit)\fR"
+/*	Idem, for delivery via the named message \fItransport\fR.
+/* .IP "\fBslow_message_time_limit (600s)\fR"
+/*	Deferred messages older than this are considered "slow"
+/*	and are subject to \fItransport\fB_slow_transport_limit\fR limiting.
 /* RECIPIENT SCHEDULING CONTROLS
 /* .ad
 /* .fi

commit 1e2699f44b77f2726ffcfb1f6449bac48ba2d534
Author: Patrik Rak <pat...@raxoft.cz>
Date:   Mon Jun 10 14:13:06 2013 +0200

    Warn about too low slow delivery agent limit.

diff --git a/src/qmgr/qmgr_deliver.c b/src/qmgr/qmgr_deliver.c
index 4e1f75c..6f349f6 100644
--- a/src/qmgr/qmgr_deliver.c
+++ b/src/qmgr/qmgr_deliver.c
@@ -437,6 +437,18 @@ void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
 		      qmgr_deliver_update, (char *) entry);
 
     /*
+     * Warn about misconfigured systems.
+     *
+     * Unfortunately, we do not know what the transport delivery agent limit is,
+     * so we can only keep checking this as the delivery agents get maxed out.
+     */
+    if (transport->slow_limit > 0 && transport->slow_limit <= transport->active / 2) {
+	msg_warn("%s%s is too small for %s transport limit - adjusting %s%s",
+	      transport->name, _SLOW_XPORT_LIMIT, transport->name, transport->name, _SLOW_XPORT_LIMIT);
+	transport->slow_limit = transport->active - transport->active / 10;
+    }
+
+    /*
      * Guard against broken systems.
      */
     event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout);
