Hello.
So, now I know that actions are not reentrant in rsyslog. Therefore, any
single action cannot consume more than one core of CPU. Nowadays, there
are common servers having 24 cores, and this limits our ability to handle
high load. Making all modules thread-safe would be great, but takes huge
amount of effort. And there is much simpler solution.
We can define multiple identical or similar actions and divide the load
between them. Rsyslog has conditional statements to accomplish this.
But, unfortunately, rsyslog does not provide a variable or a function which
could be checked in such statement. Or, possibly, I am just unable to find
it. Basically, all we need is a variable, which values are evenly
distributed in a known range.
Additional use of this method would be to balance load between two or more
different output actions. Say, to use rsyslog as a network load balancer.
Let me introduce mmsequence. It is a message modification module, heavily
based on mmcount. It's purpose is to generate some numbers and
store them in message properties. I'm not a rsyslog guru or a professional
programmer, so please review my code. But, at least, it seemes to work here.
The patch is based on HEAD.
Description:
This module generates numeric sequences of different kinds. It can be used
to count messages up to a limit and to number them. It can generate random
numbers in a given range.
This module is implemented via the output module interface, so it is
called just as an action. The number generated is stored in "CEE/Lumberjack"
property of the message.
Action Parameters:
- mode "random" or "instance" or "key"
Specifies mode of the action. Default mode is "random", which
generates uniformly distributed integer numbers in a range defined
by "from" and "to".
In "instance" mode, the action produces a counter in range
[from, to).
This counter is specific to this action instance.
In "key" mode, the counter can be shared between multiple
instances.
This counter is identified by a name, which is defined with "key"
parameter.
- from [non-negative integer], default "0"
Starting value for counters and lower margin for random generator.
- to [positive integer], default "2"
Upper margin for all sequences. Note that this margin is not
inclusive. When next value for a counter is equal or greater than
this parameter, the counter resets to the starting value.
- step [non-negative integer], default "1"
Increment for counters. If step is "0", it can be used to fetch
current value without modification. The latter not applies to
"random" mode. This is useful in "key" mode or to get constant
values in "instance" mode.
- key [word], default ""
Name of the global counter which is used in this action.
- var [word], default "!mmsequence"
Name of the message property where the number will be stored.
Should start with "!".
Sample:
# load balance
Ruleset(
name="logd"
queue.workerthreads="5"
){
Action(
type="mmsequence"
mode="instance"
from="0"
to="2"
var="!seq"
)
if $!seq == "0" then {
Action(
type="mmnormalize"
userawmsg="on"
rulebase="/etc/rsyslog.d/rules.rb"
)
} else {
Action(
type="mmnormalize"
userawmsg="on"
rulebase="/etc/rsyslog.d/rules.rb"
)
}
# output logic here
}
# generate random numbers
action(
type="mmsequence"
mode="random"
to="100"
var="!rndz"
)
# count from 0 to 99
action(
type="mmsequence"
mode="instance"
to="100"
var="!cnt1"
)
# the same as before but the counter is global
action(
type="mmsequence"
mode="key"
key="key1"
to="100"
var="!cnt2"
)
# count specific messages but place the counter in every message
if $msg contains "txt" then
action(
type="mmsequence"
mode="key"
to="100"
var="!cnt3"
)
else
action(
type="mmsequence"
mode="key"
to="100"
step="0"
var="!cnt3"
key=""
)
Legacy Configuration Directives:
Not supported.
--
Pavel Levshin
diff --git a/Makefile.am b/Makefile.am
index c5e41c7..6eb7b5b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -249,6 +249,10 @@ if ENABLE_MMCOUNT
SUBDIRS += plugins/mmcount
endif
+if ENABLE_MMSEQUENCE
+SUBDIRS += plugins/mmsequence
+endif
+
if ENABLE_MMFIELDS
SUBDIRS += plugins/mmfields
endif
diff --git a/configure.ac b/configure.ac
index 2f23147..6769ec0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -981,8 +981,8 @@ AM_CONDITIONAL(ENABLE_MMUTF8FIX, test x$enable_mmutf8fix =
xyes)
AC_ARG_ENABLE(mmcount,
[AS_HELP_STRING([--enable-mmcount],[Enable message counting
@<:@default=no@:>@])],
[case "${enableval}" in
- yes) enable_xmpp="yes" ;;
- no) enable_xmpp="no" ;;
+ yes) enable_mmcount="yes" ;;
+ no) enable_mmcount="no" ;;
*) AC_MSG_ERROR(bad value ${enableval} for --enable-mmcount) ;;
esac],
[enable_mmcount=no]
@@ -990,6 +990,19 @@ AC_ARG_ENABLE(mmcount,
AM_CONDITIONAL(ENABLE_MMCOUNT, test x$enable_mmcount = xyes)
+# mmsequence
+AC_ARG_ENABLE(mmsequence,
+ [AS_HELP_STRING([--enable-mmsequence],[Enable sequence generator
@<:@default=no@:>@])],
+ [case "${enableval}" in
+ yes) enable_mmsequence="yes" ;;
+ no) enable_mmsequence="no" ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-mmsequence) ;;
+ esac],
+ [enable_mmsequence=no]
+)
+AM_CONDITIONAL(ENABLE_MMSEQUENCE, test x$enable_mmsequence = xyes)
+
+
# mmfields
AC_ARG_ENABLE(mmfields,
[AS_HELP_STRING([--enable-mmfields],[Enable building mmfields support
@<:@default=no@:>@])],
@@ -1552,6 +1565,7 @@ AC_CONFIG_FILES([Makefile \
plugins/mmanon/Makefile \
plugins/mmutf8fix/Makefile \
plugins/mmcount/Makefile \
+ plugins/mmsequence/Makefile \
plugins/mmfields/Makefile \
plugins/mmpstrucdata/Makefile \
plugins/mmrfc5424addhmac/Makefile \
@@ -1621,6 +1635,7 @@ echo " mmsnmptrapd module will be compiled:
$enable_mmsnmptrapd"
echo " mmutf8fix enabled: $enable_mmutf8fix"
echo " mmrfc5424addhmac enabled: $enable_mmrfc5424addhmac"
echo " mmpstrucdata enabled: $enable_mmpstrucdata"
+echo " mmsequence enabled: $enable_mmsequence"
echo
echo "---{ strgen modules }---"
echo " sm_cust_bindcdr module will be compiled: $enable_sm_cust_bindcdr"
diff --git a/plugins/mmsequence/Makefile.am b/plugins/mmsequence/Makefile.am
new file mode 100644
index 0000000..543d6d8
--- /dev/null
+++ b/plugins/mmsequence/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmsequence.la
+
+mmsequence_la_SOURCES = mmsequence.c
+mmsequence_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmsequence_la_LDFLAGS = -module -avoid-version
+mmsequence_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/mmsequence/mmsequence.c b/plugins/mmsequence/mmsequence.c
new file mode 100644
index 0000000..7fad09c
--- /dev/null
+++ b/plugins/mmsequence/mmsequence.c
@@ -0,0 +1,376 @@
+/* mmsequence.c
+ * Generate a number based on some sequence.
+ *
+ * Copyright 2013 [email protected].
+ *
+ * Based on: mmcount.c
+ * Copyright 2013 Red Hat Inc.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <time.h>
+#include <json/json.h>
+#include <pthread.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "hashtable.h"
+
+#define JSON_VAR_NAME "!mmsequence"
+
+enum mmSequenceModes {
+ mmSequenceRandom,
+ mmSequencePerInstance,
+ mmSequencePerKey
+};
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmsequence")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+typedef struct _instanceData {
+ enum mmSequenceModes mode;
+ int valueFrom;
+ int valueTo;
+ int step;
+ unsigned int seed;
+ int value;
+ char *pszKey;
+ char *pszVar;
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current
load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current
exec process */
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "mode", eCmdHdlrGetWord, 0 },
+ { "from", eCmdHdlrNonNegInt, 0 },
+ { "to", eCmdHdlrPositiveInt, 0 },
+ { "step", eCmdHdlrNonNegInt, 0 },
+ { "key", eCmdHdlrGetWord, 0 },
+ { "var", eCmdHdlrGetWord, 0 },
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+/* table for key-counter pairs */
+static struct hashtable *ght;
+pthread_mutex_t ght_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ENDfreeInstance
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->mode = mmSequenceRandom;
+ pData->valueFrom = 0;
+ pData->valueTo = 2;
+ pData->step = 1;
+ pData->pszKey = "";
+ pData->pszVar = JSON_VAR_NAME;
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmsequence)\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "mode")) {
+ if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"random",
+ sizeof("random")-1)) {
+ pData->mode = mmSequenceRandom;
+ } else if (!es_strbufcmp(pvals[i].val.d.estr,
(uchar*)"instance",
+ sizeof("instance")-1)) {
+ pData->mode = mmSequencePerInstance;
+ } else if (!es_strbufcmp(pvals[i].val.d.estr,
(uchar*)"key",
+ sizeof("key")-1)) {
+ pData->mode = mmSequencePerKey;
+ } else {
+ char *cstr = es_str2cstr(pvals[i].val.d.estr,
NULL);
+ errmsg.LogError(0, RS_RET_INVLD_MODE,
+ "mmsequence: invalid mode '%s' -
ignored",
+ cstr);
+ free(cstr);
+ }
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "from")) {
+ pData->valueFrom = pvals[i].val.d.n;
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "to")) {
+ pData->valueTo = pvals[i].val.d.n;
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "step")) {
+ pData->step = pvals[i].val.d.n;
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "key")) {
+ pData->pszKey = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "var")) {
+ pData->pszVar = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ dbgprintf("mmsequence: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ switch(pData->mode) {
+ case mmSequenceRandom:
+ pData->seed = (unsigned int)(intptr_t)pData ^ (unsigned
int)time(NULL);
+ break;
+ case mmSequencePerInstance:
+ pData->value = pData->valueTo;
+ break;
+ case mmSequencePerKey:
+ if (pthread_mutex_lock(&ght_mutex)) {
+ DBGPRINTF("mmsequence: mutex lock has failed!\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ if (ght == NULL) {
+ if(NULL == (ght = create_hashtable(100,
hash_from_string, key_equals_string, NULL))) {
+ pthread_mutex_unlock(&ght_mutex);
+ DBGPRINTF("mmsequence: error creating hash
table!\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+ pthread_mutex_unlock(&ght_mutex);
+ break;
+ default:
+ errmsg.LogError(0, RS_RET_INVLD_MODE,
+ "mmsequence: this mode is not currently
implemented");
+ }
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+static int *
+getCounter(struct hashtable *ht, char *str, int initial) {
+ int *pCounter;
+ char *pStr;
+
+ pCounter = hashtable_search(ht, str);
+ if(pCounter) {
+ return pCounter;
+ }
+
+ /* counter is not found for the str, so add new entry and
+ return the counter */
+ if(NULL == (pStr = strdup(str))) {
+ DBGPRINTF("mmsequence: memory allocation for key failed\n");
+ return NULL;
+ }
+
+ if(NULL == (pCounter = (int*)malloc(sizeof(int)))) {
+ DBGPRINTF("mmsequence: memory allocation for value failed\n");
+ free(pStr);
+ return NULL;
+ }
+ *pCounter = initial;
+
+ if(!hashtable_insert(ht, pStr, pCounter)) {
+ DBGPRINTF("mmsequence: inserting element into hashtable
failed\n");
+ free(pStr);
+ free(pCounter);
+ return NULL;
+ }
+ return pCounter;
+}
+
+
+BEGINdoAction
+ msg_t *pMsg;
+ struct json_object *json = NULL;
+ int val = 0;
+ int *pCounter;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+
+ switch(pData->mode) {
+ case mmSequenceRandom:
+ val = pData->valueFrom + (rand_r(&pData->seed) %
+ (pData->valueTo - pData->valueFrom));
+ break;
+ case mmSequencePerInstance:
+ pData->value += pData->step;
+ if (pData->value >= pData->valueTo) {
+ pData->value = pData->valueFrom;
+ }
+ val = pData->value;
+ break;
+ case mmSequencePerKey:
+ if (!pthread_mutex_lock(&ght_mutex)) {
+ pCounter = getCounter(ght, pData->pszKey,
pData->valueTo);
+ if(pCounter) {
+ *pCounter += pData->step;
+ if (*pCounter >= pData->valueTo) {
+ *pCounter = pData->valueFrom;
+ }
+ val = *pCounter;
+ } else {
+ errmsg.LogError(0, RS_RET_NOT_FOUND,
+ "mmsequence: unable to fetch
the counter from hash");
+ }
+ pthread_mutex_unlock(&ght_mutex);
+ } else {
+ errmsg.LogError(0, RS_RET_ERR,
+ "mmsequence: mutex lock has failed!");
+ }
+
+ break;
+ default:
+ errmsg.LogError(0, RS_RET_NOT_IMPLEMENTED,
+ "mmsequence: this mode is not currently
implemented");
+ }
+
+ /* finalize_it: */
+ json = json_object_new_int(val);
+ if(json) {
+ msgAddJSON(pMsg, (uchar *)pData->pszVar, json);
+ } else {
+ errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED,
+ "mmsequence: no proper json object");
+ }
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":mmsequence:", sizeof(":mmsequence:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmsequence supports only v6+ config format, use: "
+ "action(type=\"mmsequence\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current
interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmsequence: module compiled with rsyslog version %s.\n",
VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE
THAT.