All,
I'll probably write a blog post on this, but I wanted to share some work
I've done today.
http://vichargrave.com/ossec-log-management-with-elasticsearch/ shows how
to use OSSEC's syslog output to route messages to Elasticsearch. The
problem with this method is it uses UDP. Even when sending packets to a
local process UDP by definition is unreliable. Garbage collections and
other system events can cause packets to be lost. I've found it tends to
cap out at around 1,500 messages per minute.
To address this issue I've put together a logstash config that will read
the alerts from /var/ossec/logs/alerts/alerts.log. On top of solving the
reliability issue, it also fixes issues with multi-lines being lost, and
adds geoip lookups for the src_ip. I tested it against approximately 1GB
of alerts (3M events).
input {
file {
type => "ossec"
path => "/var/ossec/logs/alerts/alerts.log"
sincedb_path => "/opt/logstash/"
codec => multiline {
pattern => "^\*\*"
negate => true
what => "previous"
}
}
}
filter {
if [type] == "ossec" {
# Parse the header of the alert
grok {
# Matches 2014 Mar 08 00:57:49 (some.server.com) 10.1.2.3->ossec
# (?m) fixes issues with multi-lines see
https://logstash.jira.com/browse/LOGSTASH-509
match => ["message", "(?m)\*\* Alert
%{DATA:timestamp_seconds}:%{SPACE}%{WORD}?%{SPACE}\-
%{DATA:ossec_group}\n%{YEAR} %{SYSLOGTIMESTAMP:syslog_timestamp}
\(%{DATA:reporting_host}\)
%{IP:reporting_ip}\-\>%{DATA:reporting_source}\nRule:
%{NONNEGINT:rule_number} \(level %{NONNEGINT:severity}\) \-\>
'%{DATA:signature}'\n%{GREEDYDATA:remaining_message}"]
# Matches 2014 Mar 08 00:00:00 ossec-server01->/var/log/auth.log
match => ["message", "(?m)\*\* Alert
%{DATA:timestamp_seconds}:%{SPACE}%{WORD}?%{SPACE}\-
%{DATA:ossec_group}\n%{YEAR} %{SYSLOGTIMESTAMP:syslog_timestamp}
%{DATA:reporting_host}\-\>%{DATA:reporting_source}\nRule:
%{NONNEGINT:rule_number} \(level %{NONNEGINT:severity}\) \-\>
'%{DATA:signature}'\n%{GREEDYDATA:remaining_message}"]
}
# Attempt to parse additional data from the alert
grok {
match => ["remaining_message", "(?m)(Src IP:
%{IP:src_ip}%{SPACE})?(Src Port: %{NONNEGINT:src_port}%{SPACE})?(Dst IP:
%{IP:dst_ip}%{SPACE})?(Dst Port: %{NONNEGINT:dst_port}%{SPACE})?(User:
%{USER:acct}%{SPACE})?%{GREEDYDATA:real_message}"]
}
geoip {
source => "src_ip"
}
mutate {
convert => [ "severity", "integer"]
replace => [ "@message", "%{real_message}" ]
replace => [ "@fields.hostname", "%{reporting_host}"]
add_field => [ "@fields.product", "ossec"]
add_field => [ "raw_message", "%{message}"]
add_field => [ "ossec_server", "%{host}"]
remove_field => [ "type", "syslog_program", "syslog_timestamp",
"reporting_host", "message", "timestamp_seconds", "real_message",
"remaining_message", "path", "host", "tags"]
}
}
}
output {
elasticsearch {
host => "10.0.0.1"
cluster => "mycluster"
}
}
Here are a few examples of the output this generates.
{
"@timestamp":"2014-03-08T20:34:08.847Z",
"@version":"1",
"ossec_group":"syslog,sshd,invalid_login,authentication_failed,",
"reporting_ip":"10.1.2.3",
"reporting_source":"/var/log/auth.log",
"rule_number":"5710",
"severity":5,
"signature":"Attempt to login using a non-existent user",
"src_ip":"112.65.211.164",
"geoip":{
"ip":"112.65.211.164",
"country_code2":"CN",
"country_code3":"CHN",
"country_name":"China",
"continent_code":"AS",
"region_name":"23",
"city_name":"Shanghai",
"latitude":31.045600000000007,
"longitude":121.3997,
"timezone":"Asia/Shanghai",
"real_region_name":"Shanghai",
"location":[
121.3997,
31.045600000000007
]
},
"@message":"Mar 8 01:00:59 someserver sshd[22874]: Invalid user oracle
from 112.65.211.164\n",
"@fields.hostname":"someserver.somedomain.com",
"@fields.product":"ossec",
"raw_message":"** Alert 1394240459.2305861: -
syslog,sshd,invalid_login,authentication_failed,\n2014 Mar 08 01:00:59 (
someserver.somedomain.com) 10.1.2.3->/var/log/auth.log\nRule: 5710 (level
5) -> 'Attempt to login using a non-existent user'\nSrc IP:
112.65.211.164\nMar 8 01:00:59 someserver sshd[22874]: Invalid user oracle
from 112.65.211.164\n",
"ossec_server":"ossec-server.somedomain.com"
}
and
{
"@timestamp":"2014-03-08T21:15:23.278Z",
"@version":"1",
"ossec_group":"syslog,sudo",
"reporting_source":"/var/log/auth.log",
"rule_number":"5402",
"severity":3,
"signature":"Successful sudo to ROOT executed",
"acct":"nagios",
"@message":"Mar 8 00:00:03 ossec-server sudo: nagios : TTY=unknown ;
PWD=/ ; USER=root ; COMMAND=/usr/lib/some/command",
"@fields.hostname":"ossec-server",
"@fields.product":"ossec",
"raw_message":"** Alert 1394236804.1451: - syslog,sudo\n2014 Mar 08
00:00:04 ossec-server->/var/log/auth.log\nRule: 5402 (level 3) ->
'Successful sudo to ROOT executed'\nUser: nagios\nMar 8 00:00:03
ossec-server sudo: nagios : TTY=unknown ; PWD=/ ; USER=root ;
COMMAND=/usr/lib/some/command",
"ossec_server":"ossec-server.somedomain.com"
}
If you combine the above with a custom Elasticsearch template, you can put
together some really nice Kibana dashboards.
--Josh
--
---
You received this message because you are subscribed to the Google Groups
"ossec-list" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.