This patch implements analysers for parsing the CLI and extra features
for the master's CLI.

For each command (sent alone, or separated by ; or \n) the request
analyser will determine to which server it should send the request.

The 'mode cli' proxy is able to parse a prefix for each command which is
used to select the appropriate server. The prefix starts with @ and is
followed by "master", the PID preceded by ! or the relative PID. (e.g.
@master, @1, @!1234).

The command is sent with a SHUTW which forces the server to close the
connection after sending its response. However the proxy allows a
keepalive connection on the client side and does not close.

The response analyser does not do much, it only reinits the
connection when it receives a close from the server, and forwards the
response. It does not analyze the response data.
The only guarantee of the end of the response is the close of the
server, we can't rely on the double \n since it's not sent by every
command.

This could be reimplemented later as a filter.
---
 include/proto/cli.h     |   7 +
 include/types/channel.h |   3 +
 include/types/proxy.h   |   1 +
 include/types/stream.h  |   1 +
 src/cfgparse.c          |   9 ++
 src/cli.c               | 377 +++++++++++++++++++++++++++++++++++++++++++++++-
 src/proxy.c             |   2 +
 src/stream.c            |   3 +
 8 files changed, 402 insertions(+), 1 deletion(-)

diff --git a/include/proto/cli.h b/include/proto/cli.h
index 467a86ea7..74052f714 100644
--- a/include/proto/cli.h
+++ b/include/proto/cli.h
@@ -32,5 +32,12 @@ int mworker_cli_proxy_create();
 int mworker_cli_proxy_new_listener(char *line);
 int mworker_cli_sockpair_new(struct mworker_proc *mworker_proc, int proc);
 
+/* proxy mode cli functions */
+
+/* analyzers */
+int pcli_wait_for_request(struct stream *s, struct channel *req, int an_bit);
+int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit);
+
+
 #endif /* _PROTO_CLI_H */
 
diff --git a/include/types/channel.h b/include/types/channel.h
index 7879b1258..3fb496bf4 100644
--- a/include/types/channel.h
+++ b/include/types/channel.h
@@ -180,6 +180,9 @@
 #define AN_RES_FLT_XFER_DATA    0x04000000
 #define AN_RES_FLT_END          0x08000000
 
+#define AN_REQ_WAIT_CLI         0x10000000
+#define AN_RES_WAIT_CLI         0x20000000
+
 /* Magic value to forward infinite size (TCP, ...), used with ->to_forward */
 #define CHN_INFINITE_FORWARD    MAX_RANGE(unsigned int)
 
