Hi, Peter
Suppose there are two templates. So, nfdump expect to have data packets
like this:
1 data_templ1 sequence=1
2 data_templ1 sequence=2
3 data_templ2 sequence=3
4 data_templ1 sequence=4
5 data_templ2 sequence=5
...
and so on.
But Juniper collector send data like this:
1 data_templ1 sequence=1
2 data_templ1 sequence=2
3 data_templ2 sequence=1
4 data_templ1 sequence=3
5 data_templ2 sequence=2
...
So nfdump treat it like packet loss.
Look in the attachment for patch I made to solve this.
On 04/03/2015 01:24 PM, Peter Haag wrote:
Hi Dmirty,
Hmm .. I don't understand what you mean. Can you elaborate this a bit in more
details? As IPFIX implementations are knd of moving targets, it could well be,
that some changes are required.
Cheers
- Peter
On 26.01.15 14:45, Kasyanov Dmitry wrote:
Hello,
Thank you for nfdump project, it's great work!
But I encountered problem with Juniper IPFIX implementation. In my case,
there are one template and one options template. The problem is, that
switch assign separate sequence number counter for data replying to
template and options template. So this situation nfdump handle as packet
loss. I'm not sure if it ok to have separate sequence counter for data
packet with different flow set. Is this behavior correspond to RFC or it
just Juniper realization?
And could you recommend how to gracefully handle this problem? First
solution i see, is to bound sequence number and template id. But that
would make a problem in case of switch which have one sequence counter
for each data flow set.
Sincerely
Dmirty Kasyanov
------------------------------------------------------------------------------
Dive into the World of Parallel Programming. The Go Parallel Website,
sponsored by Intel and developed in partnership with Slashdot Media, is your
hub for all things parallel software development, from weekly thought
leadership blogs to news, videos, case studies, tutorials and more. Take a
look and join the conversation now. http://goparallel.sourceforge.net/
_______________________________________________
Nfdump-discuss mailing list
Nfdump-discuss@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfdump-discuss
--
С уважением,
Касьянов Дмитрий
diff -ruN nfdump-1.6.13.orig/nfdump-1.6.13/bin/Makefile.in
nfdump-1.6.13/nfdump-1.6.13/bin/Makefile.in
--- nfdump-1.6.13.orig/nfdump-1.6.13/bin/Makefile.in 2015-01-22
15:39:25.000000000 +0300
+++ nfdump-1.6.13/nfdump-1.6.13/bin/Makefile.in 2015-02-16 18:30:20.609118567
+0300
@@ -757,7 +757,8 @@
EXTRA_DIST = inline.c collector_inline.c nffile_inline.c \
nfdump_inline.c heapsort_inline.c applybits_inline.c test.sh \
nfdump.test.out parse_csv.pl AddExtension.txt nfdump.test.diff
-CLEANFILES = lex.yy.c grammar.c grammar.h scanner.c scanner.h
+#CLEANFILES = lex.yy.c grammar.c grammar.h scanner.c scanner.h
+CLEANFILES = lex.yy.c
all: $(BUILT_SOURCES)
$(MAKE) $(AM_MAKEFLAGS) all-am
diff -ruN nfdump-1.6.13.orig/nfdump-1.6.13/bin/ipfix.c
nfdump-1.6.13/nfdump-1.6.13/bin/ipfix.c
--- nfdump-1.6.13.orig/nfdump-1.6.13/bin/ipfix.c 2015-01-22
15:39:25.000000000 +0300
+++ nfdump-1.6.13/nfdump-1.6.13/bin/ipfix.c 2015-02-16 19:42:07.069237512
+0300
@@ -64,6 +64,9 @@
#include "exporter.h"
#include "ipfix.h"
+#include "inline.c"
+#include "nffile_inline.c"
+
#ifndef DEVEL
# define dbg_printf(...) /* printf(__VA_ARGS__) */
#else
@@ -81,6 +84,9 @@
#define GET_OPTION_TEMPLATE_FIELD_COUNT(p) (Get_val16((void *)((p) + 2)))
#define GET_OPTION_TEMPLATE_SCOPE_FIELD_COUNT(p)
(Get_val16((void *)((p) + 4)))
+/* global variables */
+IPFIXVendorImpl IPFIXimpl = RFC;
+
/* module limited globals */
/*
@@ -161,6 +167,131 @@
} input_translation_t;
/*
+ * In Juniper realisation every template have own sequence counter.
+ */
+typedef struct jun_sequence_s {
+ uint32_t sequence;
+ uint16_t template_id;
+ struct jun_sequence_s* next;
+} jun_sequence_t;
+
+/*
+ * Unified data for handle sequences
+ */
+typedef struct sequence_handler_s {
+ // Juniper sequence implementation
+ jun_sequence_t* jseq_list_head;
+ jun_sequence_t* jseq_current;
+
+ // RFC
+ uint32_t rfc_sequence;
+} sequence_handler_t;
+
+jun_sequence_t* find_jun_sequence(uint16_t flowset_id, sequence_handler_t*
s_handler) {
+ if( s_handler->jseq_current &&
+ s_handler->jseq_current->template_id == flowset_id )
+ {
+ return s_handler->jseq_current;
+ }
+ else {
+ jun_sequence_t* js_curr = s_handler->jseq_list_head;
+ while( js_curr ) {
+ if(js_curr->template_id == flowset_id) {
+ s_handler->jseq_current = js_curr;
+ return js_curr;
+ }
+ js_curr = js_curr->next;
+ }
+ }
+ return NULL;
+}
+
+void delete_jun_sequence(uint16_t flowset_id, sequence_handler_t* s_handler) {
+ jun_sequence_t* js_curr = s_handler->jseq_list_head;
+ jun_sequence_t* js_prev = NULL;
+ while( js_curr ) {
+ if(js_curr->template_id == flowset_id) {
+ if(js_prev == NULL)
+ s_handler->jseq_list_head = js_curr->next;
+ else
+ js_prev->next = js_curr->next;
+
+ if(s_handler->jseq_current == js_curr)
+ s_handler->jseq_current = NULL;
+
+ free(js_curr);
+ }
+ js_prev = js_curr;
+ js_curr = js_curr->next;
+ }
+}
+
+/*
+ * Set of functions to handle sequences
+ */
+inline uint32_t* get_sequence_p(uint16_t flowset_id, sequence_handler_t*
s_handler) {
+ switch(IPFIXimpl) {
+ case Juniper: {
+ jun_sequence_t* jseq = find_jun_sequence(flowset_id, s_handler);
+ if(jseq)
+ return &(jseq->sequence);
+ else
+ return NULL;
+ break;
+ }
+ case RFC:
+ deafult:
+ return &(s_handler->rfc_sequence);
+ }
+}
+
+inline void init_sequence_handler(sequence_handler_t* s_handler) {
+ s_handler->jseq_list_head = NULL;
+ s_handler->jseq_current = NULL;
+ s_handler->rfc_sequence = 0;
+}
+
+inline uint32_t get_sequence(uint16_t flowset_id, sequence_handler_t*
s_handler) {
+ uint32_t* seq_p = get_sequence_p(flowset_id, s_handler);
+ return seq_p ? *seq_p : 0;
+}
+
+inline void increment_sequence(uint16_t flowset_id, sequence_handler_t*
s_handler) {
+ uint32_t* seq_p = get_sequence_p(flowset_id, s_handler);
+ if(seq_p) {
+ *seq_p = *seq_p + 1;
+ }
+}
+
+void update_sequence(uint16_t flowset_id, uint32_t new_seq,
sequence_handler_t* s_handler) {
+ uint32_t* seq_p = get_sequence_p(flowset_id, s_handler);
+ if(seq_p) {
+ *seq_p = new_seq;
+ }
+ else {
+ if( IPFIXimpl == Juniper ) {
+ // Add new element to list
+ jun_sequence_t* jnew = (jun_sequence_t*) malloc(
sizeof(jun_sequence_t) );
+ if(jnew == NULL) {
+ LogError("Process_ipfix: Panic! malloc() %s line %d: %s",
__FILE__, __LINE__, strerror (errno));
+ return;
+ }
+ jun_sequence_t* jseq_second = s_handler->jseq_list_head;
+ s_handler->jseq_list_head = jnew;
+ s_handler->jseq_list_head->sequence = new_seq;
+ s_handler->jseq_list_head->template_id = flowset_id;
+ s_handler->jseq_list_head->next = jseq_second;
+ }
+ }
+}
+
+inline void disable_sequence(uint16_t flowset_id, sequence_handler_t*
s_handler) {
+ if( IPFIXimpl == Juniper ) {
+ delete_jun_sequence(flowset_id, s_handler);
+ }
+}
+
+/*
* All Obervation Domains from all exporter are stored in a linked list
* which uniquely can identify each exporter/Observation Domain
*/
@@ -181,6 +312,7 @@
uint32_t ExportTime;
// Current sequence number
+ sequence_handler_t seq_handler;
uint32_t PacketSequence;
// statistics
@@ -196,6 +328,75 @@
} exporter_ipfix_domain_t;
+int is_flow_ready_for_counting(exporter_ipfix_domain_t* exporter, uint16_t
flowset_id ) {
+ if( IPFIXimpl == Juniper ) {
+ return ( get_sequence_p(flowset_id, &(exporter->seq_handler)) != NULL
) ? 1 : 0;
+ } else if( IPFIXimpl == Juniper ) {
+ return (exporter->DataRecords != 0) ? 1 : 0;
+ }
+}
+
+int check_message_sequence(void* in_buff, exporter_ipfix_domain_t* exporter,
FlowSource_t *fs) {
+ ipfix_header_t* ipfix_header = (ipfix_header_t *)in_buff;
+ uint32_t sequence = ntohl(ipfix_header->LastSequence);
+
+ // in case of normal RFC implementation, we don't care about flowset_id
+ uint16_t flowset_id = 0;
+ if( IPFIXimpl == Juniper ) {
+ void* flowset_header = (void *)ipfix_header + IPFIX_HEADER_LENGTH;
+ flowset_id = GET_FLOWSET_ID(flowset_header);
+
+ // in case of template or options template search sequence by template
id
+ switch(flowset_id) {
+ case IPFIX_TEMPLATE_FLOWSET_ID: {
+ flowset_id = GET_TEMPLATE_ID(flowset_header + 4);
+ update_sequence(flowset_id, sequence, &(exporter->seq_handler));
+ break;
+ }
+ case IPFIX_OPTIONS_FLOWSET_ID:
+ flowset_id = GET_OPTION_TEMPLATE_ID(flowset_header + 4);
+ update_sequence(flowset_id, sequence, &(exporter->seq_handler));
+ break;
+ deafult:
+ break;
+ }
+ }
+
+ uint32_t exp_sequence = get_sequence(flowset_id, &(exporter->seq_handler));
+ if( is_flow_ready_for_counting(exporter, flowset_id) && sequence !=
exp_sequence )
+ {
+ fs->nffile->stat_record->sequence_failure++;
+ exporter->sequence_failure++;
+
+ // QQ: get IP
+ #define IP_STRING_LEN 40
+ char ipstr[IP_STRING_LEN];
+ if ( exporter->info.sa_family == AF_INET ) {
+ uint32_t _ip = htonl(exporter->info.ip.v4);
+ inet_ntop(AF_INET, &_ip, ipstr, sizeof(ipstr));
+ } else if ( exporter->info.sa_family == AF_INET6 ) {
+ uint64_t _ip[2];
+ _ip[0] = htonll(exporter->info.ip.v6[0]);
+ _ip[1] = htonll(exporter->info.ip.v6[1]);
+ inet_ntop(AF_INET6, &_ip, ipstr, sizeof(ipstr));
+ } else {
+ strncpy(ipstr, "<unknown>", IP_STRING_LEN);
+ }
+
+ // QQ: write sequence error to logfile
+ LogError("Process_ipfix: [SysID: %u, IP: %s, Domain: %u] Sequence
error: flowset=%d, expecting=%lu, received=%lu, lost=%lli",
+ exporter->info.sysid,
+ ipstr,
+ exporter->info.id,
+ flowset_id,
+ exp_sequence,
+ sequence,
+ (int64_t)sequence-exp_sequence
+ );
+
+ update_sequence(flowset_id, sequence, &(exporter->seq_handler));
+ }
+}
static struct ipfix_element_map_s {
uint16_t id; // IPFIX element id
@@ -313,8 +514,8 @@
static inline void Process_ipfix_template_withdraw(exporter_ipfix_domain_t
*exporter, void *DataPtr, uint32_t size_left, FlowSource_t *fs);
-#include "inline.c"
-#include "nffile_inline.c"
+//#include "inline.c"
+//#include "nffile_inline.c"
int Init_IPFIX(void) {
int i;
@@ -387,6 +588,7 @@
(*e)->sequence_failure = 0;
(*e)->next = NULL;
(*e)->sampler = NULL;
+ init_sequence_handler( &((*e)->seq_handler) );
FlushInfoExporter(fs, &((*e)->info));
@@ -395,6 +597,9 @@
syslog(LOG_INFO, "Process_ipfix: New exporter: SysID: %u, Observation
domain %u from: %s\n",
(*e)->info.sysid, ObservationDomain, ipstr);
+ // QQ: write to logfile
+ LogInfo("Process_ipfix: New exporter: SysID: %u, IP: %s, Domain: %u",
+ (*e)->info.sysid, ipstr, ObservationDomain);
return (*e);
@@ -585,6 +790,31 @@
table = GetTranslationTable(exporter, id);
if ( !table ) {
syslog(LOG_INFO, "Process_ipfix: [%u] Add template %u",
exporter->info.id, id);
+
+
+ // QQ: get IP
+ #define IP_STRING_LEN 40
+ char ipstr[IP_STRING_LEN];
+ if ( exporter->info.sa_family == AF_INET ) {
+ uint32_t _ip = htonl(exporter->info.ip.v4);
+ inet_ntop(AF_INET, &_ip, ipstr, sizeof(ipstr));
+ } else if ( exporter->info.sa_family == AF_INET6 ) {
+ uint64_t _ip[2];
+ _ip[0] = htonll(exporter->info.ip.v6[0]);
+ _ip[1] = htonll(exporter->info.ip.v6[1]);
+ inet_ntop(AF_INET6, &_ip, ipstr, sizeof(ipstr));
+ } else {
+ strncpy(ipstr, "<unknown>", IP_STRING_LEN);
+ }
+
+ // QQ: write new template info to logfile
+ LogInfo("Process_ipfix: [SysID: %u, IP: %s, Domain: %u] Add
template %u",
+ exporter->info.sysid,
+ ipstr,
+ exporter->info.id,
+ id
+ );
+
table = add_translation_table(exporter, id);
if ( !table ) {
return NULL;
@@ -1316,7 +1546,8 @@
return;
}
processed_records++;
- exporter->PacketSequence++;
+ increment_sequence(table->id, &(exporter->seq_handler));
+// exporter->PacketSequence++;
// map file record to output buffer
data_record = (common_record_t *)fs->nffile->buff_ptr;
@@ -1639,6 +1870,9 @@
dbg_printf("[%u] Sequence: %u\n", ObservationDomain, Sequence);
+ // QQ: Check sequence
+ check_message_sequence(in_buff, exporter, fs);
+/*
// sequence check
// 2^32 wrap is handled automatically as both counters overflow
if ( Sequence != exporter->PacketSequence ) {
@@ -1648,10 +1882,10 @@
exporter->sequence_failure++;
dbg_printf("[%u] Sequence check failed: last seq: %u,
seq %u\n",
exporter->info.id, Sequence,
exporter->PacketSequence);
- /* maybee to noise onbuggy exporters
- syslog(LOG_ERR, "Process_ipfix [%u] Sequence error:
last seq: %u, seq %u\n",
- info.id, exporter->LastSequence, Sequence);
- */
+ // maybee to noise onbuggy exporters
+ //syslog(LOG_ERR, "Process_ipfix [%u] Sequence error:
last seq: %u, seq %u\n",
+ //info.id, exporter->LastSequence, Sequence);
+ //
} else {
dbg_printf("[%u] Sync Sequence: %u\n",
exporter->info.id, Sequence);
}
@@ -1659,7 +1893,7 @@
} else {
dbg_printf("[%u] Sequence check ok\n", exporter->info.id);
}
-
+*/
// iterate over all set
flowset_length = 0;
while (size_left) {
diff -ruN nfdump-1.6.13.orig/nfdump-1.6.13/bin/ipfix.h
nfdump-1.6.13/nfdump-1.6.13/bin/ipfix.h
--- nfdump-1.6.13.orig/nfdump-1.6.13/bin/ipfix.h 2015-01-22
15:39:25.000000000 +0300
+++ nfdump-1.6.13/nfdump-1.6.13/bin/ipfix.h 2015-02-16 18:30:20.619118657
+0300
@@ -49,6 +49,11 @@
} ipfix_header_t;
#define IPFIX_HEADER_LENGTH sizeof(ipfix_header_t)
+/* Correctness of vendor IPFIX implemetation */
+typedef enum {
+ RFC,
+ Juniper /* Every template have separate sequence counter */
+} IPFIXVendorImpl;
/*
Message Header Field Descriptions:
diff -ruN nfdump-1.6.13.orig/nfdump-1.6.13/bin/nfcapd.c
nfdump-1.6.13/nfdump-1.6.13/bin/nfcapd.c
--- nfdump-1.6.13.orig/nfdump-1.6.13/bin/nfcapd.c 2015-01-22
15:39:25.000000000 +0300
+++ nfdump-1.6.13/nfdump-1.6.13/bin/nfcapd.c 2015-02-16 18:30:20.622118684
+0300
@@ -128,6 +128,11 @@
extern uint32_t default_sampling; // the default sampling rate when nothing
else applies. set by -S
extern uint32_t overwrite_sampling; // unconditionally overwrite sampling
rate with given sampling rate -S
+extern int use_filelog;
+extern char* logfile_path;
+
+extern IPFIXVendorImpl IPFIXimpl;
+
// Define a generic type to get data from socket or pcap file
typedef ssize_t (*packet_function_t)(int, void *, size_t, int, struct sockaddr
*, socklen_t *);
@@ -183,6 +188,10 @@
"-T\t\tInclude extension tags in
records.\n"
"-4\t\tListen on IPv4 (default).\n"
"-6\t\tListen on IPv6.\n"
+ "-L filepath\tAdd logging to file.\n"
+ "-J type\t\tUse vendor specific
implementation of RFC:\n"
+ " \t\t\tRFC \t- strict RFC
implemntation (default)\n"
+ " \t\t\tJuniperIPFIX\t- every template
have indepentent sequence counter,\n"
"-V\t\tPrint version and exit.\n"
, name);
} // End of usage
@@ -809,7 +818,7 @@
extension_tags = DefaultExtensions;
dynsrcdir = NULL;
- while ((c = getopt(argc, argv,
"46ef:whEVI:DB:b:j:l:M:n:p:P:R:S:s:T:t:x:Xru:g:z")) != EOF) {
+ while ((c = getopt(argc, argv,
"46ef:whEVI:DB:b:j:l:M:n:p:P:R:S:s:T:t:x:Xru:g:zL:J:")) != EOF) {
switch (c) {
case 'h':
usage(argv[0]);
@@ -992,6 +1001,19 @@
exit(255);
}
break;
+ case 'L':
+ // QQ: logging to file
+ use_filelog = 1;
+ logfile_path = strdup(optarg);
+ break;
+ case 'J':
+ // QQ: Juniper check sequence mode (Exporter +
DomainID + FlowsetID)
+ if( strcmp( optarg, "JuniperIPFIX" ) == 0 ) {
+ IPFIXimpl = Juniper;
+ } else if( strcmp( optarg, "RFC" ) == 0 ) {
+ IPFIXimpl = RFC;
+ }
+ break;
default:
usage(argv[0]);
exit(255);
diff -ruN nfdump-1.6.13.orig/nfdump-1.6.13/bin/util.c
nfdump-1.6.13/nfdump-1.6.13/bin/util.c
--- nfdump-1.6.13.orig/nfdump-1.6.13/bin/util.c 2015-01-22 15:39:25.000000000
+0300
+++ nfdump-1.6.13/nfdump-1.6.13/bin/util.c 2015-02-16 18:30:20.624118702
+0300
@@ -78,6 +78,9 @@
uint32_t twin_first, twin_last;
static int use_syslog = 0;
+int use_filelog = 0;
+
+char* logfile_path;
#ifdef sun
struct _code {
@@ -159,6 +162,32 @@
} // End of InitLog
+void PrintLogToFile( int log_lvl, char* msg ) {
+ FILE* fp = fopen(logfile_path, "a");
+ if(fp != NULL) {
+ char log_name[10] = "";
+ switch(log_lvl) {
+ case LOG_ERR:
+ strcpy(log_name, "ERROR");
+ break;
+ case LOG_INFO:
+ strcpy(log_name, "INFO ");
+ break;
+ default:
+ return;
+ }
+
+ char strdate[128];
+ struct tm curtm;
+ time_t curtime = time(NULL);
+ localtime_r(&curtime, &curtm);
+ strftime(strdate, sizeof(strdate) - 1, "%F %T %z", &curtm);
+
+ fprintf(fp, "[%s] [%s]: %s\n", strdate, log_name, msg);
+ fclose(fp);
+ }
+}
+
/*
* some modules are needed for daemon code as well as normal stdio code
* therefore a generic LogError is defined, which maps in this case
@@ -168,17 +197,19 @@
va_list var_args;
char string[512];
+ va_start(var_args, format);
+ vsnprintf(string, 511, format, var_args);
+ va_end(var_args);
+
if ( use_syslog ) {
- va_start(var_args, format);
- vsnprintf(string, 511, format, var_args);
- va_end(var_args);
syslog(LOG_ERR, "%s", string);
dbg_printf("%s\n", string);
- } else {
- va_start(var_args, format);
- vsnprintf(string, 511, format, var_args);
+ }
+ if( use_filelog ) {
+ PrintLogToFile(LOG_ERR, string);
+ }
+ if( !use_syslog && !use_filelog ) {
fprintf(stderr, "%s\n", string);
- va_end(var_args);
}
} // End of LogError
@@ -187,17 +218,19 @@
va_list var_args;
char string[512];
+ va_start(var_args, format);
+ vsnprintf(string, 511, format, var_args);
+ va_end(var_args);
+
if ( use_syslog ) {
- va_start(var_args, format);
- vsnprintf(string, 511, format, var_args);
- va_end(var_args);
syslog(LOG_INFO, "%s", string);
dbg_printf("%s\n", string);
- } else {
- va_start(var_args, format);
- vsnprintf(string, 511, format, var_args);
+ }
+ if( use_filelog ) {
+ PrintLogToFile(LOG_INFO, string);
+ }
+ if( !use_syslog && !use_filelog ) {
fprintf(stderr, "%s\n", string);
- va_end(var_args);
}
} // End of LogInfo
diff -ruN nfdump-1.6.13.orig/nfdump-1.6.13/configure
nfdump-1.6.13/nfdump-1.6.13/configure
--- nfdump-1.6.13.orig/nfdump-1.6.13/configure 2015-01-22 15:39:25.000000000
+0300
+++ nfdump-1.6.13/nfdump-1.6.13/configure 2015-02-16 18:30:20.631118767
+0300
@@ -3018,7 +3018,7 @@
# Define the identity of the package.
PACKAGE='nfdump'
- VERSION='1.6.13'
+ VERSION='1.6.13 +patch_20150216 (Stack Soft JSC)'
cat >>confdefs.h <<_ACEOF
------------------------------------------------------------------------------
Dive into the World of Parallel Programming The Go Parallel Website, sponsored
by Intel and developed in partnership with Slashdot Media, is your hub for all
things parallel software development, from weekly thought leadership blogs to
news, videos, case studies, tutorials and more. Take a look and join the
conversation now. http://goparallel.sourceforge.net/
_______________________________________________
Nfdump-discuss mailing list
Nfdump-discuss@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nfdump-discuss