[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503421#comment-16503421
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user asfgit closed the pull request at:

https://github.com/apache/metron/pull/1036


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
>  [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker 
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging 
> context
> 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down 
> executors
> 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down 
> executor 
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.239 o.a.s.util Thread-6-disruptor-executor[2 
> 2]-send-queue [INFO] Async loop interrupted!
> 2018-05-24 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503355#comment-16503355
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user mmiklavc commented on the issue:

https://github.com/apache/metron/pull/1036
  
+1


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
>  [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker 
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging 
> context
> 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down 
> executors
> 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down 
> executor 
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.239 o.a.s.util Thread-6-disruptor-executor[2 
> 2]-send-queue [INFO] Async loop interrupted!
> 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-06-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502159#comment-16502159
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user nickwallen commented on the issue:

https://github.com/apache/metron/pull/1036
  
@mmiklavc Are you good with this?


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
>  [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker 
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging 
> context
> 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down 
> executors
> 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down 
> executor 
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.239 o.a.s.util Thread-6-disruptor-executor[2 
> 2]-send-queue 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-06-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501836#comment-16501836
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user cestella commented on the issue:

https://github.com/apache/metron/pull/1036
  
+1 that looks great


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
>  [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker 
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging 
> context
> 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down 
> executors
> 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down 
> executor 
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.239 o.a.s.util Thread-6-disruptor-executor[2 
> 2]-send-queue [INFO] Async loop 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-06-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500226#comment-16500226
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user merrimanr commented on the issue:

https://github.com/apache/metron/pull/1036
  
I ran this up in full dev and everything worked as expected.  +1 pending 
approval from other reviewers.


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
>  [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker 
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging 
> context
> 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down 
> executors
> 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down 
> executor 
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16497279#comment-16497279
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user nickwallen commented on the issue:

https://github.com/apache/metron/pull/1036
  
Undid refactors.  This should include only what is needed.


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
>  [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker 
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging 
> context
> 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down 
> executors
> 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down 
> executor 
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.239 o.a.s.util 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494425#comment-16494425
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user cestella commented on the issue:

https://github.com/apache/metron/pull/1036
  
Yeah, I have mixed reactions.  I think refactoring opportunistically is 
damned useful, but if it becomes excessive it can make it hard to review and 
make it hard for those with open branches in the same codebase.

This seems like a broader discussion is merited, though.  Can we maybe move 
this to a mailing list DISCUSS thread?  This feels like one of those things we 
should hash out and should end up in the dev guidelines.


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
>  [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker 
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494406#comment-16494406
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user mmiklavc commented on the issue:

https://github.com/apache/metron/pull/1036
  
I want to take a step back a moment here. I think there are some useful, 
clarifying refactorings in this PR. I'm generally in favor of refactoring 
improvements e.g. "handleTick(tuple)" that sum up the code nicely. I also like 
adding sensible clarifying comments and making tests more clear. However, I'm 
not sure how I feel about mixing refactoring like this with the bug fix itself. 
It seems to me that the actual bug fix is roughly 5-10 lines of code. It makes 
it more difficult to determine code that's specific to the change in mind. I 
noticed this while reviewing https://github.com/apache/metron/pull/1015 as 
well. Might it be better to do separate refactoring PR's? I am certainly not 
suggesting we be pedantic about it - I'm not even clear how I would even 
suggest a reasonable rule to follow, to be honest. Part of me thinks "ok, 
broken window theory and boyscout rules - this is great stuff." But in general, 
I think that splitting the refactoring will allow reviewers to voice opinions 
specific to coding style and refactoring changes as distinct from a feature or 
bug fix itself. It's clear that we all have varying opinions on what reads best 
in our code, and I think splitting those discussions off will make the review 
process more expedient.


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494405#comment-16494405
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191596670
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 ---
@@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) {
   }
 
   public void error(String sensorType, Throwable e, Iterable 
tuples, MessageGetStrategy messageGetStrategy) {
+
+if(!Iterables.isEmpty(tuples)) {
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
+}
 tuples.forEach(t -> collector.ack(t));
 MetronError error = new MetronError()
 .withSensorType(sensorType)
 .withErrorType(Constants.ErrorType.INDEXING_ERROR)
 .withThrowable(e);
+tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+ErrorUtils.handleError(collector, error);
+  }
+
+  /**
+   * Error a set of tuples that may not contain a valid message.
+   *
+   * Without a valid message, the source type is unknown.
+   * Without a valid message, the JSON message cannot be added to the 
error.
+   *
+   * @param e The exception that occurred.
+   * @param tuples The tuples to error that may not contain valid messages.
+   */
+  public void error(Throwable e, Iterable tuples) {
+
 if(!Iterables.isEmpty(tuples)) {
-  LOG.error("Failing {} tuples", Iterables.size(tuples), e);
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
--- End diff --

Sure will do.


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494376#comment-16494376
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191592021
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 ---
@@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) {
   }
 
   public void error(String sensorType, Throwable e, Iterable 
tuples, MessageGetStrategy messageGetStrategy) {
+
+if(!Iterables.isEmpty(tuples)) {
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
+}
 tuples.forEach(t -> collector.ack(t));
 MetronError error = new MetronError()
 .withSensorType(sensorType)
 .withErrorType(Constants.ErrorType.INDEXING_ERROR)
 .withThrowable(e);
+tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+ErrorUtils.handleError(collector, error);
+  }
+
+  /**
+   * Error a set of tuples that may not contain a valid message.
+   *
+   * Without a valid message, the source type is unknown.
+   * Without a valid message, the JSON message cannot be added to the 
error.
+   *
+   * @param e The exception that occurred.
+   * @param tuples The tuples to error that may not contain valid messages.
+   */
+  public void error(Throwable e, Iterable tuples) {
+
 if(!Iterables.isEmpty(tuples)) {
-  LOG.error("Failing {} tuples", Iterables.size(tuples), e);
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
--- End diff --

Ugh, yeah, so I think we refactored at some point to move to parameterized 
logging statements and ran into 
[this](https://www.slf4j.org/faq.html#paramException).

Perhaps we can use `LOG.error(String.format("Failing tuples; count=%d, 
error=%s", ...), e);` and get the best of both worlds?


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494366#comment-16494366
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191590539
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 ---
@@ -213,60 +214,144 @@ public void prepare(Map stormConf, TopologyContext 
context, OutputCollector coll
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
+
 if (isTick(tuple)) {
-  try {
-if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
-  //WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
-  LOG.debug("Flushing message queues older than their 
batchTimeouts");
-  getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
-  new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
-  , messageGetStrategy);
-}
-  }
-  catch(Exception e) {
-throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
-  }
-  finally {
-collector.ack(tuple);
-  }
-  return;
+  handleTick(tuple);
+
+} else {
+  handleMessage(tuple);
 }
+  }
+
+  /**
+   * Handle a tuple containing a message; anything other than a tick tuple.
+   *
+   * @param tuple The tuple containing a message.
+   */
+  private void handleMessage(Tuple tuple) {
+try {
+
+  JSONObject message = getMessage(tuple);
+  if(message == null) {
+handleMissingMessage(tuple);
+return;
+  }
 
-try
-{
-  JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
   String sensorType = MessageUtils.getSensorType(message);
-  LOG.trace("Writing enrichment message: {}", message);
-  WriterConfiguration writerConfiguration = 
configurationTransformation.apply(
-  new IndexingWriterConfiguration(bulkMessageWriter.getName(), 
getConfigurations()));
   if(sensorType == null) {
-//sensor type somehow ended up being null.  We want to error this 
message directly.
-getWriterComponent().error("null"
- , new Exception("Sensor type is not specified 
for message "
-+ message.toJSONString()
-)
- , ImmutableList.of(tuple)
- , messageGetStrategy
- );
-  }
-  else {
-if (writerConfiguration.isDefault(sensorType)) {
-  //want to warn, but not fail the tuple
-  collector.reportError(new Exception("WARNING: Default and 
(likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " 
writer and sensor " + sensorType));
-}
-
-getWriterComponent().write(sensorType
-, tuple
-, message
-, bulkMessageWriter
-, writerConfiguration
-, messageGetStrategy
-);
+handleMissingSensorType(tuple, message);
+return;
   }
+
+  writeMessage(tuple, message, sensorType);
+
+} catch (Exception e) {
+  throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
 }
-catch(Exception e) {
+  }
+
+  /**
+   * Handles a tick tuple.
+   *
+   * @param tickTuple The tick tuple.
+   */
+  private void handleTick(Tuple tickTuple) {
+
+try {
+  if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
+//WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
+LOG.debug("Flushing message queues older than their 
batchTimeouts");
+getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
+new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
+, messageGetStrategy);
+  }
+
+} catch(Exception e) {
   throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
+
+} finally {
+  collector.ack(tickTuple);
+}
+  }
+
+  /**
+   * 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494357#comment-16494357
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191588901
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 ---
@@ -213,60 +214,144 @@ public void prepare(Map stormConf, TopologyContext 
context, OutputCollector coll
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
+
 if (isTick(tuple)) {
-  try {
-if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
-  //WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
-  LOG.debug("Flushing message queues older than their 
batchTimeouts");
-  getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
-  new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
-  , messageGetStrategy);
-}
-  }
-  catch(Exception e) {
-throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
-  }
-  finally {
-collector.ack(tuple);
-  }
-  return;
+  handleTick(tuple);
+
+} else {
+  handleMessage(tuple);
 }
+  }
+
+  /**
+   * Handle a tuple containing a message; anything other than a tick tuple.
+   *
+   * @param tuple The tuple containing a message.
+   */
+  private void handleMessage(Tuple tuple) {
+try {
+
+  JSONObject message = getMessage(tuple);
+  if(message == null) {
+handleMissingMessage(tuple);
+return;
+  }
 
-try
-{
-  JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
   String sensorType = MessageUtils.getSensorType(message);
-  LOG.trace("Writing enrichment message: {}", message);
-  WriterConfiguration writerConfiguration = 
configurationTransformation.apply(
-  new IndexingWriterConfiguration(bulkMessageWriter.getName(), 
getConfigurations()));
   if(sensorType == null) {
-//sensor type somehow ended up being null.  We want to error this 
message directly.
-getWriterComponent().error("null"
- , new Exception("Sensor type is not specified 
for message "
-+ message.toJSONString()
-)
- , ImmutableList.of(tuple)
- , messageGetStrategy
- );
-  }
-  else {
-if (writerConfiguration.isDefault(sensorType)) {
-  //want to warn, but not fail the tuple
-  collector.reportError(new Exception("WARNING: Default and 
(likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " 
writer and sensor " + sensorType));
-}
-
-getWriterComponent().write(sensorType
-, tuple
-, message
-, bulkMessageWriter
-, writerConfiguration
-, messageGetStrategy
-);
+handleMissingSensorType(tuple, message);
+return;
   }
+
+  writeMessage(tuple, message, sensorType);
+
+} catch (Exception e) {
+  throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
 }
-catch(Exception e) {
+  }
+
+  /**
+   * Handles a tick tuple.
+   *
+   * @param tickTuple The tick tuple.
+   */
+  private void handleTick(Tuple tickTuple) {
+
+try {
+  if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
+//WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
+LOG.debug("Flushing message queues older than their 
batchTimeouts");
+getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
+new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
+, messageGetStrategy);
+  }
+
+} catch(Exception e) {
   throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
+
+} finally {
+  collector.ack(tickTuple);
+}
+  }
+
+  /**
+   * 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494355#comment-16494355
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191588785
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 ---
@@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) {
   }
 
   public void error(String sensorType, Throwable e, Iterable 
tuples, MessageGetStrategy messageGetStrategy) {
+
+if(!Iterables.isEmpty(tuples)) {
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
+}
 tuples.forEach(t -> collector.ack(t));
 MetronError error = new MetronError()
 .withSensorType(sensorType)
 .withErrorType(Constants.ErrorType.INDEXING_ERROR)
 .withThrowable(e);
+tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+ErrorUtils.handleError(collector, error);
+  }
+
+  /**
+   * Error a set of tuples that may not contain a valid message.
+   *
+   * Without a valid message, the source type is unknown.
+   * Without a valid message, the JSON message cannot be added to the 
error.
+   *
+   * @param e The exception that occurred.
+   * @param tuples The tuples to error that may not contain valid messages.
+   */
+  public void error(Throwable e, Iterable tuples) {
+
 if(!Iterables.isEmpty(tuples)) {
-  LOG.error("Failing {} tuples", Iterables.size(tuples), e);
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
 }
-tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+tuples.forEach(t -> collector.ack(t));
--- End diff --

AH yes, it is, but this is a new error method.  It feels that both `error` 
method should be calling another `error`method to do the basics like `ack()`.


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494352#comment-16494352
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191588328
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 ---
@@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) {
   }
 
   public void error(String sensorType, Throwable e, Iterable 
tuples, MessageGetStrategy messageGetStrategy) {
+
+if(!Iterables.isEmpty(tuples)) {
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
+}
 tuples.forEach(t -> collector.ack(t));
 MetronError error = new MetronError()
 .withSensorType(sensorType)
 .withErrorType(Constants.ErrorType.INDEXING_ERROR)
 .withThrowable(e);
+tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+ErrorUtils.handleError(collector, error);
+  }
+
+  /**
+   * Error a set of tuples that may not contain a valid message.
+   *
+   * Without a valid message, the source type is unknown.
+   * Without a valid message, the JSON message cannot be added to the 
error.
+   *
+   * @param e The exception that occurred.
+   * @param tuples The tuples to error that may not contain valid messages.
+   */
+  public void error(Throwable e, Iterable tuples) {
+
 if(!Iterables.isEmpty(tuples)) {
-  LOG.error("Failing {} tuples", Iterables.size(tuples), e);
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
--- End diff --

We were not logging the the exception message or stack trace at all before; 
just "Failing 8 tuples".  

The error method was not being used as intended AFAIK.  Having the 
exception as the last parameter was just doing a `{}` substitution for a `{}` 
that did not exist.

I can change this to log the stack trace, if you like.  


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494348#comment-16494348
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191587917
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 ---
@@ -213,60 +214,144 @@ public void prepare(Map stormConf, TopologyContext 
context, OutputCollector coll
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
+
 if (isTick(tuple)) {
-  try {
-if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
-  //WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
-  LOG.debug("Flushing message queues older than their 
batchTimeouts");
-  getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
-  new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
-  , messageGetStrategy);
-}
-  }
-  catch(Exception e) {
-throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
-  }
-  finally {
-collector.ack(tuple);
-  }
-  return;
+  handleTick(tuple);
+
+} else {
+  handleMessage(tuple);
 }
+  }
+
+  /**
+   * Handle a tuple containing a message; anything other than a tick tuple.
+   *
+   * @param tuple The tuple containing a message.
+   */
+  private void handleMessage(Tuple tuple) {
+try {
+
+  JSONObject message = getMessage(tuple);
+  if(message == null) {
+handleMissingMessage(tuple);
+return;
+  }
 
-try
-{
-  JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
   String sensorType = MessageUtils.getSensorType(message);
-  LOG.trace("Writing enrichment message: {}", message);
-  WriterConfiguration writerConfiguration = 
configurationTransformation.apply(
-  new IndexingWriterConfiguration(bulkMessageWriter.getName(), 
getConfigurations()));
   if(sensorType == null) {
-//sensor type somehow ended up being null.  We want to error this 
message directly.
-getWriterComponent().error("null"
- , new Exception("Sensor type is not specified 
for message "
-+ message.toJSONString()
-)
- , ImmutableList.of(tuple)
- , messageGetStrategy
- );
-  }
-  else {
-if (writerConfiguration.isDefault(sensorType)) {
-  //want to warn, but not fail the tuple
-  collector.reportError(new Exception("WARNING: Default and 
(likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " 
writer and sensor " + sensorType));
-}
-
-getWriterComponent().write(sensorType
-, tuple
-, message
-, bulkMessageWriter
-, writerConfiguration
-, messageGetStrategy
-);
+handleMissingSensorType(tuple, message);
+return;
   }
+
+  writeMessage(tuple, message, sensorType);
+
+} catch (Exception e) {
+  throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
 }
-catch(Exception e) {
+  }
+
+  /**
+   * Handles a tick tuple.
+   *
+   * @param tickTuple The tick tuple.
+   */
+  private void handleTick(Tuple tickTuple) {
+
+try {
+  if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
+//WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
+LOG.debug("Flushing message queues older than their 
batchTimeouts");
+getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
+new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
+, messageGetStrategy);
+  }
+
+} catch(Exception e) {
   throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
+
+} finally {
+  collector.ack(tickTuple);
+}
+  }
+
+  /**
+   * 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494328#comment-16494328
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user mmiklavc commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191585908
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 ---
@@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) {
   }
 
   public void error(String sensorType, Throwable e, Iterable 
tuples, MessageGetStrategy messageGetStrategy) {
+
+if(!Iterables.isEmpty(tuples)) {
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
+}
 tuples.forEach(t -> collector.ack(t));
 MetronError error = new MetronError()
 .withSensorType(sensorType)
 .withErrorType(Constants.ErrorType.INDEXING_ERROR)
 .withThrowable(e);
+tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+ErrorUtils.handleError(collector, error);
+  }
+
+  /**
+   * Error a set of tuples that may not contain a valid message.
+   *
+   * Without a valid message, the source type is unknown.
+   * Without a valid message, the JSON message cannot be added to the 
error.
+   *
+   * @param e The exception that occurred.
+   * @param tuples The tuples to error that may not contain valid messages.
+   */
+  public void error(Throwable e, Iterable tuples) {
+
 if(!Iterables.isEmpty(tuples)) {
-  LOG.error("Failing {} tuples", Iterables.size(tuples), e);
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
 }
-tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+tuples.forEach(t -> collector.ack(t));
--- End diff --

Wait, @cestella isn't it already acked in line 123? 


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494322#comment-16494322
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191585015
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 ---
@@ -213,60 +214,144 @@ public void prepare(Map stormConf, TopologyContext 
context, OutputCollector coll
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
+
 if (isTick(tuple)) {
-  try {
-if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
-  //WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
-  LOG.debug("Flushing message queues older than their 
batchTimeouts");
-  getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
-  new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
-  , messageGetStrategy);
-}
-  }
-  catch(Exception e) {
-throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
-  }
-  finally {
-collector.ack(tuple);
-  }
-  return;
+  handleTick(tuple);
+
+} else {
+  handleMessage(tuple);
 }
+  }
+
+  /**
+   * Handle a tuple containing a message; anything other than a tick tuple.
+   *
+   * @param tuple The tuple containing a message.
+   */
+  private void handleMessage(Tuple tuple) {
+try {
+
+  JSONObject message = getMessage(tuple);
+  if(message == null) {
+handleMissingMessage(tuple);
+return;
+  }
 
-try
-{
-  JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
   String sensorType = MessageUtils.getSensorType(message);
-  LOG.trace("Writing enrichment message: {}", message);
-  WriterConfiguration writerConfiguration = 
configurationTransformation.apply(
-  new IndexingWriterConfiguration(bulkMessageWriter.getName(), 
getConfigurations()));
   if(sensorType == null) {
-//sensor type somehow ended up being null.  We want to error this 
message directly.
-getWriterComponent().error("null"
- , new Exception("Sensor type is not specified 
for message "
-+ message.toJSONString()
-)
- , ImmutableList.of(tuple)
- , messageGetStrategy
- );
-  }
-  else {
-if (writerConfiguration.isDefault(sensorType)) {
-  //want to warn, but not fail the tuple
-  collector.reportError(new Exception("WARNING: Default and 
(likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " 
writer and sensor " + sensorType));
-}
-
-getWriterComponent().write(sensorType
-, tuple
-, message
-, bulkMessageWriter
-, writerConfiguration
-, messageGetStrategy
-);
+handleMissingSensorType(tuple, message);
+return;
   }
+
+  writeMessage(tuple, message, sensorType);
+
+} catch (Exception e) {
+  throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
 }
-catch(Exception e) {
+  }
+
+  /**
+   * Handles a tick tuple.
+   *
+   * @param tickTuple The tick tuple.
+   */
+  private void handleTick(Tuple tickTuple) {
+
+try {
+  if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
+//WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
+LOG.debug("Flushing message queues older than their 
batchTimeouts");
+getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
+new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
+, messageGetStrategy);
+  }
+
+} catch(Exception e) {
   throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
+
+} finally {
+  collector.ack(tickTuple);
+}
+  }
+
+  /**
+   * 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494320#comment-16494320
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191584626
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 ---
@@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) {
   }
 
   public void error(String sensorType, Throwable e, Iterable 
tuples, MessageGetStrategy messageGetStrategy) {
+
+if(!Iterables.isEmpty(tuples)) {
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
+}
 tuples.forEach(t -> collector.ack(t));
 MetronError error = new MetronError()
 .withSensorType(sensorType)
 .withErrorType(Constants.ErrorType.INDEXING_ERROR)
 .withThrowable(e);
+tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+ErrorUtils.handleError(collector, error);
+  }
+
+  /**
+   * Error a set of tuples that may not contain a valid message.
+   *
+   * Without a valid message, the source type is unknown.
+   * Without a valid message, the JSON message cannot be added to the 
error.
+   *
+   * @param e The exception that occurred.
+   * @param tuples The tuples to error that may not contain valid messages.
+   */
+  public void error(Throwable e, Iterable tuples) {
+
 if(!Iterables.isEmpty(tuples)) {
-  LOG.error("Failing {} tuples", Iterables.size(tuples), e);
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
 }
-tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+tuples.forEach(t -> collector.ack(t));
--- End diff --

wow, we did we really not ack here before?  That's a bug. good catch.


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494314#comment-16494314
 ] 

ASF GitHub Bot commented on METRON-1584:


Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/1036#discussion_r191582496
  
--- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
 ---
@@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) {
   }
 
   public void error(String sensorType, Throwable e, Iterable 
tuples, MessageGetStrategy messageGetStrategy) {
+
+if(!Iterables.isEmpty(tuples)) {
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
+}
 tuples.forEach(t -> collector.ack(t));
 MetronError error = new MetronError()
 .withSensorType(sensorType)
 .withErrorType(Constants.ErrorType.INDEXING_ERROR)
 .withThrowable(e);
+tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
+ErrorUtils.handleError(collector, error);
+  }
+
+  /**
+   * Error a set of tuples that may not contain a valid message.
+   *
+   * Without a valid message, the source type is unknown.
+   * Without a valid message, the JSON message cannot be added to the 
error.
+   *
+   * @param e The exception that occurred.
+   * @param tuples The tuples to error that may not contain valid messages.
+   */
+  public void error(Throwable e, Iterable tuples) {
+
 if(!Iterables.isEmpty(tuples)) {
-  LOG.error("Failing {} tuples", Iterables.size(tuples), e);
+  LOG.error("Failing tuples; count={}, error={}", 
Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e));
--- End diff --

Does `ExceptionUtils.getRootCauseMessage()` return the whole stack trace or 
just the root message?  I think we'd like to keep the stack trace here.


> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> 

[jira] [Commented] (METRON-1584) Indexing Topology Crashes with Invalid Message

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493980#comment-16493980
 ] 

ASF GitHub Bot commented on METRON-1584:


GitHub user nickwallen opened a pull request:

https://github.com/apache/metron/pull/1036

METRON-1584 Indexing Topology Crashes with Invalid Message

If the indexing topology receives a message containing invalid JSON, the 
topology will crash.  The topology needs to handle these types of errors and 
continue processing.

## Changes

* Updated the `BulkWriterComponent` to allow tuples that do not contain 
valid JSON messages to be error'd out.

* Updated `BulkMessageWriterBolt` to handle the case where a tuple does not 
contain a valid JSON message.

* Updated the BulkMessageWriterBolt unit tests to validate that error'd 
messages are being ack'd and emitted on the correct error stream.  Previously a 
mock was being used that hid a bug with the `BulkWriterComponent`.

* Added a unit test for the case where an invalid message is sent as input 
to the indexing topology.

## Testing

1. Spin-up the development environment.

1. Ensure that messages are successfully being indexing by running the 
Service Check and/or validating the Alerts UI.

1. Write an invalid message to the "indexing" topic.

1. Ensure the invalid message is sent to the "error" topic.

1. Ensure the indexing topology did not crash.


## Pull Request Checklist

- [ ] Is there a JIRA ticket associated with this PR? If not one needs to 
be created at [Metron 
Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
- [ ] Does your PR title start with METRON- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?
- [ ] Have you included steps to reproduce the behavior or problem that is 
being changed or addressed?
- [ ] Have you included steps or a guide to how the change may be verified 
and tested manually?
- [ ] Have you ensured that the full suite of tests and checks have been 
executed in the root metron folder via:
- [ ] Have you written or updated unit tests and or integration tests to 
verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] Have you verified the basic functionality of the build by building 
and running locally with Vagrant full-dev environment or the equivalent?


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nickwallen/metron METRON-1584

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/metron/pull/1036.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1036


commit 765e05264b378a0a7608092e5842a31239418ae5
Author: Nick Allen 
Date:   2018-05-29T17:49:31Z

METRON-1584 Indexing Topology Crashes with Invalid Message




> Indexing Topology Crashes with Invalid Message
> --
>
> Key: METRON-1584
> URL: https://issues.apache.org/jira/browse/METRON-1584
> Project: Metron
>  Issue Type: Bug
>Reporter: Nick Allen
>Assignee: Nick Allen
>Priority: Major
>
> Per Mohan Venkateshaiah:
> I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see 
> that the random access indexing topology worker thread died and couldn't 
> recover until the kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
>