diff --git a/include/types/proxy.h b/include/types/proxy.h
index da098485e..b7c9038d6 100644
--- a/include/types/proxy.h
+++ b/include/types/proxy.h
@@ -65,6 +65,7 @@ enum pr_mode {
        PR_MODE_TCP = 0,
        PR_MODE_HTTP,
        PR_MODE_HEALTH,
+       PR_MODE_CLI,
 } __attribute__((packed));
 
 enum PR_SRV_STATE_FILE {
diff --git a/include/types/stream.h b/include/types/stream.h
index feeb56b12..87bdf46ed 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -162,6 +162,7 @@ struct stream {
        void (*srv_error)(struct stream *s,     /* the function to call upon 
unrecoverable server errors (or NULL) */
                          struct stream_interface *si);
 
+       int pcli_next_pid;                      /* next target PID to use for 
the CLI proxy */
        char *unique_id;                        /* custom unique ID */
 
        /* These two pointers are used to resume the execution of the rule 
lists. */
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 8d8b6ea32..65afadca6 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -7654,6 +7654,10 @@ int check_config_validity()
                case PR_MODE_HTTP:
                        curproxy->http_needed = 1;
                        break;
+
+               case PR_MODE_CLI:
+                       cfgerr += proxy_cfg_ensure_no_http(curproxy);
+                       break;
                }
 
                if (curproxy != global.stats_fe && (curproxy->cap & PR_CAP_FE) 
&& LIST_ISEMPTY(&curproxy->conf.listeners)) {
@@ -8745,6 +8749,11 @@ out_uri_auth_compat:
                                curproxy->fe_rsp_ana |= AN_RES_WAIT_HTTP | 
AN_RES_HTTP_PROCESS_FE;
                        }
 
+                       if (curproxy->mode == PR_MODE_CLI) {
+                               curproxy->fe_req_ana |= AN_REQ_WAIT_CLI;
+                               curproxy->fe_rsp_ana |= AN_RES_WAIT_CLI;
+                       }
+
                        /* both TCP and HTTP must check switching rules */
                        curproxy->fe_req_ana |= AN_REQ_SWITCHING_RULES;
 
diff --git a/src/cli.c b/src/cli.c
index 2d4d1281c..f1d22a489 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -1619,6 +1619,8 @@ static int cli_parse_simple(char **args, char *payload, 
struct appctx *appctx, v
 }
 
 
+/* The pcli_* functions are used for the CLI proxy in the master */
+
 static enum obj_type *pcli_pid_to_server(int proc_pid)
 {
        struct mworker_proc *child;
@@ -1692,6 +1694,379 @@ static int pcli_prefix_to_pid(const char *prefix)
        return -1;
 }
 
+/* Parse one CLI command from the input data of <req>.
+ *
+ * A command ends at the first '\n', '\r' or ';'. Leading spaces are skipped.
+ * When the command starts with '@', its first word is a prefix: it is
+ * NUL-terminated in place and resolved with pcli_prefix_to_pid(), and the
+ * result is stored in <target_pid>. Without a prefix, <target_pid> is left
+ * untouched.
+ *
+ *  - it can rewrite the buffer by trimming the leading spaces and the prefix
+ *  - the separator ending a forwarded command is rewritten to '\n'
+ *
+ *  Return:
+ *  - the amount of data to forward, counted from the head of the input once
+ *    the trimming is done (always > 0) or
+ *  - -1 if there is no end to the command or
+ *  - 0 everything has been trimmed (only a prefix)
+ */
+#define PCLI_REQ_INIT      0
+#define PCLI_REQ_PFX       1
+#define PCLI_REQ_TRIM      2
+#define PCLI_REQ_CMD       3
+
+int pcli_parse_request(struct channel *req, int *target_pid)
+{
+       char *input = (char *)ci_head(req);
+       const char *end;
+       char *ptr, *trim = NULL, *pfx_b = NULL, *cmd_b = NULL;
+       struct buffer *buf = &req->buf;
+       int ret = 0;
+       int state = PCLI_REQ_INIT;
+
+       /* NOTE(review): the scan below assumes the input is contiguous
+        * between ci_head() and b_stop() - TODO confirm for wrapped buffers.
+        */
+       ptr = input;
+       end = b_stop(buf);
+
+       /* The while loop condition is checking the end of the command.
+          It is needed to iterate for each ptr++ done in the parser */
+       while (ptr < end && *ptr != '\n' && *ptr != '\r' && *ptr != ';') {
+               switch (state) {
+                       /* The init state only trims the useless chars */
+                       case PCLI_REQ_INIT:
+
+                               /* skip all the spaces at the start of the command */
+                               if (*ptr == ' ') {
+                                       ptr++;
+                                       continue;
+                               }
+                               pfx_b = ptr; /* this is the start of the command or of the @ prefix */
+                               state = PCLI_REQ_PFX;
+                               /* fall through */
+
+                       /* the atprefix state looks for a @ prefix. If it finds
+                          it, it will check to which server send the request.
+                          It also adjusts the trim pointer */
+                       case PCLI_REQ_PFX:
+
+                               if (*pfx_b != '@') {
+                                       /* there is no prefix. The leading
+                                        * spaces still have to be trimmed,
+                                        * otherwise the length returned below
+                                        * (counted from cmd_b) would not match
+                                        * the data at the head of the buffer.
+                                        */
+                                       pfx_b = NULL;
+                                       cmd_b = trim = ptr;
+                                       state = PCLI_REQ_CMD;
+                                       continue;
+                               }
+
+                               if (*ptr != ' ') {
+                                       ptr++;
+                                       continue;
+                               }
+                               *ptr = '\0';  /* this is the end of the prefix */
+                               ptr++;
+                               trim = ptr;
+                               state = PCLI_REQ_TRIM;
+                               break;
+
+                       /* we really need to trim there because that's the only
+                          way to know if we are going to send a command or if
+                          there is only a prefix */
+                       case PCLI_REQ_TRIM:
+                               if (*ptr == ' ') {
+                                       ptr++;
+                                       continue;
+                               }
+                               cmd_b = trim = ptr;
+                               state = PCLI_REQ_CMD;
+                               /* fall through */
+
+                       /* just look for the end of the command */
+                       case PCLI_REQ_CMD:
+                               ptr++;
+                               continue;
+               }
+       }
+
+       /* we didn't find a command separator, not enough data */
+       if (ptr >= end)
+               return -1;
+
+       if (!pfx_b && !cmd_b) {
+               /* empty command: optional spaces followed by a \n, a \r or
+                * a ;. Nothing is trimmed here, so forward everything up to
+                * and including the separator.
+                */
+               return ptr - input + 1;
+       } else if (pfx_b && !cmd_b) {
+               /* it's only a prefix, we don't want to forward it */
+               *ptr = '\0';
+               trim = ptr + 1; /* we want to trim the whole command */
+               ret = 0;
+       } else if (cmd_b) {
+               /* a command, with or without a prefix: terminate it with a
+                * \n and forward it from its first char to this \n.
+                */
+               *ptr = '\n';
+               ret = ptr - cmd_b + 1;
+       }
+
+       if (pfx_b)
+               *target_pid = pcli_prefix_to_pid(pfx_b);
+
+       /* trim the useless chars */
+       if (trim)
+               b_del(&req->buf, trim - input);
+
+       return ret;
+}
+
+/* Request analyser of the 'mode cli' proxy.
+ *
+ * Commands are taken one at a time from the input of <req> using
+ * pcli_parse_request():
+ *  - a complete command is forwarded to its target, and a shutdown for
+ *    writes is scheduled on <req> so that nothing else is sent with it. The
+ *    analyser then removes itself and enables AN_RES_WAIT_CLI on the
+ *    response channel;
+ *  - a prefix received alone is stored in s->pcli_next_pid and becomes the
+ *    target of the next commands of the stream;
+ *  - if the buffer is full and holds no complete command, its content is
+ *    replaced with "help\n".
+ *
+ * <an_bit> is not used. Returns 1 once the client side is closed for reads
+ * and no input is left, 0 in all other cases.
+ */
+int pcli_wait_for_request(struct stream *s, struct channel *req, int an_bit)
+{
+       int target_pid;
+       int to_forward;
+
+read_again:
+       /* Each command starts from the target stored in the stream. It is
+        * reloaded on every pass: after an unknown prefix received alone,
+        * pcli_parse_request() leaves -1 in target_pid while pcli_next_pid
+        * is reset to 0, and the next command must use the stored value.
+        */
+       target_pid = s->pcli_next_pid;
+
+       /* if the channel is closed for read, we won't receive any more data
+          from the client, but we don't want to forward this close to the
+          server */
+       channel_dont_close(req);
+
+       /* We don't know yet to which server we will connect */
+       channel_dont_connect(req);
+
+
+       /* we are not waiting for a response, there is no more request and we
+        * receive a close from the client, we can leave */
+       if (!(ci_data(req)) && req->flags & CF_SHUTR) {
+               channel_shutw_now(&s->res);
+               s->req.analysers &= ~AN_REQ_WAIT_CLI;
+               return 1;
+       }
+
+       req->flags |= CF_READ_DONTWAIT;
+
+       /* need more data */
+       if (!ci_data(req))
+               return 0;
+
+       /* If there is data available for analysis, log the end of the idle time. */
+       if (c_data(req) && s->logs.t_idle == -1)
+               s->logs.t_idle = tv_ms_elapsed(&s->logs.tv_accept, &now) - s->logs.t_handshake;
+
+       to_forward = pcli_parse_request(req, &target_pid);
+       if (to_forward > 0) {
+               /* enough data */
+
+               /* we didn't find the process, send an error and close */
+               if (target_pid < 0) {
+                       pcli_reply_and_close(s, "Can't find the target CLI!\n");
+                       return 0;
+               }
+
+               /* forward only 1 command */
+               channel_forward(req, to_forward);
+               /* we send only 1 command per request, and we write close after it */
+               channel_shutw_now(req);
+
+               /* remove the XFER_DATA analysers, which forwards all
+                * the data, we don't want to forward the next requests
+                * We need to add CF_FLT_ANALYZE to abort the forward too.
+                *
+                * NOTE(review): CF_FLT_ANALYZE is named like a channel flag
+                * (CF_*) but is OR'ed into the analysers bitfield here, not
+                * into req->flags - confirm this is intended.
+                */
+               req->analysers &= ~(AN_REQ_FLT_XFER_DATA|AN_REQ_WAIT_CLI);
+               req->analysers |= AN_REQ_FLT_END|CF_FLT_ANALYZE;
+               s->res.analysers |= AN_RES_WAIT_CLI;
+
+               /* we can connect now. When no server matches the PID, the
+                * local CLI applet is used as the target.
+                */
+               s->target = pcli_pid_to_server(target_pid);
+               if (!s->target) {
+                       s->target = &cli_applet.obj_type;
+               }
+
+               s->flags |= (SF_DIRECT | SF_ASSIGNED);
+               channel_auto_connect(req);
+
+       } else if (to_forward == 0) {
+               /* we only received a prefix without command, which
+                  mean that we want to store it for every other
+                  command for this session */
+               if (target_pid > -1) {
+                       s->pcli_next_pid = target_pid;
+                       // TODO: pcli_reply the prompt
+               } else {
+                       // TODO: pcli_reply() error
+                       s->pcli_next_pid = 0;
+               }
+
+               /* we trimmed things but we might have other commands to consume */
+               goto read_again;
+       } else if (to_forward == -1 && channel_full(req, global.tune.maxrewrite)) {
+               /* buffer is full and we didn't catch the end of a command */
+               goto send_help;
+       }
+
+       return 0;
+
+send_help:
+       /* drop the oversized request and ask for the help message instead */
+       b_reset(&req->buf);
+       b_putblk(&req->buf, "help\n", 5);
+       goto read_again;
+}
+/* Response analyser of the 'mode cli' proxy.
+ *
+ * Response data present as input in <rep> is forwarded and 0 is returned.
+ * Once no input is left and <rep> carries CF_SHUTR or CF_READ_NULL, the
+ * server side of the stream is shut and reset to its initial state, the
+ * logs, flags and timeouts of the stream are reinitialised, AN_REQ_WAIT_CLI
+ * is set again on the request channel, AN_RES_WAIT_CLI is removed from the
+ * response channel and 1 is returned. In every other case 0 is returned.
+ * <an_bit> is not used.
+ */
+int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
+{
+       struct proxy *fe = strm_fe(s);
+       struct proxy *be = s->be;
+
+       rep->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+       rep->flags |= CF_NEVER_WAIT;
+
+       /* don't forward the close */
+       channel_dont_close(&s->res);
+       channel_dont_close(&s->req);
+
+       /* forward the data */
+       if (ci_data(rep)) {
+               c_adv(rep, ci_data(rep));
+               return 0;
+       }
+
+       if ((rep->flags & (CF_SHUTR|CF_READ_NULL))) {
+               /* stream cleanup: the server is done, close our side of the
+                * server connection in both directions.
+                */
+
+               s->si[1].flags |= SI_FL_NOLINGER | SI_FL_NOHALF;
+               si_shutr(&s->si[1]);
+               si_shutw(&s->si[1]);
+
+               /*
+                * starting from there this is the same code as
+                * http_end_txn_clean_session().
+                *
+                * It allows to do frontend keepalive while reconnecting to a
+                * new server for each request.
+                */
+
+               if (s->flags & SF_BE_ASSIGNED) {
+                       HA_ATOMIC_SUB(&be->beconn, 1);
+                       if (unlikely(s->srv_conn))
+                               sess_change_server(s, NULL);
+               }
+
+               s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
+               stream_process_counters(s);
+
+               /* don't count other requests' data */
+               s->logs.bytes_in  -= ci_data(&s->req);
+               s->logs.bytes_out -= ci_data(&s->res);
+
+               /* we may need to know the position in the queue */
+               pendconn_free(s);
+
+               /* let's do a final log if we need it */
+               if (!LIST_ISEMPTY(&fe->logformat) && s->logs.logwait &&
+                   !(s->flags & SF_MONITOR) &&
+                   (!(fe->options & PR_O_NULLNOLOG) || s->req.total)) {
+                       s->do_log(s);
+               }
+
+               /* stop tracking content-based counters */
+               stream_stop_content_counters(s);
+               stream_update_time_stats(s);
+
+               s->logs.accept_date = date; /* user-visible date for logging */
+               s->logs.tv_accept = now;  /* corrected date for internal use */
+               s->logs.t_handshake = 0; /* There is no handshake in a keep-alive connection. */
+               s->logs.t_idle = -1;
+               tv_zero(&s->logs.tv_request);
+               s->logs.t_queue = -1;
+               s->logs.t_connect = -1;
+               s->logs.t_data = -1;
+               s->logs.t_close = 0;
+               s->logs.prx_queue_pos = 0;  /* we get the number of pending conns before us */
+               s->logs.srv_queue_pos = 0; /* we will get this number soon */
+
+               s->logs.bytes_in = s->req.total = ci_data(&s->req);
+               s->logs.bytes_out = s->res.total = ci_data(&s->res);
+
+               stream_del_srv_conn(s);
+               if (objt_server(s->target)) {
+                       if (s->flags & SF_CURR_SESS) {
+                               s->flags &= ~SF_CURR_SESS;
+                               
HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1);
+                       }
+                       if (may_dequeue_tasks(objt_server(s->target), be))
+                               process_srv_queue(objt_server(s->target));
+               }
+
+               s->target = NULL;
+
+               /* only release our endpoint if we don't intend to reuse the
+                * connection.
+                */
+               if (!si_conn_ready(&s->si[1])) {
+                       si_release_endpoint(&s->si[1]);
+                       s->srv_conn = NULL;
+               }
+
+               s->si[1].state     = s->si[1].prev_state = SI_ST_INI;
+               s->si[1].err_type  = SI_ET_NONE;
+               s->si[1].conn_retries = 0;  /* used for logging too */
+               s->si[1].exp       = TICK_ETERNITY;
+               s->si[1].flags    &= SI_FL_ISBACK | SI_FL_DONT_WAKE; /* we're in the context of process_stream */
+               s->req.flags &= 
~(CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CONNECT|CF_WRITE_ERROR|CF_STREAMER|CF_STREAMER_FAST|CF_NEVER_WAIT|CF_WAKE_CONNECT|CF_WROTE_DATA);
+               s->res.flags &= 
~(CF_SHUTR|CF_SHUTR_NOW|CF_READ_ATTACHED|CF_READ_ERROR|CF_READ_NOEXP|CF_STREAMER|CF_STREAMER_FAST|CF_WRITE_PARTIAL|CF_NEVER_WAIT|CF_WROTE_DATA|CF_WRITE_EVENT);
+               s->flags &= 
~(SF_DIRECT|SF_ASSIGNED|SF_ADDR_SET|SF_BE_ASSIGNED|SF_FORCE_PRST|SF_IGNORE_PRST);
+               s->flags &= ~(SF_CURR_SESS|SF_REDIRECTABLE|SF_SRV_REUSED);
+               s->flags &= ~(SF_ERR_MASK|SF_FINST_MASK|SF_REDISP);
+               /* reinitialise the current rule list pointer to NULL. We are
+                * sure that no rule list matches the NULL pointer.
+                */
+               s->current_rule_list = NULL;
+
+               s->be = strm_fe(s); /* the backend is the frontend again until the next command */
+               s->logs.logwait = strm_fe(s)->to_log;
+               s->logs.level = 0;
+               stream_del_srv_conn(s);
+               s->target = NULL;
+               /* re-init store persistence */
+               s->store_count = 0;
+               s->uniq_id = global.req_count++;
+
+               s->req.flags |= CF_READ_DONTWAIT; /* one read is usually enough */
+
+               s->req.flags |= CF_WAKE_ONCE; /* need to be called again if there is some command left in the request */
+
+               /* hand over to the request analyser for the next command */
+               s->req.analysers |= AN_REQ_WAIT_CLI;
+               s->res.analysers &= ~AN_RES_WAIT_CLI;
+
+               /* We must trim any excess data from the response buffer,
+                * because we don't want to accidentally forward it once we
+                * disable the analysers, nor do we want those data to come
+                * along with the next response: the buffer's data length is
+                * set to the channel's output amount. The cases this comment
+                * used to mention (HEAD, content-length) come from the HTTP
+                * code this cleanup is modelled on.
+                */
+               if (unlikely(ci_data(&s->res)))
+                       b_set_data(&s->res.buf, co_data(&s->res));
+
+               /* Now we can realign the response buffer */
+               c_realign_if_empty(&s->res);
+
+               /* only the client-side timeouts remain armed */
+               s->req.rto = strm_fe(s)->timeout.client;
+               s->req.wto = TICK_ETERNITY;
+
+               s->res.rto = TICK_ETERNITY;
+               s->res.wto = strm_fe(s)->timeout.client;
+
+               s->req.rex = TICK_ETERNITY;
+               s->req.wex = TICK_ETERNITY;
+               s->req.analyse_exp = TICK_ETERNITY;
+               s->res.rex = TICK_ETERNITY;
+               s->res.wex = TICK_ETERNITY;
+               s->res.analyse_exp = TICK_ETERNITY;
+               s->si[1].hcto = TICK_ETERNITY;
+
+               /* we're removing the analysers, we MUST re-enable events
+                * detection. We don't enable close on the response channel
+                * since it's either already closed, or in keep-alive with an
+                * idle connection handler.
+                */
+               channel_auto_read(&s->req);
+               channel_auto_close(&s->req);
+               channel_auto_read(&s->res);
+
+
+               return 1;
+       }
+       return 0;
+}
+
 /*
  * The mworker functions are used to initialize the CLI in the master process
  */
@@ -1711,7 +2086,7 @@ int mworker_cli_proxy_create()
        mworker_proxy->next = proxies_list;
        proxies_list = mworker_proxy;
        mworker_proxy->id = strdup("MASTER");
-       mworker_proxy->mode = PR_MODE_TCP;
+       mworker_proxy->mode = PR_MODE_CLI;
        mworker_proxy->state = PR_STNEW;
        mworker_proxy->last_change = now.tv_sec;
        mworker_proxy->cap = PR_CAP_LISTEN; /* this is a listen section */
diff --git a/src/proxy.c b/src/proxy.c
index 0fbfce9c6..9a6313a1c 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -88,6 +88,8 @@ const char *proxy_mode_str(int mode) {
                return "http";
        else if (mode == PR_MODE_HEALTH)
                return "health";
+       else if (mode == PR_MODE_CLI)
+               return "cli";
        else
                return "unknown";
 }
diff --git a/src/stream.c b/src/stream.c
index 8df5c7563..3143a2a82 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -160,6 +160,7 @@ struct stream *stream_new(struct session *sess, enum 
obj_type *origin)
        s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup;
 
        s->flags |= SF_INITIALIZED;
+       s->pcli_next_pid = 0;
        s->unique_id = NULL;
 
        if ((t = task_new(tid_bit)) == NULL)
@@ -1967,6 +1968,7 @@ redo:
                                FLT_ANALYZE(s, req, process_sticking_rules,     
ana_list, ana_back, AN_REQ_STICKING_RULES);
                                ANALYZE    (s, req, flt_analyze_http_headers,   
ana_list, ana_back, AN_REQ_FLT_HTTP_HDRS);
                                ANALYZE    (s, req, http_request_forward_body,  
ana_list, ana_back, AN_REQ_HTTP_XFER_BODY);
+                               ANALYZE    (s, req, pcli_wait_for_request,      
ana_list, ana_back, AN_REQ_WAIT_CLI);
                                ANALYZE    (s, req, flt_xfer_data,              
ana_list, ana_back, AN_REQ_FLT_XFER_DATA);
                                ANALYZE    (s, req, flt_end_analyze,            
ana_list, ana_back, AN_REQ_FLT_END);
                                break;
@@ -2035,6 +2037,7 @@ redo:
                                FLT_ANALYZE(s, res, http_process_res_common,    
ana_list, ana_back, AN_RES_HTTP_PROCESS_BE, s->be);
                                ANALYZE    (s, res, flt_analyze_http_headers,   
ana_list, ana_back, AN_RES_FLT_HTTP_HDRS);
                                ANALYZE    (s, res, http_response_forward_body, 
ana_list, ana_back, AN_RES_HTTP_XFER_BODY);
+                               ANALYZE    (s, res, pcli_wait_for_response,     
ana_list, ana_back, AN_RES_WAIT_CLI);
                                ANALYZE    (s, res, flt_xfer_data,              
ana_list, ana_back, AN_RES_FLT_XFER_DATA);
                                ANALYZE    (s, res, flt_end_analyze,            
ana_list, ana_back, AN_RES_FLT_END);
                                break;
-- 
2.16.4


Reply via email to