On 10.6.2013 19:28, Patrik Rak wrote:
Attached is my first take on the problem we discussed a few weeks ago: limiting the number of deferred messages in the active queue, and limiting the number of delivery agents used by presumably "slow" deferred messages.
Here is an updated version. I have added the option to mark overly large messages as possibly slow, and at the same time made both the size limit and the deferred-time limit configurable per transport.
With this in place, qmgr should be much more resilient to congestion.
Patrik
diff --git a/src/global/mail_params.h b/src/global/mail_params.h index 1e368f8..e6d3d71 100644 --- a/src/global/mail_params.h +++ b/src/global/mail_params.h @@ -721,6 +721,10 @@ extern int var_delay_warn_time; #define DEF_QMGR_ACT_LIMIT 20000 extern int var_qmgr_active_limit; +#define VAR_QMGR_DFR_LIMIT "qmgr_message_deferred_limit" +#define DEF_QMGR_DFR_LIMIT 1000 +extern int var_qmgr_deferred_limit; + #define VAR_QMGR_RCPT_LIMIT "qmgr_message_recipient_limit" #define DEF_QMGR_RCPT_LIMIT 20000 extern int var_qmgr_rcpt_limit; @@ -806,6 +810,24 @@ extern int var_dest_rcpt_limit; extern int var_local_rcpt_lim; /* + * Queue manager: default limits for "slow" messages. + */ +#define VAR_SLOW_XPORT_LIMIT "default_slow_transport_limit" +#define _SLOW_XPORT_LIMIT "_slow_transport_limit" +#define DEF_SLOW_XPORT_LIMIT (DEF_PROC_LIMIT - DEF_PROC_LIMIT / 10) +extern int var_slow_xport_limit; + +#define VAR_SLOW_MSG_TIME "default_slow_message_time_limit" +#define _SLOW_MSG_TIME "_slow_message_time_limit" +#define DEF_SLOW_MSG_TIME "600s" +extern int var_slow_msg_time; + +#define VAR_SLOW_MSG_SIZE "default_slow_message_size_limit" +#define _SLOW_MSG_SIZE "_slow_message_size_limit" +#define DEF_SLOW_MSG_SIZE 1024000 +extern int var_slow_msg_size; + + /* * Queue manager: default delay before retrying a dead transport. */ #define VAR_XPORT_RETRY_TIME "transport_retry_time" diff --git a/src/qmgr/qmgr.c b/src/qmgr/qmgr.c index fbf7dab..3a9e1e2 100644 --- a/src/qmgr/qmgr.c +++ b/src/qmgr/qmgr.c @@ -202,6 +202,10 @@ /* The default per-transport maximum delay between recipients refills. /* .IP "\fItransport\fB_recipient_refill_delay ($default_recipient_refill_delay)\fR" /* Idem, for delivery via the named message \fItransport\fR. +/* .PP +/* Available in Postfix version 2.11 and later: +/* .IP "\fBqmgr_message_deferred_limit (1000)\fR" +/* How much of the active queue must be left available for messages from the incoming queue. 
/* DELIVERY CONCURRENCY CONTROLS /* .ad /* .fi @@ -239,6 +243,24 @@ /* .IP "\fBdestination_concurrency_feedback_debug (no)\fR" /* Make the queue manager's feedback algorithm verbose for performance /* analysis purposes. +/* .PP +/* Available in Postfix version 2.11 and later: +/* .IP "\fBdefault_slow_transport_limit (90)\fR" +/* Deferred messages older than \fItransport\fB_slow_message_time_limit\fR +/* and messages larger than \fItransport\fB_slow_message_size_limit\fR +/* are prevented from using more than this many delivery agents. 0 means no limit. +/* .IP "\fItransport\fB_slow_transport_limit ($default_slow_transport_limit)\fR" +/* Idem, for delivery via the named message \fItransport\fR. +/* .IP "\fBdefault_slow_message_time_limit (600s)\fR" +/* Deferred messages older than this are considered "slow" +/* and are subject to \fItransport\fB_slow_transport_limit\fR limiting. +/* .IP "\fItransport\fB_slow_message_time_limit ($default_slow_message_time_limit)\fR" +/* Idem, for delivery via the named message \fItransport\fR. +/* .IP "\fBdefault_slow_message_size_limit (1024000)\fR" +/* Messages larger than this are considered "slow" +/* and are subject to \fItransport\fB_slow_transport_limit\fR limiting. 0 means no limit. +/* .IP "\fItransport\fB_slow_message_size_limit ($default_slow_message_size_limit)\fR" +/* Idem, for delivery via the named message \fItransport\fR. 
/* RECIPIENT SCHEDULING CONTROLS /* .ad /* .fi @@ -416,6 +438,7 @@ int var_max_backoff_time; int var_max_queue_time; int var_dsn_queue_time; int var_qmgr_active_limit; +int var_qmgr_deferred_limit; int var_qmgr_rcpt_limit; int var_qmgr_msg_rcpt_limit; int var_xport_rcpt_limit; @@ -426,6 +449,9 @@ int var_delivery_slot_cost; int var_delivery_slot_loan; int var_delivery_slot_discount; int var_min_delivery_slots; +int var_slow_xport_limit; +int var_slow_msg_time; +int var_slow_msg_size; int var_init_dest_concurrency; int var_transport_retry_time; int var_dest_con_limit; @@ -533,8 +559,7 @@ static int qmgr_loop(char *unused_name, char **unused_argv) int token_count; int feed = 0; int scan_idx; /* Priority order scan index */ - static int first_scan_idx = QMGR_SCAN_IDX_INCOMING; - int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1; + static int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1; int delay; /* @@ -556,7 +581,8 @@ static int qmgr_loop(char *unused_name, char **unused_argv) /* * Let some new blood into the active queue when the queue size is - * smaller than some configurable limit. + * smaller than some configurable limit. We round-robin the queue scans, + * but eventually prevent the deferred queue from using all resources. 
* * We import one message per interrupt, to optimally tune the input count * for the number of delivery agent protocol wait states, as explained in @@ -565,7 +591,10 @@ static int qmgr_loop(char *unused_name, char **unused_argv) delay = WAIT_FOR_EVENT; for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) { - last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT; + last_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT; + if (last_scan_idx == QMGR_SCAN_IDX_DEFERRED && + qmgr_deferred_message_count >= var_qmgr_deferred_limit) + continue; if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) { delay = DONT_WAIT; if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0) @@ -574,16 +603,6 @@ static int qmgr_loop(char *unused_name, char **unused_argv) } /* - * Round-robin the queue scans. When the active queue becomes full, - * prefer new mail over deferred mail. - */ - if (qmgr_message_count < var_qmgr_active_limit) { - first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT; - } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) { - first_scan_idx = QMGR_SCAN_IDX_INCOMING; - } - - /* * Global flow control. If enabled, slow down receiving processes that * get ahead of the queue manager, but don't block them completely. */ @@ -638,6 +657,12 @@ static void qmgr_post_init(char *name, char **unused_argv) /* * Sanity check. 
*/ + var_qmgr_deferred_limit = var_qmgr_active_limit - var_qmgr_deferred_limit; + if (var_qmgr_deferred_limit <= var_qmgr_active_limit / 4) { + msg_warn("%s is too large for %s - adjusting %s", + VAR_QMGR_DFR_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_DFR_LIMIT); + var_qmgr_deferred_limit = var_qmgr_active_limit - var_qmgr_active_limit / 10; + } if (var_qmgr_rcpt_limit < var_qmgr_active_limit) { msg_warn("%s is smaller than %s - adjusting %s", VAR_QMGR_RCPT_LIMIT, VAR_QMGR_ACT_LIMIT, VAR_QMGR_RCPT_LIMIT); @@ -692,6 +717,7 @@ int main(int argc, char **argv) VAR_MAX_BACKOFF_TIME, DEF_MAX_BACKOFF_TIME, &var_max_backoff_time, 1, 0, VAR_MAX_QUEUE_TIME, DEF_MAX_QUEUE_TIME, &var_max_queue_time, 0, 8640000, VAR_DSN_QUEUE_TIME, DEF_DSN_QUEUE_TIME, &var_dsn_queue_time, 0, 8640000, + VAR_SLOW_MSG_TIME, DEF_SLOW_MSG_TIME, &var_slow_msg_time, 0, 0, VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 1, 0, VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, &var_qmgr_clog_warn_time, 0, 0, VAR_XPORT_REFILL_DELAY, DEF_XPORT_REFILL_DELAY, &var_xport_refill_delay, 1, 0, @@ -702,6 +728,7 @@ int main(int argc, char **argv) }; static const CONFIG_INT_TABLE int_table[] = { VAR_QMGR_ACT_LIMIT, DEF_QMGR_ACT_LIMIT, &var_qmgr_active_limit, 1, 0, + VAR_QMGR_DFR_LIMIT, DEF_QMGR_DFR_LIMIT, &var_qmgr_deferred_limit, 0, 0, VAR_QMGR_RCPT_LIMIT, DEF_QMGR_RCPT_LIMIT, &var_qmgr_rcpt_limit, 1, 0, VAR_QMGR_MSG_RCPT_LIMIT, DEF_QMGR_MSG_RCPT_LIMIT, &var_qmgr_msg_rcpt_limit, 1, 0, VAR_XPORT_RCPT_LIMIT, DEF_XPORT_RCPT_LIMIT, &var_xport_rcpt_limit, 0, 0, @@ -711,6 +738,8 @@ int main(int argc, char **argv) VAR_DELIVERY_SLOT_LOAN, DEF_DELIVERY_SLOT_LOAN, &var_delivery_slot_loan, 0, 0, VAR_DELIVERY_SLOT_DISCOUNT, DEF_DELIVERY_SLOT_DISCOUNT, &var_delivery_slot_discount, 0, 100, VAR_MIN_DELIVERY_SLOTS, DEF_MIN_DELIVERY_SLOTS, &var_min_delivery_slots, 0, 0, + VAR_SLOW_XPORT_LIMIT, DEF_SLOW_XPORT_LIMIT, &var_slow_xport_limit, 0, 0, + VAR_SLOW_MSG_SIZE, DEF_SLOW_MSG_SIZE, &var_slow_msg_size, 0, 0, 
VAR_INIT_DEST_CON, DEF_INIT_DEST_CON, &var_init_dest_concurrency, 1, 0, VAR_DEST_CON_LIMIT, DEF_DEST_CON_LIMIT, &var_dest_con_limit, 0, 0, VAR_DEST_RCPT_LIMIT, DEF_DEST_RCPT_LIMIT, &var_dest_rcpt_limit, 0, 0, diff --git a/src/qmgr/qmgr.h b/src/qmgr/qmgr.h index 6737e42..861c738 100644 --- a/src/qmgr/qmgr.h +++ b/src/qmgr/qmgr.h @@ -169,6 +169,11 @@ struct QMGR_JOB_LIST { struct QMGR_TRANSPORT { int flags; /* blocked, etc. */ int pending; /* incomplete DA connections */ + int active; /* active DA connections */ + int slow_active; /* active slow DA connections */ + int slow_limit; /* limit for slow DA connections */ + int slow_msg_time; /* how old deferred messages are "slow" */ + int slow_msg_size; /* how large messages are "slow" */ char *name; /* transport name */ int dest_concurrency_limit; /* concurrency per domain */ int init_dest_concurrency; /* init. per-domain concurrency */ @@ -338,6 +343,7 @@ struct QMGR_MESSAGE { time_t warn_time; /* time next warning to be sent */ long data_offset; /* data seek offset */ char *queue_name; /* queue name */ + char *orig_queue_name; /* from which queue this came from */ char *queue_id; /* queue file */ char *encoding; /* content encoding */ char *sender; /* complete address */ @@ -376,12 +382,14 @@ struct QMGR_MESSAGE { #define QMGR_MESSAGE_LOCKED ((QMGR_MESSAGE *) 1) extern int qmgr_message_count; +extern int qmgr_deferred_message_count; extern int qmgr_recipient_count; extern void qmgr_message_free(QMGR_MESSAGE *); extern void qmgr_message_update_warn(QMGR_MESSAGE *); extern void qmgr_message_kill_record(QMGR_MESSAGE *, long); -extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t); +extern int qmgr_message_is_slow(QMGR_MESSAGE *, QMGR_TRANSPORT *); +extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, const char *, int, mode_t); extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *); #define QMGR_MSG_STATS(stats, message) \ @@ -410,6 +418,7 @@ struct QMGR_JOB { QMGR_JOB 
*stack_parent; /* stack parent */ QMGR_JOB_LIST stack_children; /* all stack children */ QMGR_JOB_LIST stack_siblings; /* stack children linkage */ + int flags; /* various flags */ int stack_level; /* job stack nesting level (-1 means * it's not on the lists at all) */ int blocker_tag; /* tagged if blocks the job list */ @@ -435,6 +444,10 @@ struct QMGR_PEER { QMGR_PEER_LIST peers; /* neighbor linkage */ }; +#define QMGR_JOB_SLOW_FLAG (1<<0) /* job is marked as slow */ + +#define QMGR_JOB_SLOW(job) (((job)->flags & QMGR_JOB_SLOW_FLAG) != 0) + extern QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *); extern QMGR_PEER *qmgr_peer_select(QMGR_JOB *); extern void qmgr_job_blocker_update(QMGR_QUEUE *); diff --git a/src/qmgr/qmgr_active.c b/src/qmgr/qmgr_active.c index d93a2cd..302c0a3 100644 --- a/src/qmgr/qmgr_active.c +++ b/src/qmgr/qmgr_active.c @@ -229,7 +229,7 @@ int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id) */ #define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP) - if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id, + if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, scan_info->queue, queue_id, (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? 
scan_info->flags | QMGR_FLUSH_AFTER : scan_info->flags, diff --git a/src/qmgr/qmgr_deliver.c b/src/qmgr/qmgr_deliver.c index 2fbb049..6f349f6 100644 --- a/src/qmgr/qmgr_deliver.c +++ b/src/qmgr/qmgr_deliver.c @@ -223,6 +223,7 @@ static void qmgr_deliver_update(int unused_event, char *context) QMGR_QUEUE *queue = entry->queue; QMGR_TRANSPORT *transport = queue->transport; QMGR_MESSAGE *message = entry->message; + QMGR_JOB *job = entry->peer->job; static DSN_BUF *dsb; int status; @@ -234,12 +235,22 @@ static void qmgr_deliver_update(int unused_event, char *context) (void) vstream_fclose(entry->stream); \ entry->stream = 0; \ qmgr_deliver_concurrency--; \ + transport->active--; \ + if (QMGR_JOB_SLOW(job)) \ + transport->slow_active--; \ } while (0) if (dsb == 0) dsb = dsb_create(); /* + * If this was one of the slow messages while the slow delivery agents + * were maxed out, let the transport know that it can now try sending more. + */ + if (QMGR_JOB_SLOW(job) && transport->slow_active == transport->slow_limit) + transport->job_current = transport->job_list.next; + + /* * The message transport has responded. Stop the watchdog timer. */ event_cancel_timer(qmgr_deliver_abort, context); @@ -418,11 +429,26 @@ void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream) * If we get this far, go wait for the delivery status report. */ qmgr_deliver_concurrency++; + transport->active++; + if (QMGR_JOB_SLOW(entry->peer->job)) + transport->slow_active++; entry->stream = stream; event_enable_read(vstream_fileno(stream), qmgr_deliver_update, (char *) entry); /* + * Warn about misconfigured systems. + * + * Unfortunately, we do not know what the transport delivery agent limit is, + * so we can only keep checking this as the delivery agents get maxed out. 
+ */ + if (transport->slow_limit > 0 && transport->slow_limit < transport->active / 2) { + msg_warn("%s%s is too small for %s transport limit - adjusting %s%s", + transport->name, _SLOW_XPORT_LIMIT, transport->name, transport->name, _SLOW_XPORT_LIMIT); + transport->slow_limit = transport->active - transport->active / 10; + } + + /* * Guard against broken systems. */ event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout); diff --git a/src/qmgr/qmgr_job.c b/src/qmgr/qmgr_job.c index 7de70f2..2864954 100644 --- a/src/qmgr/qmgr_job.c +++ b/src/qmgr/qmgr_job.c @@ -103,6 +103,9 @@ static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transpor job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB)); job->message = message; + job->flags = 0; + if (qmgr_message_is_slow(message, transport)) + job->flags |= QMGR_JOB_SLOW_FLAG; QMGR_LIST_APPEND(message->job_list, job, message_peers); htable_enter(transport->job_byname, message->queue_id, (char *) job); job->transport = transport; @@ -873,6 +876,13 @@ QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport) if (IS_BLOCKER(job, transport)) continue; + /* + * Skip any slow jobs once we have reached the transport limit for slow jobs. + */ + if (QMGR_JOB_SLOW(job) && + (transport->slow_limit > 0 && transport->slow_active >= transport->slow_limit)) + continue; + if ((peer = qmgr_job_peer_select(job)) != 0) { /* @@ -939,7 +949,8 @@ QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport) /* * We have not found any entry we could use for delivery. Well, things * must have changed since this transport was selected for asynchronous - * allocation. Never mind. Clear the current job pointer and reluctantly + * allocation. Or there are only slow jobs and we have hit the limit. + * Never mind. Clear the current job pointer and reluctantly * report back that we have failed in our task. 
*/ transport->job_current = 0; diff --git a/src/qmgr/qmgr_message.c b/src/qmgr/qmgr_message.c index 576fb2d..bd7b229 100644 --- a/src/qmgr/qmgr_message.c +++ b/src/qmgr/qmgr_message.c @@ -27,6 +27,10 @@ /* void qmgr_message_kill_record(message, offset) /* QMGR_MESSAGE *message; /* long offset; +/* +/* int qmgr_message_is_slow(message, transport) +/* QMGR_MESSAGE *message; +/* QMGR_TRANSPORT *transport; /* DESCRIPTION /* This module performs en-gross operations on queue messages. /* @@ -71,6 +75,10 @@ /* the record type at the given offset to "killed", and closes the file. /* A killed envelope record is ignored. Killed records are not allowed /* inside the message content. +/* +/* qmgr_message_is_slow() tests if given message is considered slow +/* in context of given transport. Currently, too old messages coming from +/* deferred queue and too large messages may be considered slow. /* DIAGNOSTICS /* Warnings: malformed message file. Fatal errors: out of memory. /* SEE ALSO @@ -148,12 +156,13 @@ #include "qmgr.h" int qmgr_message_count; +int qmgr_deferred_message_count; int qmgr_recipient_count; /* qmgr_message_create - create in-core message structure */ static QMGR_MESSAGE *qmgr_message_create(const char *queue_name, - const char *queue_id, int qflags) + const char *orig_queue_name, const char *queue_id, int qflags) { QMGR_MESSAGE *message; @@ -175,6 +184,9 @@ static QMGR_MESSAGE *qmgr_message_create(const char *queue_name, message->data_offset = 0; message->queue_id = mystrdup(queue_id); message->queue_name = mystrdup(queue_name); + message->orig_queue_name = mystrdup(orig_queue_name); + if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0) + qmgr_deferred_message_count++; message->encoding = 0; message->sender = 0; message->dsn_envid = 0; @@ -214,6 +226,18 @@ static void qmgr_message_close(QMGR_MESSAGE *message) message->fp = 0; } +/* qmgr_message_is_slow - recognize messages slow for given transport*/ + +int qmgr_message_is_slow(QMGR_MESSAGE *message, 
QMGR_TRANSPORT *transport) +{ + if (transport->slow_msg_size > 0 && message->data_size >= transport->slow_msg_size) + return (1); + if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0 && + message->queued_time - message->create_time >= transport->slow_msg_time) + return (1); + return (0); +} + /* qmgr_message_open - open queue file */ static int qmgr_message_open(QMGR_MESSAGE *message) @@ -1385,8 +1409,11 @@ void qmgr_message_free(QMGR_MESSAGE *message) msg_panic("qmgr_message_free: queue file is open"); while ((job = message->job_list.next) != 0) qmgr_job_free(job); + if (strcmp(message->orig_queue_name, MAIL_QUEUE_DEFERRED) == 0) + qmgr_deferred_message_count--; myfree(message->queue_id); myfree(message->queue_name); + myfree(message->orig_queue_name); if (message->dsn_envid) myfree(message->dsn_envid); if (message->encoding) @@ -1428,19 +1455,19 @@ void qmgr_message_free(QMGR_MESSAGE *message) /* qmgr_message_alloc - create in-core message structure */ -QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id, - int qflags, mode_t mode) +QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *orig_queue_name, + const char *queue_id, int qflags, mode_t mode) { const char *myname = "qmgr_message_alloc"; QMGR_MESSAGE *message; if (msg_verbose) - msg_info("%s: %s %s", myname, queue_name, queue_id); + msg_info("%s: %s %s (from %s)", myname, queue_name, queue_id, orig_queue_name); /* * Create an in-core message structure. 
*/ - message = qmgr_message_create(queue_name, queue_id, qflags); + message = qmgr_message_create(queue_name, orig_queue_name, queue_id, qflags); /* * Extract message envelope information: time of arrival, sender address, diff --git a/src/qmgr/qmgr_transport.c b/src/qmgr/qmgr_transport.c index 434d75e..8fa115e 100644 --- a/src/qmgr/qmgr_transport.c +++ b/src/qmgr/qmgr_transport.c @@ -282,11 +282,18 @@ QMGR_TRANSPORT *qmgr_transport_select(void) * the number of pending delivery agent connections, until all delivery * agent concurrency windows are maxed out, or until we run out of "todo" * queue entries. + * + * XXX Rather than duplicating the window testing logic below for slow jobs, + * we allow the transport to give it a try first and only back off after it + * finds out it has no but slow jobs available. As it only happens when the slow + * messages max out all delivery agents without anything else to be delivered, + * it's not critical. */ #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y)) for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 + || xport->job_current == 0 || xport->pending >= QMGR_TRANSPORT_MAX_PEND) continue; need = xport->pending + 1; @@ -378,11 +385,22 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name) transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); transport->flags = 0; transport->pending = 0; + transport->active = 0; + transport->slow_active = 0; transport->name = mystrdup(name); /* * Use global configuration settings or transport-specific settings. 
*/ + transport->slow_limit = + get_mail_conf_int2(name, _SLOW_XPORT_LIMIT, + var_slow_xport_limit, 0, 0); + transport->slow_msg_size = + get_mail_conf_int2(name, _SLOW_MSG_SIZE, + var_slow_msg_size, 0, 0); + transport->slow_msg_time = + get_mail_conf_time2(name, _SLOW_MSG_TIME, + var_slow_msg_time, 's', 0, 0); transport->dest_concurrency_limit = get_mail_conf_int2(name, _DEST_CON_LIMIT, var_dest_con_limit, 0, 0);