Chris,
Yeah digging into the templates was another big win for me. For instance,
if you try to do a topN query on signature with the default template, you
end up with words like the and and as your top hits. Setting signature
to not_analyzed ensures the field isn't tokenized. Below is my template.
--Josh
Logstash settings:
output {
elasticsearch {
host => "10.0.0.1"
cluster => "mycluster"
index => "logstash-ossec-%{+YYYY.MM.dd}"
index_type => "ossec"
template_name => "template-ossec"
template => "/etc/logstash/elasticsearch_template.json"
template_overwrite => true
}
}
elasticsearch_template.json
{
"template":"logstash-ossec-*",
"settings":{
"index.analysis.analyzer.default.stopwords":"_none_",
"index.refresh_interval":"5s",
"index.analysis.analyzer.default.type":"standard"
},
"mappings":{
"ossec":{
"properties":{
"@fields.hostname":{
"type":"string",
"index":"not_analyzed"
},
"@fields.product":{
"type":"string",
"index":"not_analyzed"
},
"@message":{
"type":"string",
"index":"not_analyzed"
},
"@timestamp":{
"type":"date"
},
"@version":{
"type":"string",
"index":"not_analyzed"
},
"acct":{
"type":"string",
"index":"not_analyzed"
},
"ossec_group":{
"type":"string",
"index":"not_analyzed"
},
"ossec_server":{
"type":"string",
"index":"not_analyzed"
},
"raw_message":{
"type":"string",
"index":"analyzed"
},
"reporting_ip":{
"type":"string",
"index":"not_analyzed"
},
"reporting_source":{
"type":"string",
"index":"analyzed"
},
"rule_number":{
"type":"integer"
},
"severity":{
"type":"integer"
},
"signature":{
"type":"string",
"index":"not_analyzed"
},
"src_ip":{
"type":"string",
"index":"not_analyzed"
},
"geoip":{
"type" : "object",
"dynamic": true,
"path": "full",
"properties" : {
"location" : { "type" : "geo_point" }
}
}
},
"_all":{
"enabled":true
}
}
}
}
On Wed, Mar 19, 2014 at 10:54 AM, Chris H <[email protected]> wrote:
> Hi, Joshua.
>
> I'm using a very similar technique. Are you applying a mapping template,
> or using the default? I'm using the default automatic templates, because
> frankly I don't fully understand templates. What this means though is that
> my daily indexes are larger than the uncompressed alerts.log, between 2-4GB
> per day, and I'm rapidly running out of disk space. I gather than this can
> be optimised by enabling compression and excluding the _source and _all
> fields through the mapping template, but I'm not sure exactly how this
> works. Just wondered if you've come across the same problem.
>
> Thanks.
>
>
> On Saturday, March 8, 2014 10:02:35 PM UTC, Joshua Garnett wrote:
>>
>> All,
>>
>> I'll probably write a blog post on this, but I wanted to share some work
>> I've done today. http://vichargrave.com/ossec-log-management-with-
>> elasticsearch/ shows how to use OSSEC's syslog output to route messages
>> to Elasticsearch. The problem with this method is it uses UDP. Even when
>> sending packets to a local process UDP by definition is unreliable.
>> Garbage collections and other system events can cause packets to be lost.
>> I've found it tends to cap out at around 1,500 messages per minute.
>>
>> To address this issue I've put together a logstash config that will read
>> the alerts from /var/ossec/logs/alerts/alerts.log. On top of solving
>> the reliability issue, it also fixes issues with multi-lines being lost,
>> and adds geoip lookups for the src_ip. I tested it against approximately
>> 1GB of alerts (3M events).
>>
>> input {
>> file {
>> type => "ossec"
>> path => "/var/ossec/logs/alerts/alerts.log"
>> sincedb_path => "/opt/logstash/"
>> codec => multiline {
>> pattern => "^\*\*"
>> negate => true
>> what => "previous"
>> }
>> }
>> }
>>
>> filter {
>> if [type] == "ossec" {
>> # Parse the header of the alert
>> grok {
>> # Matches 2014 Mar 08 00:57:49 (some.server.com) 10.1.2.3->ossec
>> # (?m) fixes issues with multi-lines see https://logstash.jira.com/
>> browse/LOGSTASH-509
>> match => ["message", "(?m)\*\* Alert
>> %{DATA:timestamp_seconds}:%{SPACE}%{WORD}?%{SPACE}\-
>> %{DATA:ossec_group}\n%{YEAR} %{SYSLOGTIMESTAMP:syslog_timestamp}
>> \(%{DATA:reporting_host}\)
>> %{IP:reporting_ip}\-\>%{DATA:reporting_source}\nRule:
>> %{NONNEGINT:rule_number} \(level %{NONNEGINT:severity}\) \-\>
>> '%{DATA:signature}'\n%{GREEDYDATA:remaining_message}"]
>>
>> # Matches 2014 Mar 08 00:00:00 ossec-server01->/var/log/auth.log
>> match => ["message", "(?m)\*\* Alert
>> %{DATA:timestamp_seconds}:%{SPACE}%{WORD}?%{SPACE}\-
>> %{DATA:ossec_group}\n%{YEAR} %{SYSLOGTIMESTAMP:syslog_timestamp}
>> %{DATA:reporting_host}\-\>%{DATA:reporting_source}\nRule:
>> %{NONNEGINT:rule_number} \(level %{NONNEGINT:severity}\) \-\>
>> '%{DATA:signature}'\n%{GREEDYDATA:remaining_message}"]
>> }
>>
>> # Attempt to parse additional data from the alert
>> grok {
>> match => ["remaining_message", "(?m)(Src IP:
>> %{IP:src_ip}%{SPACE})?(Src Port: %{NONNEGINT:src_port}%{SPACE})?(Dst IP:
>> %{IP:dst_ip}%{SPACE})?(Dst Port: %{NONNEGINT:dst_port}%{SPACE})?(User:
>> %{USER:acct}%{SPACE})?%{GREEDYDATA:real_message}"]
>> }
>>
>> geoip {
>> source => "src_ip"
>> }
>>
>> mutate {
>> convert => [ "severity", "integer"]
>> replace => [ "@message", "%{real_message}" ]
>> replace => [ "@fields.hostname", "%{reporting_host}"]
>> add_field => [ "@fields.product", "ossec"]
>> add_field => [ "raw_message", "%{message}"]
>> add_field => [ "ossec_server", "%{host}"]
>> remove_field => [ "type", "syslog_program", "syslog_timestamp",
>> "reporting_host", "message", "timestamp_seconds", "real_message",
>> "remaining_message", "path", "host", "tags"]
>> }
>> }
>> }
>>
>> output {
>> elasticsearch {
>> host => "10.0.0.1"
>> cluster => "mycluster"
>> }
>> }
>>
>> Here are a few examples of the output this generates.
>>
>> {
>> "@timestamp":"2014-03-08T20:34:08.847Z",
>> "@version":"1",
>> "ossec_group":"syslog,sshd,invalid_login,authentication_failed,",
>> "reporting_ip":"10.1.2.3",
>> "reporting_source":"/var/log/auth.log",
>> "rule_number":"5710",
>> "severity":5,
>> "signature":"Attempt to login using a non-existent user",
>> "src_ip":"112.65.211.164",
>> "geoip":{
>> "ip":"112.65.211.164",
>> "country_code2":"CN",
>> "country_code3":"CHN",
>> "country_name":"China",
>> "continent_code":"AS",
>> "region_name":"23",
>> "city_name":"Shanghai",
>> "latitude":31.045600000000007,
>> "longitude":121.3997,
>> "timezone":"Asia/Shanghai",
>> "real_region_name":"Shanghai",
>> "location":[
>> 121.3997,
>> 31.045600000000007
>> ]
>> },
>> "@message":"Mar 8 01:00:59 someserver sshd[22874]: Invalid user
>> oracle from 112.65.211.164\n",
>> "@fields.hostname":"someserver.somedomain.com",
>> "@fields.product":"ossec",
>> "raw_message":"** Alert 1394240459.2305861: -
>> syslog,sshd,invalid_login,authentication_failed,\n2014 Mar 08 01:00:59 (
>> someserver.somedomain.com) 10.1.2.3->/var/log/auth.log\nRule: 5710
>> (level 5) -> 'Attempt to login using a non-existent user'\nSrc IP:
>> 112.65.211.164\nMar 8 01:00:59 someserver sshd[22874]: Invalid user oracle
>> from 112.65.211.164\n",
>> "ossec_server":"ossec-server.somedomain.com"
>> }
>>
>> and
>>
>> {
>> "@timestamp":"2014-03-08T21:15:23.278Z",
>> "@version":"1",
>> "ossec_group":"syslog,sudo",
>> "reporting_source":"/var/log/auth.log",
>> "rule_number":"5402",
>> "severity":3,
>> "signature":"Successful sudo to ROOT executed",
>> "acct":"nagios",
>> "@message":"Mar 8 00:00:03 ossec-server sudo: nagios : TTY=unknown
>> ; PWD=/ ; USER=root ; COMMAND=/usr/lib/some/command",
>> "@fields.hostname":"ossec-server",
>> "@fields.product":"ossec",
>> "raw_message":"** Alert 1394236804.1451: - syslog,sudo\n2014 Mar 08
>> 00:00:04 ossec-server->/var/log/auth.log\nRule: 5402 (level 3) ->
>> 'Successful sudo to ROOT executed'\nUser: nagios\nMar 8 00:00:03
>> ossec-server sudo: nagios : TTY=unknown ; PWD=/ ; USER=root ;
>> COMMAND=/usr/lib/some/command",
>> "ossec_server":"ossec-server.somedomain.com"
>> }
>>
>> If you combine the above with a custom Elasticsearch template, you can
>> put together some really nice Kibana dashboards.
>>
>>
>> --Josh
>>
>>
>> --
>
> ---
> You received this message because you are subscribed to the Google Groups
> "ossec-list" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> For more options, visit https://groups.google.com/d/optout.
>
--
---
You received this message because you are subscribed to the Google Groups
"ossec-list" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.