Hello,
attached is my first take on the problem we discussed a few weeks ago:
limiting the number of deferred messages in the active queue, and limiting
the number of delivery agents used by presumably "slow" deferred messages.
The patch consists of several incremental commits, which show how I
developed it and make it easier to read at the same time.
The first half of the patch implements the limit on deferred messages in
the active queue. A few comments:
- qmgr_loop() now always round-robins the queue scans. The original version
stopped doing that when the active queue was full and incoming mail kept
flowing in. I understand that this was done to prevent the deferred queue
from dominating the active queue; the downside, however, was that it could
also starve the deferred queue indefinitely. IMO the new mechanism
prevents the active queue from being stalled in a much better way, while
remaining fair at the same time.
- Incidentally, the previous change can now result in more inflow back
pressure being applied when the active queue becomes full, as the deferred
queue no longer gets ignored entirely. So this seems like an improvement,
not the regression Viktor was afraid of.
- The qmgr_deferred_message_limit is used internally to limit the number
of deferred messages in the active queue. However, I later realized that
from the user's point of view it is perhaps easier to specify instead how
much of the active queue has to be left available for the incoming queue.
This has the advantage that users don't have to adjust the setting when
they increase the active queue size, so the default works better, too.
But then I am not entirely sure whether (and how) the name of the config
variable should change as well...
The second half implements the delivery agent limit for slow messages. A few comments:
- Skipping the slow jobs in qmgr_job_entry_select() turned out not to be
that difficult; the tricky part was, of course, verifying all the
implications of this change and coming up with the relevant adjustments. The
trickiest part was qmgr_transport_select(), which decides whether a
transport has any recipient entries ready for delivery. Rather than
duplicating all the window counting logic for slow jobs, I decided to
rely on the fact that qmgr can change its mind after contacting the delivery
agent. This already happens from time to time, so it seemed like a
reasonable solution for a situation which is not time critical.
- The fact that qmgr doesn't know the transport's process limit makes some
things a bit difficult. For example, I could not use the "how much has to be
left unused" approach for the slow delivery agent limit, so people have to
adjust this limit when increasing the transport or process limit. I have
also added a warning to make sure people don't set this limit too low,
but it can only trigger after the delivery agents start getting maxed out.
Nor does it detect the case when this limit is set too high.
I don't know how much of an issue all this is, but it might be worth
considering letting master pass these limits to qmgr on startup.
And a few overall comments:
- I have only added the basic docs to qmgr(8). I didn't regenerate
the man pages or touch the postconf docs or any other docs, such as the
Postfix tuning guide. All of this can IMO wait until the names and
meanings of the new variables are finalized.
- As much as I would have liked to, I had neither the time nor a
suitable testing environment to actually test this patch. I did my best to
make sure it is correct, though, and I hope someone here will be kind
enough to put it to the test.
Thanks,
Patrik
commit 5f0aac23479274448e722bede9102f1b3decddd4
Author: Patrik Rak <pat...@raxoft.cz>
Date: Thu May 30 15:52:12 2013 +0200
Keep track of orig queue name and number of originally deferred messages.
diff --git a/src/qmgr/qmgr.h b/src/qmgr/qmgr.h
index 6737e42..4db27c5 100644
--- a/src/qmgr/qmgr.h
+++ b/src/qmgr/qmgr.h
@@ -338,6 +338,7 @@ struct QMGR_MESSAGE {
time_t warn_time; /* time next warning to be sent */
long data_offset; /* data seek offset */
char *queue_name; /* queue name */
+ char *orig_queue_name; /* queue this message came from */
char *queue_id; /* queue file */
char *encoding; /* content encoding */
char *sender; /* complete address */
@@ -376,12 +377,13 @@ struct QMGR_MESSAGE {
#define QMGR_MESSAGE_LOCKED ((QMGR_MESSAGE *) 1)
extern int qmgr_message_count;
+extern int qmgr_deferred_message_count;
extern int qmgr_recipient_count;
extern void qmgr_message_free(QMGR_MESSAGE *);
extern void qmgr_message_update_warn(QMGR_MESSAGE *);
extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
-extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t);
+extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, const char *, int, mode_t);
extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);
#define QMGR_MSG_STATS(stats, message) \
diff --git a/src/qmgr/qmgr_active.c b/src/qmgr/qmgr_active.c
index d93a2cd..302c0a3 100644
--- a/src/qmgr/qmgr_active.c
+++ b/src/qmgr/qmgr_active.c
@@ -229,7 +229,7 @@ int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
*/
#define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
- if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
+ if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, scan_info->queue, queue_id,
(st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
scan_info->flags | QMGR_FLUSH_AFTER :
scan_info->flags,
diff --git a/src/qmgr/qmgr_message.c b/src/qmgr/qmgr_message.c
index 576fb2d..a1fcdc9 100644
--- a/src/qmgr/qmgr_message.c
+++ b/src/qmgr/qmgr_message.c
@@ -148,12 +148,13 @@
#include "qmgr.h"
int qmgr_message_count;
+int qmgr_deferred_message_count;
int qmgr_recipient_count;
/* qmgr_message_create - create in-core message structure */
static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
- const char *queue_id, int qflags)
+ const char *orig_queue_name, const char *queue_id, int qflags)
{
QMGR_MESSAGE *message;
@@ -175,6 +176,9 @@ static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
message->data_offset = 0;
message->queue_id = mystrdup(queue_id);
message->queue_name = mystrdup(queue_name);
+ message->orig_queue_name = mystrdup(orig_queue_name);
+ if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0)
+ qmgr_deferred_message_count++;
message->encoding = 0;
message->sender = 0;
message->dsn_envid = 0;
@@ -1385,8 +1389,11 @@ void qmgr_message_free(QMGR_MESSAGE *message)
msg_panic("qmgr_message_free: queue file is open");
while ((job = message->job_list.next) != 0)
qmgr_job_free(job);
+ if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0)
+ qmgr_deferred_message_count--;
myfree(message->queue_id);
myfree(message->queue_name);
+ myfree(message->orig_queue_name);
if (message->dsn_envid)
myfree(message->dsn_envid);
if (message->encoding)
@@ -1428,19 +1435,19 @@ void qmgr_message_free(QMGR_MESSAGE *message)
/* qmgr_message_alloc - create in-core message structure */
-QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id,
- int qflags, mode_t mode)
+QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *orig_queue_name,
+ const char *queue_id, int qflags, mode_t mode)
{
const char *myname = "qmgr_message_alloc";
QMGR_MESSAGE *message;
if (msg_verbose)
- msg_info("%s: %s %s", myname, queue_name, queue_id);
+ msg_info("%s: %s %s (from %s)", myname, queue_name, queue_id, orig_queue_name);
/*
* Create an in-core message structure.
*/
- message = qmgr_message_create(queue_name, queue_id, qflags);
+ message = qmgr_message_create(queue_name, orig_queue_name, queue_id, qflags);
/*
* Extract message envelope information: time of arrival, sender address,
commit 1478f76d6e2b76757e41486d534108a842a36da7
Author: Patrik Rak <pat...@raxoft.cz>
Date: Thu May 30 16:10:06 2013 +0200
Limit the number of deferred messages in the active queue.
diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index fbf7dab..a2cea03 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -416,6 +416,7 @@ int var_max_backoff_time;
int var_max_queue_time;
int var_dsn_queue_time;
int var_qmgr_active_limit;
+int var_qmgr_deferred_limit;
int var_qmgr_rcpt_limit;
int var_qmgr_msg_rcpt_limit;
int var_xport_rcpt_limit;
@@ -533,8 +534,7 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
int token_count;
int feed = 0;
int scan_idx; /* Priority order scan index */
- static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
- int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
+ static int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
int delay;
/*
@@ -556,7 +556,8 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
/*
* Let some new blood into the active queue when the queue size is
- * smaller than some configurable limit.
+ * smaller than some configurable limit. We round-robin the queue scans,
+ * but eventually prevent the deferred queue from using all resources.
*
* We import one message per interrupt, to optimally tune the input count
* for the number of delivery agent protocol wait states, as explained in
@@ -565,7 +566,10 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
delay = WAIT_FOR_EVENT;
for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
&& scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
- last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
+ last_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
+ if (last_scan_idx == QMGR_SCAN_IDX_DEFERRED &&
+ qmgr_deferred_message_count >= var_qmgr_deferred_limit)
+ continue;
if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
delay = DONT_WAIT;
if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
@@ -574,16 +578,6 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
}
/*
- * Round-robin the queue scans. When the active queue becomes full,
- * prefer new mail over deferred mail.
- */
- if (qmgr_message_count < var_qmgr_active_limit) {
- first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
- } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
- first_scan_idx = QMGR_SCAN_IDX_INCOMING;
- }
-
- /*
* Global flow control. If enabled, slow down receiving processes that
* get ahead of the queue manager, but don't block them completely.
*/
commit 66c699c5ef543044a6ea1d753347cfad915acdfa
Author: Patrik Rak <pat...@raxoft.cz>
Date: Thu May 30 16:50:52 2013 +0200
Make the newly introduced limit configurable.
diff --git a/src/global/mail_params.h b/src/global/mail_params.h
index 1e368f8..cbcf826 100644
--- a/src/global/mail_params.h
+++ b/src/global/mail_params.h
@@ -721,6 +721,10 @@ extern int var_delay_warn_time;
#define DEF_QMGR_ACT_LIMIT 20000
extern int var_qmgr_active_limit;
+#define VAR_QMGR_DFR_LIMIT "qmgr_message_deferred_limit"
+#define DEF_QMGR_DFR_LIMIT 1000
+extern int var_qmgr_deferred_limit;
+
#define VAR_QMGR_RCPT_LIMIT "qmgr_message_recipient_limit"
#define DEF_QMGR_RCPT_LIMIT 20000
extern int var_qmgr_rcpt_limit;
diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index a2cea03..f008867 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -175,6 +175,8 @@
/* clogging up the Postfix active queue.
/* .IP "\fBqmgr_message_active_limit (20000)\fR"
/* The maximal number of messages in the active queue.
+/* .IP "\fBqmgr_message_deferred_limit (1000)\fR"
+/* How much of the active queue must be left reserved for messages from the incoming queue.
/* .IP "\fBqmgr_message_recipient_limit (20000)\fR"
/* The maximal number of recipients held in memory by the Postfix
/* queue manager, and the maximal size of the short-term,
@@ -632,6 +634,12 @@ static void qmgr_post_init(char *name, char **unused_argv)
/*
* Sanity check.
*/
+ var_qmgr_deferred_limit = var_qmgr_active_limit - var_qmgr_deferred_limit;
+ if (var_qmgr_deferred_limit <= var_qmgr_active_limit / 4) {
+ msg_warn("%s is too large for %s - adjusting %s",
+ VAR_QMGR_DFR_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_DFR_LIMIT);
+ var_qmgr_deferred_limit = var_qmgr_active_limit - var_qmgr_active_limit / 10;
+ }
if (var_qmgr_rcpt_limit < var_qmgr_active_limit) {
msg_warn("%s is smaller than %s - adjusting %s",
VAR_QMGR_RCPT_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_RCPT_LIMIT);
@@ -696,6 +704,7 @@ int main(int argc, char **argv)
};
static const CONFIG_INT_TABLE int_table[] = {
VAR_QMGR_ACT_LIMIT, DEF_QMGR_ACT_LIMIT, &var_qmgr_active_limit, 1, 0,
+ VAR_QMGR_DFR_LIMIT, DEF_QMGR_DFR_LIMIT, &var_qmgr_deferred_limit, 0, 0,
VAR_QMGR_RCPT_LIMIT, DEF_QMGR_RCPT_LIMIT, &var_qmgr_rcpt_limit, 1, 0,
VAR_QMGR_MSG_RCPT_LIMIT, DEF_QMGR_MSG_RCPT_LIMIT, &var_qmgr_msg_rcpt_limit, 1, 0,
VAR_XPORT_RCPT_LIMIT, DEF_XPORT_RCPT_LIMIT, &var_xport_rcpt_limit, 0, 0,
commit 155cf58c5d679a732d7df2b90e4eb5d064f2fcab
Author: Patrik Rak <pat...@raxoft.cz>
Date: Mon Jun 10 13:16:30 2013 +0200
Prevent "slow" messages from using all delivery agents.
diff --git a/src/global/mail_params.h b/src/global/mail_params.h
index cbcf826..5ff08b2 100644
--- a/src/global/mail_params.h
+++ b/src/global/mail_params.h
@@ -810,6 +810,18 @@ extern int var_dest_rcpt_limit;
extern int var_local_rcpt_lim;
/*
+ * Queue manager: default limits for "slow" messages.
+ */
+#define VAR_SLOW_XPORT_LIMIT "default_slow_transport_limit"
+#define _SLOW_XPORT_LIMIT "_slow_transport_limit"
+#define DEF_SLOW_XPORT_LIMIT (DEF_PROC_LIMIT - DEF_PROC_LIMIT / 10)
+extern int var_slow_xport_limit;
+
+#define VAR_SLOW_MSG_TIME "slow_message_time_limit"
+#define DEF_SLOW_MSG_TIME "600s"
+extern int var_slow_msg_time;
+
+ /*
* Queue manager: default delay before retrying a dead transport.
*/
#define VAR_XPORT_RETRY_TIME "transport_retry_time"
diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index f008867..ccdb551 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -429,6 +429,8 @@ int var_delivery_slot_cost;
int var_delivery_slot_loan;
int var_delivery_slot_discount;
int var_min_delivery_slots;
+int var_slow_xport_limit;
+int var_slow_msg_time;
int var_init_dest_concurrency;
int var_transport_retry_time;
int var_dest_con_limit;
@@ -694,6 +696,7 @@ int main(int argc, char **argv)
VAR_MAX_BACKOFF_TIME, DEF_MAX_BACKOFF_TIME, &var_max_backoff_time, 1, 0,
VAR_MAX_QUEUE_TIME, DEF_MAX_QUEUE_TIME, &var_max_queue_time, 0, 8640000,
VAR_DSN_QUEUE_TIME, DEF_DSN_QUEUE_TIME, &var_dsn_queue_time, 0, 8640000,
+ VAR_SLOW_MSG_TIME, DEF_SLOW_MSG_TIME, &var_slow_msg_time, 0, 0,
VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 1, 0,
VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, &var_qmgr_clog_warn_time, 0, 0,
VAR_XPORT_REFILL_DELAY, DEF_XPORT_REFILL_DELAY, &var_xport_refill_delay, 1, 0,
@@ -714,6 +717,7 @@ int main(int argc, char **argv)
VAR_DELIVERY_SLOT_LOAN, DEF_DELIVERY_SLOT_LOAN, &var_delivery_slot_loan, 0, 0,
VAR_DELIVERY_SLOT_DISCOUNT, DEF_DELIVERY_SLOT_DISCOUNT, &var_delivery_slot_discount, 0, 100,
VAR_MIN_DELIVERY_SLOTS, DEF_MIN_DELIVERY_SLOTS, &var_min_delivery_slots, 0, 0,
+ VAR_SLOW_XPORT_LIMIT, DEF_SLOW_XPORT_LIMIT, &var_slow_xport_limit, 0, 0,
VAR_INIT_DEST_CON, DEF_INIT_DEST_CON, &var_init_dest_concurrency, 1, 0,
VAR_DEST_CON_LIMIT, DEF_DEST_CON_LIMIT, &var_dest_con_limit, 0, 0,
VAR_DEST_RCPT_LIMIT, DEF_DEST_RCPT_LIMIT, &var_dest_rcpt_limit, 0, 0,
diff --git a/src/qmgr/qmgr.h b/src/qmgr/qmgr.h
index 4db27c5..2036c8a 100644
--- a/src/qmgr/qmgr.h
+++ b/src/qmgr/qmgr.h
@@ -169,6 +169,9 @@ struct QMGR_JOB_LIST {
struct QMGR_TRANSPORT {
int flags; /* blocked, etc. */
int pending; /* incomplete DA connections */
+ int active; /* active DA connections */
+ int slow_active; /* active slow DA connections */
+ int slow_limit; /* limit for slow DA connections */
char *name; /* transport name */
int dest_concurrency_limit; /* concurrency per domain */
int init_dest_concurrency; /* init. per-domain concurrency */
@@ -383,6 +386,7 @@ extern int qmgr_recipient_count;
extern void qmgr_message_free(QMGR_MESSAGE *);
extern void qmgr_message_update_warn(QMGR_MESSAGE *);
extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
+extern int qmgr_message_is_slow(QMGR_MESSAGE *);
extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, const char *, int, mode_t);
extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);
@@ -412,6 +416,7 @@ struct QMGR_JOB {
QMGR_JOB *stack_parent; /* stack parent */
QMGR_JOB_LIST stack_children; /* all stack children */
QMGR_JOB_LIST stack_siblings; /* stack children linkage */
+ int flags; /* various flags */
int stack_level; /* job stack nesting level (-1 means
* it's not on the lists at all) */
int blocker_tag; /* tagged if blocks the job list */
@@ -437,6 +442,10 @@ struct QMGR_PEER {
QMGR_PEER_LIST peers; /* neighbor linkage */
};
+#define QMGR_JOB_SLOW_FLAG (1<<0) /* job is marked as slow */
+
+#define QMGR_JOB_SLOW(job) (((job)->flags & QMGR_JOB_SLOW_FLAG) != 0)
+
extern QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *);
extern QMGR_PEER *qmgr_peer_select(QMGR_JOB *);
extern void qmgr_job_blocker_update(QMGR_QUEUE *);
diff --git a/src/qmgr/qmgr_deliver.c b/src/qmgr/qmgr_deliver.c
index 2fbb049..4e1f75c 100644
--- a/src/qmgr/qmgr_deliver.c
+++ b/src/qmgr/qmgr_deliver.c
@@ -223,6 +223,7 @@ static void qmgr_deliver_update(int unused_event, char *context)
QMGR_QUEUE *queue = entry->queue;
QMGR_TRANSPORT *transport = queue->transport;
QMGR_MESSAGE *message = entry->message;
+ QMGR_JOB *job = entry->peer->job;
static DSN_BUF *dsb;
int status;
@@ -234,12 +235,22 @@ static void qmgr_deliver_update(int unused_event, char *context)
(void) vstream_fclose(entry->stream); \
entry->stream = 0; \
qmgr_deliver_concurrency--; \
+ transport->active--; \
+ if (QMGR_JOB_SLOW(job)) \
+ transport->slow_active--; \
} while (0)
if (dsb == 0)
dsb = dsb_create();
/*
+ * If this was one of the slow messages while the slow delivery agents
+ * were maxed out, let the transport know that it can now try sending more.
+ */
+ if (QMGR_JOB_SLOW(job) && transport->slow_active == transport->slow_limit)
+ transport->job_current = transport->job_list.next;
+
+ /*
* The message transport has responded. Stop the watchdog timer.
*/
event_cancel_timer(qmgr_deliver_abort, context);
@@ -418,6 +429,9 @@ void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
* If we get this far, go wait for the delivery status report.
*/
qmgr_deliver_concurrency++;
+ transport->active++;
+ if (QMGR_JOB_SLOW(entry->peer->job))
+ transport->slow_active++;
entry->stream = stream;
event_enable_read(vstream_fileno(stream),
qmgr_deliver_update, (char *) entry);
diff --git a/src/qmgr/qmgr_job.c b/src/qmgr/qmgr_job.c
index 7de70f2..7681986 100644
--- a/src/qmgr/qmgr_job.c
+++ b/src/qmgr/qmgr_job.c
@@ -103,6 +103,9 @@ static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transpor
job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
job->message = message;
+ job->flags = 0;
+ if (qmgr_message_is_slow(message))
+ job->flags |= QMGR_JOB_SLOW_FLAG;
QMGR_LIST_APPEND(message->job_list, job, message_peers);
htable_enter(transport->job_byname, message->queue_id, (char *) job);
job->transport = transport;
@@ -873,6 +876,13 @@ QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
if (IS_BLOCKER(job, transport))
continue;
+ /*
+ * Skip any slow jobs once we have reached the transport limit for slow jobs.
+ */
+ if (QMGR_JOB_SLOW(job) &&
+ (transport->slow_limit > 0 && transport->slow_active >= transport->slow_limit))
+ continue;
+
if ((peer = qmgr_job_peer_select(job)) != 0) {
/*
@@ -939,7 +949,8 @@ QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
/*
* We have not found any entry we could use for delivery. Well, things
* must have changed since this transport was selected for asynchronous
- * allocation. Never mind. Clear the current job pointer and reluctantly
+ * allocation. Or there are only slow jobs and we have hit the limit.
+ * Never mind. Clear the current job pointer and reluctantly
* report back that we have failed in our task.
*/
transport->job_current = 0;
diff --git a/src/qmgr/qmgr_message.c b/src/qmgr/qmgr_message.c
index a1fcdc9..a4e47d0 100644
--- a/src/qmgr/qmgr_message.c
+++ b/src/qmgr/qmgr_message.c
@@ -27,6 +27,9 @@
/* void qmgr_message_kill_record(message, offset)
/* QMGR_MESSAGE *message;
/* long offset;
+/*
+/* int qmgr_message_is_slow(message)
+/* QMGR_MESSAGE *message;
/* DESCRIPTION
/* This module performs en-gross operations on queue messages.
/*
@@ -71,6 +74,10 @@
/* the record type at the given offset to "killed", and closes the file.
/* A killed envelope record is ignored. Killed records are not allowed
/* inside the message content.
+/*
+/*	qmgr_message_is_slow() tests whether the given message is considered slow.
+/*	Currently, messages from the deferred queue which are older
+/*	than \fIslow_message_time_limit\fR are considered slow.
/* DIAGNOSTICS
/* Warnings: malformed message file. Fatal errors: out of memory.
/* SEE ALSO
@@ -218,6 +225,17 @@ static void qmgr_message_close(QMGR_MESSAGE *message)
message->fp = 0;
}
+/* qmgr_message_is_slow - recognize slow messages */
+
+int qmgr_message_is_slow(QMGR_MESSAGE *message)
+{
+ if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) != 0)
+ return (0);
+ if (message->queued_time - message->create_time < var_slow_msg_time)
+ return (0);
+ return (1);
+}
+
/* qmgr_message_open - open queue file */
static int qmgr_message_open(QMGR_MESSAGE *message)
diff --git a/src/qmgr/qmgr_transport.c b/src/qmgr/qmgr_transport.c
index 434d75e..befbf82 100644
--- a/src/qmgr/qmgr_transport.c
+++ b/src/qmgr/qmgr_transport.c
@@ -282,11 +282,18 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
* the number of pending delivery agent connections, until all delivery
* agent concurrency windows are maxed out, or until we run out of "todo"
* queue entries.
+ *
+ * XXX Rather than duplicating the window testing logic below for slow jobs,
+ * we let the transport try first and back off only after it finds out
+ * that only slow jobs are available. As this only happens when the slow
+ * messages max out their share of delivery agents and nothing else is
+ * waiting to be delivered, it's not critical.
*/
#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+ || xport->job_current == 0
|| xport->pending >= QMGR_TRANSPORT_MAX_PEND)
continue;
need = xport->pending + 1;
@@ -378,11 +385,16 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
transport->flags = 0;
transport->pending = 0;
+ transport->active = 0;
+ transport->slow_active = 0;
transport->name = mystrdup(name);
/*
* Use global configuration settings or transport-specific settings.
*/
+ transport->slow_limit =
+ get_mail_conf_int2(name, _SLOW_XPORT_LIMIT,
+ var_slow_xport_limit, 0, 0);
transport->dest_concurrency_limit =
get_mail_conf_int2(name, _DEST_CON_LIMIT,
var_dest_con_limit, 0, 0);
commit 8700ffb9c0f4f35136ac297f4ca9563d95846fa8
Author: Patrik Rak <pat...@raxoft.cz>
Date: Mon Jun 10 13:56:59 2013 +0200
Added docs.
diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c
index ccdb551..4f061c6 100644
--- a/src/qmgr/qmgr.c
+++ b/src/qmgr/qmgr.c
@@ -175,8 +175,6 @@
/* clogging up the Postfix active queue.
/* .IP "\fBqmgr_message_active_limit (20000)\fR"
/* The maximal number of messages in the active queue.
-/* .IP "\fBqmgr_message_deferred_limit (1000)\fR"
-/* How much of the active queue must be left reserved for messages from the incoming queue.
/* .IP "\fBqmgr_message_recipient_limit (20000)\fR"
/* The maximal number of recipients held in memory by the Postfix
/* queue manager, and the maximal size of the short-term,
@@ -204,6 +202,10 @@
/* The default per-transport maximum delay between recipients refills.
/* .IP "\fItransport\fB_recipient_refill_delay ($default_recipient_refill_delay)\fR"
/* Idem, for delivery via the named message \fItransport\fR.
+/* .PP
+/* Available in Postfix version 2.11 and later:
+/* .IP "\fBqmgr_message_deferred_limit (1000)\fR"
+/* The number of active queue slots that must be left available for messages from the incoming queue.
/* DELIVERY CONCURRENCY CONTROLS
/* .ad
/* .fi
@@ -241,6 +243,16 @@
/* .IP "\fBdestination_concurrency_feedback_debug (no)\fR"
/* Make the queue manager's feedback algorithm verbose for performance
/* analysis purposes.
+/* .PP
+/* Available in Postfix version 2.11 and later:
+/* .IP "\fBdefault_slow_transport_limit (90)\fR"
+/* Deferred messages older than \fBslow_message_time_limit\fR are prevented
+/* from using more than this many delivery agents. 0 means no limit.
+/* .IP "\fItransport\fB_slow_transport_limit ($default_slow_transport_limit)\fR"
+/* Idem, for delivery via the named message \fItransport\fR.
+/* .IP "\fBslow_message_time_limit (600s)\fR"
+/* Deferred messages older than this are considered "slow"
+/* and are subject to \fItransport\fB_slow_transport_limit\fR limiting.
/* RECIPIENT SCHEDULING CONTROLS
/* .ad
/* .fi
commit 1e2699f44b77f2726ffcfb1f6449bac48ba2d534
Author: Patrik Rak <pat...@raxoft.cz>
Date: Mon Jun 10 14:13:06 2013 +0200
Warn when the slow delivery agent limit is too low.
diff --git a/src/qmgr/qmgr_deliver.c b/src/qmgr/qmgr_deliver.c
index 4e1f75c..6f349f6 100644
--- a/src/qmgr/qmgr_deliver.c
+++ b/src/qmgr/qmgr_deliver.c
@@ -437,6 +437,18 @@ void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
qmgr_deliver_update, (char *) entry);
/*
+ * Warn about misconfigured systems.
+ *
+ * Unfortunately, we do not know what the transport delivery agent limit is,
+ * so we can only keep checking this as the delivery agents get maxed out.
+ */
+ if (transport->slow_limit > 0 && transport->slow_limit <= transport->active / 2) {
+ msg_warn("%s%s is too small for %s transport limit - adjusting %s%s",
+ transport->name, _SLOW_XPORT_LIMIT, transport->name, transport->name, _SLOW_XPORT_LIMIT);
+ transport->slow_limit = transport->active - transport->active / 10;
+ }
+
+ /*
* Guard against broken systems.
*/
event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout);