Filippo Giunchedi has uploaded a new change for review.
https://gerrit.wikimedia.org/r/180756
Change subject: Imported Upstream version 0.36+git1acdff3
......................................................................
Imported Upstream version 0.36+git1acdff3
Change-Id: Ia850be88f3e7ffc8dcd23f1b13f04f14996e28e3
---
M README.md
A contrib/relay.conf
A contrib/relay.init
A contrib/relay.logrotate
A contrib/relay.monit
A contrib/relay.spec
A contrib/relay.sysconfig
M dispatcher.c
M dispatcher.h
M receptor.c
M receptor.h
M relay.c
M router.c
13 files changed, 442 insertions(+), 70 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/operations/debs/carbon-c-relay
refs/changes/56/180756/1
diff --git a/README.md b/README.md
index 4d670e5..bc612b9 100644
--- a/README.md
+++ b/README.md
@@ -28,8 +28,22 @@
right destination(s). The route file supports two main constructs:
clusters and matches. The first define groups of hosts data metrics can
be sent to, the latter define which metrics should be sent to which
-cluster. Aggregation rules are seen as matches. The syntax in this
-file is as follows:
+cluster. Aggregation rules are seen as matches.
+
+For every metric received by the relay, cleansing is performed. The
+following changes are performed before any match, aggregate or rewrite
+rule sees the metric:
+
+ - double dot elimination (necessary for correctly functioning
+ consistent hash routing)
+ - trailing/leading dot elimination
+ - whitespace normalisation (this mostly affects output of the relay
+ to other targets: metric, value and timestamp will be separated by
+ a single space only, ever)
+ - irregular char replacement with underscores (\_), currently
+ irregular is defined as not being in [0-9a-zA-Z-_:#].
+
+The route file syntax is as follows:
```
# comments are allowed in any place and start with a hash (#)
@@ -124,9 +138,11 @@
final, as no new entries are allowed to be added any more. On top of an
aggregation multiple aggregations can be computed. They can be of the
same or different aggregation types, but should write to a unique new
-metric. Produced metrics are sent to the relay as if they were
-submitted from the outside, hence match and aggregation rules apply to
-those. Care should be taken that loops are avoided. Also, since
+metric. The metric names can include back references like in rewrite
+expressions, allowing for powerful single aggregation rules that yield
+in many aggregations. Produced metrics are sent to the relay as if they
+were submitted from the outside, hence match and aggregation rules apply
+to those. Care should be taken that loops are avoided. Also, since
aggregations appear as matches without `stop` keyword, their positioning
matters in the same way ordering of match statements.
@@ -313,8 +329,8 @@
e.g. for each hostname encountered. A typical aggregation looks like:
aggregate
- sys.dc1.somehost-[0-9]+.somecluster.mysql.replication_delay
- sys.dc2.somehost-[0-9]+.somecluster.mysql.replication_delay
+ ^sys\.dc1\.somehost-[0-9]+\.somecluster\.mysql\.replication_delay
+ ^sys\.dc2\.somehost-[0-9]+\.somecluster\.mysql\.replication_delay
every 10 seconds
expire after 35 seconds
compute sum write to
@@ -350,6 +366,31 @@
carbon-c-relay instance, such that it is easy to forward the produced
metrics to another relay instance is a good practice.
+The previous example could also be written as follows to be more
+dynamic:
+
+ aggregate
+ ^sys\.dc[0-9].(somehost-[0-9]+)\.([^.]+)\.mysql\.replication_delay
+ every 10 seconds
+ expire after 35 seconds
+ compute sum write to
+ mysql.host.\1.replication_delay
+ compute sum write to
+ mysql.host.all.replication_delay
+ compute sum write to
+ mysql.cluster.\2.replication_delay
+ compute sum write to
+ mysql.cluster.all.replication_delay
+ ;
+
+Here a single match, results in four aggregations, each of a different
+scope. In this example aggregation based on hostname and cluster are
+being made, as well as the more general `all` targets, which in this
+example have both identical values. Note that with this single
+aggregation rule, both per-cluster, per-host and total aggregations are
+produced. Obviously, the input metrics define which hosts and clusters
+are produced.
+
Author
------
diff --git a/contrib/relay.conf b/contrib/relay.conf
new file mode 100644
index 0000000..a32242c
--- /dev/null
+++ b/contrib/relay.conf
@@ -0,0 +1,63 @@
+# comments are allowed in any place and start with a hash (#)
+#
+#cluster <name>
+# <forward | any_of | <carbon_ch | fnv1a_ch> [replication <count>]>
+# <host[:port] [proto <udp | tcp>]> ...
+# ;
+#match <* | <expression>>
+# send to <cluster | blackhole>
+# [stop]
+# ;
+#rewrite <expression>
+# into <replacement>
+# ;
+#aggregate
+# <expression> ...
+# every <interval> seconds
+# expire after <expiration> seconds
+# compute <sum | count | max | min | average> write to
+# <metric>
+# [compute ...]
+# ;
+
+cluster local_carbon
+ forward
+ 127.0.0.1:2013
+ ;
+
+################################################################################
+## Some example re-writes to make the graphite tree cleaner.
+## This is site specific as the regex assumes the fqdn is 5 elements long.
+
+## rewrite carbon c relay internal statistics
+## from carbon.relays.twiki501.back.test.bc.local.metricsQueued
+## into bc.test.twiki.twiki501.relay.metricsQueued
+#rewrite
^carbon\.relays\.([a-zA-Z]+)([0-9]+)_([a-zA-Z0-9]+)_([a-zA-Z0-9]+)_([a-zA-Z0-9]+)_([a-zA-Z0-9]+)\.(.*)
+# into \5.\4.\1.\1\2.relay.\7
+# ;
+## rewrite metrics to reverse hostname
+## from twiki501.back.test.bc.local.haggar.agent.0.metrics.0
+## into bc.test.twiki.twiki501.haggar.agent.0.metrics.0
+#rewrite ^([a-zA-Z]+)([0-9]+)\.back\.([a-zA-Z0-9]+)\.([a-zA-Z0-9]+)\.local(.*)
+# into \4.\3.\1\.\1\2\5
+# ;
+## rewrite metrics from statsite
+## forom sandbox.test.twiki.twiki001.statsite.gauges.gunicorn.workers
+## from twiki001.back.test.sandbox.local.statsite.gauges.gunicorn.workers
+## into sandbox.test.twiki.twiki001.statsite.gunicorn.gauges.workers
+#rewrite
^([a-zA-Z0-9]+)\.([a-zA-Z0-9]+)\.([a-zA-Z0-9]+)\.([a-zA-Z]+)([0-9]+)\.statsite\.([a-zA-Z]+)\.([a-zA-Z]+)\.(.*)
+# into \1.\2.\3.\4\5.statsite.\7.\6.\8
+# ;
+## clean up statsite numstats metric
+## from sandbox.test.twiki.twiki001.statsite.counts.numStats
+## into sandbox.test.twiki.twiki001.statsite.numStats
+#rewrite
^([a-zA-Z0-9]+)\.([a-zA-Z0-9]+)\.([a-zA-Z0-9]+)\.([a-zA-Z]+)([0-9]+)\.statsite\.counts\.(.*)
+# into \1.\2.\3.\4\5.statsite.\6
+# ;
+################################################################################
+
+
+match *
+ send to local_carbon
+ stop
+ ;
diff --git a/contrib/relay.init b/contrib/relay.init
new file mode 100644
index 0000000..a0089da
--- /dev/null
+++ b/contrib/relay.init
@@ -0,0 +1,82 @@
+#!/bin/bash
+#
+# /etc/rc.d/init.d/relay
+#
+# relay Startup script for the carbon-c-relay metrics aggregation
daemon
+# Packaged for the BBC by Matthew Hollick <[email protected]>
+#
+# description: Carbon-like graphite line mode relay.\n
+#\n
+#This project aims to be a replacement of the original Carbon relay\n
+#\n
+#The main reason to build a replacement is performance and configurability.\n
+#Carbon is single threaded, and sending metrics to multiple consistent-hash\n
+#clusters requires chaining of relays. This project provides a multithreaded\n
+#relay which can address multiple targets and clusters for each and every\n
+#metric based on pattern matches.\n
+#
+# chkconfig: 2345 80 80
+#
+# config: /etc/relay.conf
+# pidfile: /var/run/carbon/relay.pid
+
+# Source function library.
+. /etc/init.d/functions
+
+
+RETVAL=0
+PROG="relay"
+DAEMON_CONFIG=/etc/${PROG}.conf
+DAEMON_SYSCONFIG=/etc/sysconfig/${PROG}
+DAEMON=/usr/bin/${PROG}
+PID_FILE=/var/run/carbon/${PROG}.pid
+LOCK_FILE=/var/lock/subsys/${PROG}
+LOG_FILE=/var/log/carbon/relay.log
+DAEMON_USER="relay"
+FQDN=$(hostname --long)
+
+. ${DAEMON_SYSCONFIG}
+
+start() {
+ echo -n $"Starting Carbon C Relay: "
+ daemonize -u ${DAEMON_USER} -p ${PID_FILE} -l ${LOCK_FILE} -a -e ${LOG_FILE}
-o ${LOG_FILE} ${DAEMON} ${ARGS} -H ${FQDN} -f ${DAEMON_CONFIG}
+ RETVAL=$?
+ echo ""
+ return ${RETVAL}
+}
+
+stop() {
+ echo -n $"Stopping Carbon C Relay: "
+ killproc -p ${PID_FILE} -d 10 ${DAEMON}
+ RETVAL=$?
+ echo
+ [ $RETVAL = 0 ] && rm -f ${LOCK_FILE} ${PID_FILE}
+ return $RETVAL
+}
+
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ status)
+ status -p ${PID_FILE} ${DAEMON}
+ RETVAL=$?
+ ;;
+ reload|force-reload)
+ reload
+ ;;
+ restart)
+ stop
+ start
+ ;;
+ *)
+ N=/etc/init.d/${NAME}
+ echo "Usage: $N {start|stop|status|restart|force-reload}" >&2
+ RETVAL=2
+ ;;
+esac
+
+exit ${RETVAL}
diff --git a/contrib/relay.logrotate b/contrib/relay.logrotate
new file mode 100644
index 0000000..4f5a69c
--- /dev/null
+++ b/contrib/relay.logrotate
@@ -0,0 +1,12 @@
+# not installed by default as logrotate is used to manage all carbon log files.
+/var/log/carbon/relay.log
+{
+ sharedscripts
+ missingok
+ notifempty
+ rotate 30
+ compress
+ postrotate
+ [ ! -f /var/run/carbon/relay.pid ] || /etc/init.d/relay restart
+ endscript
+}
diff --git a/contrib/relay.monit b/contrib/relay.monit
new file mode 100644
index 0000000..d9064d3
--- /dev/null
+++ b/contrib/relay.monit
@@ -0,0 +1,6 @@
+# Monit script to ensure carbon c relay is always running
+check process relay with pidfile /var/run/relay/relay.pid
+ start program = "/etc/init.d/relay start"
+ stop program = "/etc/init.d/relay stop"
+ if failed port 2003 type tcp then restart
+ if 5 restarts within 5 cycles then timeout
diff --git a/contrib/relay.spec b/contrib/relay.spec
new file mode 100644
index 0000000..b5e5245
--- /dev/null
+++ b/contrib/relay.spec
@@ -0,0 +1,79 @@
+Name: relay
+Version: 0.32
+Release: 1%{?dist}
+Summary: A C implementation of the Graphite carbon relay daemon packaged
for mdr.
+Group: System Environment/Daemons
+License: See the LICENSE file at github.
+URL: https://github.com/grobian/carbon-c-relay
+Source0: %{name}-%{version}.tar.gz
+BuildRoot: %{_tmppath}/%{name}-%{version}-root
+BuildRequires: openssl-devel
+Requires(pre): /usr/sbin/useradd
+Requires: daemonize
+AutoReqProv: No
+
+%description
+
+Carbon-like graphite line mode relay.
+This project aims to be a replacement of the original Carbon relay.
+Carbon C Relay has been packed as part of twiki for the BBC.
+
+%prep
+%setup -q
+
+%build
+make %{?_smp_mflags}
+
+%install
+mkdir -vp $RPM_BUILD_ROOT/var/log/carbon/
+mkdir -vp $RPM_BUILD_ROOT/etc/monit.d/
+mkdir -vp $RPM_BUILD_ROOT/var/run/carbon
+mkdir -vp $RPM_BUILD_ROOT/var/lib/carbon
+install -m 755 relay $RPM_BUILD_ROOT/usr/bin/relay
+install -m 644 contrib/relay.conf $RPM_BUILD_ROOT/etc/relay.conf
+install -m 755 contrib/relay.init $RPM_BUILD_ROOT/etc/init.d/relay
+install -m 644 contrib/relay.sysconfig $RPM_BUILD_ROOT/etc/sysconfig/relay
+install -m 644 contrib/relay.monit $RPM_BUILD_ROOT/etc/monit.d/relay.conf
+
+%clean
+make clean
+
+%pre
+getent group carbon >/dev/null || groupadd -r carbon
+getent passwd carbon >/dev/null || \
+ useradd -r -g carbon -s /sbin/nologin \
+ -d $RPM_BUILD_ROOT/var/lib/carbon/ -c "Carbon Daemons" carbon
+exit 0
+
+%post
+chgrp carbon /var/run/carbon
+chmod 774 /var/run/relay
+chown carbon:carbon /var/log/carbon
+chmod 744 /var/log/carbon
+
+%files
+%defattr(-,root,root,-)
+/usr/bin/relay
+%config(noreplace) /etc/relay.conf
+/etc/init.d/relay
+%config(noreplace) /etc/sysconfig/relay
+%config(noreplace) /etc/monit.d/relay.conf
+#/var/run/carbon
+#/var/log/carbon
+
+%changelog
+* Mon Sep 8 2014 Matthew Hollick <[email protected]>
+- tidy up for github
+- reverted site specific changes
+
+* Tue Aug 8 2014 Matthew Hollick <[email protected]>
+- packaged as part of twiki
+
+* Tue Jul 1 2014 Matthew Hollick <[email protected]>
+- packaged as part of mdr
+- binary renamed from 'relay' to 'cc_relay'
+- pagage renamed to reflect function rather than component
+- user / group named by function
+
+* Tue May 6 2014 Matthew Hollick <[email protected]>
+- Initial package for the BBC
diff --git a/contrib/relay.sysconfig b/contrib/relay.sysconfig
new file mode 100644
index 0000000..523dbca
--- /dev/null
+++ b/contrib/relay.sysconfig
@@ -0,0 +1,17 @@
+#Usage: relay [-vdst] -f <config> [-p <port>] [-w <workers>] [-b <size>] [-q
<size>]
+#
+#Options:
+# -v print version and exit
+# -f read <config> for clusters and routes
+# -p listen on <port> for connections, defaults to 2003
+# -w user <workers> worker threads, defaults to 16
+# -b server send batch size, defaults to 2500
+# -q server queue size, defaults to 25000
+# -d debug mode: currently writes statistics to stdout
+# -s submission mode: write info about errors to stdout
+# -t config test mode: prints rule matches from input on stdin
+# -H hostname: override hostname (used in statistics)
+#
+# Note: The hostname is set dynamically in the init script.
+
+ARGS="-p 2003 -w 4 -b 2500 -q 25000"
diff --git a/dispatcher.c b/dispatcher.c
index 4674f65..30fafd0 100644
--- a/dispatcher.c
+++ b/dispatcher.c
@@ -43,6 +43,7 @@
char buf[METRIC_BUFSIZ];
int buflen;
char needmore:1;
+ char noexpire:1;
char metric[METRIC_BUFSIZ];
destination dests[CONN_DESTS_SIZE];
size_t destlen;
@@ -152,6 +153,7 @@
/**
* Adds a connection socket to the chain of connections.
* Connection sockets are those which need to be read from.
+ * Returns the connection id, or -1 if a failure occurred.
*/
int
dispatch_addconnection(int sock)
@@ -182,7 +184,7 @@
fmtnow(nowbuf), connectionslen);
pthread_rwlock_unlock(&connectionslock);
- return 1;
+ return -1;
}
memset(&newlst[connectionslen], '\0',
@@ -201,13 +203,35 @@
connections[c].sock = sock;
connections[c].buflen = 0;
connections[c].needmore = 0;
+ connections[c].noexpire = 0;
connections[c].destlen = 0;
connections[c].wait = 0;
connections[c].takenby = 0; /* now dispatchers will pick this one up */
acceptedconnections++;
+ return c;
+}
+
+/**
+ * Adds a pseudo-listener for datagram (UDP) sockets, which is pseudo,
+ * for in fact it adds a new connection, but makes sure that connection
+ * won't be closed after being idle, and won't count that connection as
+ * an incoming connection either.
+ */
+int
+dispatch_addlistener_udp(int sock)
+{
+ int conn = dispatch_addconnection(sock);
+
+ if (conn == -1)
+ return 1;
+
+ connections[conn].noexpire = 1;
+ acceptedconnections--;
+
return 0;
}
+
inline static char
dispatch_process_dests(connection *conn, dispatcher *self)
@@ -337,7 +361,7 @@
(*p >= 'a' && *p <= 'z') ||
(*p >= 'A' && *p <= 'Z') ||
(*p >= '0' && *p <= '9') ||
- *p == '-' || *p == '_' || *p == ':')
+ *p == '-' || *p == '_' || *p == ':' ||
*p == '#')
{
/* copy char */
*q++ = *p;
@@ -364,7 +388,9 @@
conn->wait = time(NULL);
conn->takenby = 0;
return 0;
- } else if (time(NULL) - conn->wait > IDLE_DISCONNECT_TIME) {
+ } else if (!conn->noexpire &&
+ time(NULL) - conn->wait > IDLE_DISCONNECT_TIME)
+ {
/* force close connection below */
len = 0;
} else {
@@ -444,7 +470,7 @@
dispatch_check_rlimit_and_warn();
continue;
}
- if
(dispatch_addconnection(client) != 0) {
+ if
(dispatch_addconnection(client) == -1) {
close(client);
continue;
}
diff --git a/dispatcher.h b/dispatcher.h
index 6b10f55..3406449 100644
--- a/dispatcher.h
+++ b/dispatcher.h
@@ -24,6 +24,7 @@
void dispatch_check_rlimit_and_warn(void);
int dispatch_addlistener(int sock);
+int dispatch_addlistener_udp(int sock);
void dispatch_removelistener(int sock);
int dispatch_addconnection(int sock);
dispatcher *dispatch_new_listener(void);
diff --git a/receptor.c b/receptor.c
index 537bff0..b9f9de2 100644
--- a/receptor.c
+++ b/receptor.c
@@ -38,10 +38,13 @@
* Opens up listener sockets. Returns the socket fds in ret, and
* updates retlen. If opening sockets failed, -1 is returned. The
* caller should ensure retlen is at least 1, and ret should be an array
- * large enough to old it.
+ * large enough to hold it.
*/
int
-bindlisten(int ret[], int *retlen, const char *interface, unsigned short port)
+bindlisten(
+ int ret_stream[], int *retlen_stream,
+ int ret_dgram[], int *retlen_dgram,
+ const char *interface, unsigned short port)
{
int sock;
int optval;
@@ -51,64 +54,81 @@
char buf[128];
char saddr[INET6_ADDRSTRLEN];
int err;
- int curlen = 0;
+ int curlen_stream = 0;
+ int curlen_dgram = 0;
+ int socktypes[] = {SOCK_STREAM, SOCK_DGRAM, 0};
+ int *socktype = socktypes;
tv.tv_sec = 0;
tv.tv_usec = 500 * 1000;
- memset(&hint, 0, sizeof(hint));
- hint.ai_family = PF_UNSPEC;
- hint.ai_socktype = 0;
- hint.ai_protocol = IPPROTO_TCP; /* 0 for UDP as well */
- hint.ai_flags = AI_NUMERICSERV | AI_PASSIVE;
- snprintf(buf, sizeof(buf), "%u", port);
+ for (; *socktype != 0; socktype++) {
+ memset(&hint, 0, sizeof(hint));
+ hint.ai_family = PF_UNSPEC;
+ hint.ai_socktype = *socktype;
+ hint.ai_protocol = 0;
+ hint.ai_flags = AI_ADDRCONFIG | AI_NUMERICSERV | AI_PASSIVE;
+ snprintf(buf, sizeof(buf), "%u", port);
- if ((err = getaddrinfo(interface, buf, &hint, &res)) != 0)
- return -1;
-
- for (resw = res; resw != NULL && curlen < *retlen; resw =
resw->ai_next) {
- if (resw->ai_family != PF_INET && resw->ai_family != PF_INET6)
- continue;
- if (resw->ai_protocol != IPPROTO_TCP && resw->ai_protocol !=
IPPROTO_UDP)
- continue;
- if ((sock = socket(resw->ai_family, resw->ai_socktype,
resw->ai_protocol)) < 0)
- continue;
-
- (void) setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv,
sizeof(tv));
- optval = 1; /* allow takeover */
- (void) setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval));
-
- if (bind(sock, resw->ai_addr, resw->ai_addrlen) < 0) {
- close(sock);
- continue;
+ if ((err = getaddrinfo(interface, buf, &hint, &res)) != 0) {
+ fprintf(stderr, "getaddrinfo(%s, %s, ...) failed: %s\n",
+ interface == NULL ? "NULL" : interface,
+ buf, gai_strerror(err));
+ return -1;
}
- if (inet_ntop(resw->ai_family,
- &((struct sockaddr_in
*)resw->ai_addr)->sin_addr,
- saddr, sizeof(saddr)) == NULL)
- snprintf(saddr, sizeof(saddr), "(unknown)");
+ for (resw = res; resw != NULL; resw = resw->ai_next) {
+ if (resw->ai_family != PF_INET && resw->ai_family !=
PF_INET6)
+ continue;
+ if (resw->ai_protocol != IPPROTO_TCP &&
resw->ai_protocol != IPPROTO_UDP)
+ continue;
+ if ((sock = socket(resw->ai_family, resw->ai_socktype,
resw->ai_protocol)) < 0)
+ continue;
- if (resw->ai_protocol == IPPROTO_TCP) {
- if (listen(sock, 3) < 0) { /* backlog of 3, enough? */
+ (void) setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv,
sizeof(tv));
+ optval = 1; /* allow takeover */
+ (void) setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
&optval, sizeof(optval));
+ if (resw->ai_family == PF_INET6) {
+ optval = 1;
+ (void) setsockopt(sock, IPPROTO_IPV6,
IPV6_V6ONLY, &optval, sizeof(optval));
+ }
+
+ if (bind(sock, resw->ai_addr, resw->ai_addrlen) < 0) {
close(sock);
continue;
}
- printf("listening on tcp%d %s port %s\n",
- resw->ai_family == PF_INET6 ? 6 : 4,
saddr, buf);
- } else {
- printf("listening on udp%d %s port %s\n",
- resw->ai_family == PF_INET6 ? 6 : 4,
saddr, buf);
+
+ if (inet_ntop(resw->ai_family,
+ &((struct sockaddr_in
*)resw->ai_addr)->sin_addr,
+ saddr, sizeof(saddr)) == NULL)
+ snprintf(saddr, sizeof(saddr), "(unknown)");
+
+ if (resw->ai_protocol == IPPROTO_TCP) {
+ if (listen(sock, 3) < 0) { /* backlog of 3,
enough? */
+ close(sock);
+ continue;
+ }
+ if (curlen_stream < *retlen_stream) {
+ printf("listening on tcp%d %s port
%s\n",
+ resw->ai_family ==
PF_INET6 ? 6 : 4, saddr, buf);
+ ret_stream[curlen_stream++] = sock;
+ }
+ } else {
+ if (curlen_dgram < *retlen_dgram) {
+ printf("listening on udp%d %s port
%s\n",
+ resw->ai_family ==
PF_INET6 ? 6 : 4, saddr, buf);
+ ret_dgram[curlen_dgram++] = sock;
+ }
+ }
}
-
- ret[curlen++] = sock;
+ freeaddrinfo(res);
}
- freeaddrinfo(res);
- if (curlen == 0)
+ if (curlen_stream + curlen_dgram == 0)
return -1;
/* fake loop to simplify breakout below */
- while (curlen < *retlen) {
+ while (curlen_stream < *retlen_stream) {
struct sockaddr_un server;
#ifndef PF_LOCAL
@@ -137,11 +157,12 @@
printf("listening on UNIX socket %s\n", buf);
- ret[curlen++] = sock;
+ ret_stream[curlen_stream++] = sock;
break;
}
- *retlen = curlen;
+ *retlen_stream = curlen_stream;
+ *retlen_dgram = curlen_dgram;
return 0;
}
diff --git a/receptor.h b/receptor.h
index c1c3792..34afe61 100644
--- a/receptor.h
+++ b/receptor.h
@@ -18,7 +18,8 @@
#ifndef RECEPTOR_H
#define RECEPTOR_H 1
-int bindlisten(int ret[], int *retlen, const char *interface, unsigned short
port);
+int bindlisten(int ret_stream[], int *retlen_stream, int ret_dgram[], int
*retlen_dgram, const char *interface, unsigned short port);
+
void destroy_usock(unsigned short port);
#endif
diff --git a/relay.c b/relay.c
index 3ec78bb..5ae4f9c 100644
--- a/relay.c
+++ b/relay.c
@@ -111,8 +111,10 @@
int
main(int argc, char * const argv[])
{
- int sock[] = {0, 0, 0, 0, 0}; /* tcp4, udp4, tcp6, udp6, UNIX */
- int socklen = sizeof(sock) / sizeof(sock[0]);
+ int stream_sock[] = {0, 0, 0}; /* tcp4, tcp6, UNIX */
+ int stream_socklen = sizeof(stream_sock) / sizeof(stream_sock[0]);
+ int dgram_sock[] = {0, 0}; /* udp4, udp6 */
+ int dgram_socklen = sizeof(dgram_sock) / sizeof(dgram_sock[0]);
char id;
server **servers;
dispatcher **workers;
@@ -294,14 +296,23 @@
return 1;
}
- if (bindlisten(sock, &socklen, listeninterface, listenport) < 0) {
+ if (bindlisten(stream_sock, &stream_socklen,
+ dgram_sock, &dgram_socklen,
+ listeninterface, listenport) < 0) {
fprintf(stderr, "failed to bind on port %s:%d: %s\n",
- listeninterface, listenport, strerror(errno));
+ listeninterface == NULL ? "" : listeninterface,
+ listenport, strerror(errno));
return -1;
}
- for (ch = 0; ch < socklen; ch++) {
- if (dispatch_addlistener(sock[ch]) != 0) {
+ for (ch = 0; ch < stream_socklen; ch++) {
+ if (dispatch_addlistener(stream_sock[ch]) != 0) {
fprintf(stderr, "failed to add listener\n");
+ return -1;
+ }
+ }
+ for (ch = 0; ch < dgram_socklen; ch++) {
+ if (dispatch_addlistener_udp(dgram_sock[ch]) != 0) {
+ fprintf(stderr, "failed to listen to datagram
socket\n");
return -1;
}
}
@@ -354,8 +365,8 @@
fprintf(stdout, "[%s] shutting down...\n", fmtnow(nowbuf));
fflush(stdout);
/* make sure we don't accept anything new anymore */
- for (ch = 0; ch < socklen; ch++)
- dispatch_removelistener(sock[ch]);
+ for (ch = 0; ch < stream_socklen; ch++)
+ dispatch_removelistener(stream_sock[ch]);
destroy_usock(listenport);
fprintf(stdout, "[%s] listener for port %u closed\n",
fmtnow(nowbuf), listenport);
diff --git a/router.c b/router.c
index 1932efd..a54ad8c 100644
--- a/router.c
+++ b/router.c
@@ -93,6 +93,8 @@
static cluster *clusters = NULL;
static char keep_running = 1;
+/* custom constant, meant to force regex mode matching */
+#define REG_FORCE 01000000
/**
* Examines pattern and sets matchtype and rule or strmatch in route.
@@ -110,7 +112,12 @@
r->matchtype = CONTAINS;
r->nmatch = 0;
- if (*e == '^') {
+ if (flags & REG_FORCE) {
+ flags &= ~REG_NOSUB;
+ r->matchtype = REGEX;
+ }
+
+ if (*e == '^' && r->matchtype == CONTAINS) {
e++;
r->matchtype = STARTS_WITH;
}
@@ -169,12 +176,14 @@
r->strmatch = strdup(patbuf);
r->pattern = strdup(pat);
} else {
- int ret = regcomp(&r->rule, pat, flags);
+ int ret = regcomp(&r->rule, pat, flags & ~REG_FORCE);
if (ret != 0)
return ret; /* allow use of regerror */
r->strmatch = NULL;
r->pattern = strdup(pat);
- if ((flags & REG_NOSUB) == 0 && r->rule.re_nsub > 0) {
+ if (((flags & REG_NOSUB) == 0 && r->rule.re_nsub > 0) ||
+ flags & REG_FORCE)
+ {
/* we need +1 because position 0 contains the entire
* expression */
r->nmatch = r->rule.re_nsub + 1;
@@ -605,7 +614,10 @@
/* lookup dest */
for (w = clusters; w != NULL; w = w->next) {
- if (strcmp(w->name, dest) == 0)
+ if (w->type != GROUP &&
+ w->type != AGGREGATION &&
+ w->type != REWRITE &&
+ strcmp(w->name, dest) == 0)
break;
}
if (w == NULL) {
@@ -882,7 +894,7 @@
} else {
r = r->next = malloc(sizeof(route));
}
- err = determine_if_regex(r, pat, REG_EXTENDED);
+ err = determine_if_regex(r, pat, REG_EXTENDED |
REG_FORCE);
if (err != 0) {
char ebuf[512];
regerror(err, &r->rule, ebuf, sizeof(ebuf));
--
To view, visit https://gerrit.wikimedia.org/r/180756
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia850be88f3e7ffc8dcd23f1b13f04f14996e28e3
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/carbon-c-relay
Gerrit-Branch: master
Gerrit-Owner: Filippo Giunchedi <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits