Again, thank you! I've modified the code and implemented some checks plus the
fail().

Regarding the suggestion to make splitLea() throwable and add a try/catch:
should the try/catch surround `Map<String, String> eventParsed = splitLea(event);`?

    public void execute(Tuple tuple) {
        String event = tuple.getString(0);
        // TODO add try/catch
        Map<String, String> eventParsed = splitLea(event);

        if(Collections.frequency(eventParsed.values(), null) ==
eventParsed.size()) {
            LOG.warn("Tuple is empty and will be dropped");
            _collector.fail(tuple);
        } else {
            // Emit tuple to next bolt
            _collector.emit(tuple, new Values(eventParsed));

            // Ack tuple to confirm processing
            _collector.ack(tuple);
        }
    }
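For what it's worth, here is a minimal, self-contained sketch of that pattern.
The StubCollector class and the simplified splitLea() are stand-ins for Storm's
OutputCollector and the real parser, purely for illustration; they are not
Storm classes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AckFailSketch {
    // Stand-in for Storm's OutputCollector: records what happened to the tuple
    static class StubCollector {
        String outcome;
        void ack(String tuple)  { outcome = "acked"; }
        void fail(String tuple) { outcome = "failed"; }
        void emit(String tuple, Map<String, String> values) { /* forward to next bolt */ }
    }

    // Stand-in for splitLea(); throws on unparseable input
    static Map<String, String> splitLea(String event) {
        if (event == null || !event.contains("=")) {
            throw new IllegalArgumentException("unparseable event: " + event);
        }
        Map<String, String> parsed = new LinkedHashMap<>();
        for (String field : event.split("\\|")) {
            // Split on the first '=' only, so values containing '=' stay intact
            String[] kv = field.split("=", 2);
            parsed.put(kv[0], kv.length == 2 ? kv[1] : null);
        }
        return parsed;
    }

    // The try/catch surrounds the splitLea() call; ack on success, fail on error
    static String execute(StubCollector collector, String event) {
        try {
            Map<String, String> eventParsed = splitLea(event);
            collector.emit(event, eventParsed);
            collector.ack(event);
        } catch (RuntimeException e) {
            // Parsing failed: fail the tuple so Storm can replay it
            collector.fail(event);
        }
        return collector.outcome;
    }

    public static void main(String[] args) {
        StubCollector good = new StubCollector();
        System.out.println(execute(good, "action=monitor|loc=49"));  // prints "acked"
        StubCollector bad = new StubCollector();
        System.out.println(execute(bad, "garbage"));                 // prints "failed"
    }
}
```

With this shape the try/catch surrounds the splitLea() call itself, so an
unparseable event fails the tuple instead of crashing the bolt, and the ack
only happens after a successful emit.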

2015-05-13 1:31 GMT+02:00 임정택 <[email protected]>:

> Yes, it looks better when you remove the debug log or introduce a local
> variable to refer to splitLea(event). :)
> And if splitLea() can throw any Throwable, you may want to handle it with a
> try-catch and call the collector's ack or fail in a finally block.
>
> Happy to help.
>
> Jungtaek Lim (HeartSaVioR)
>
> 2015-05-12 16:58 GMT+09:00 Bas van de Lustgraaf <[email protected]>
> :
>
>> Hi,
>>
>> Thanks for your reply. I've checked the page you suggested and, based on
>> that, changed the execute function. Below is the new version; can you
>> confirm whether I've implemented the emit and ack functions the right way?
>>
>>     public void execute(Tuple tuple) {
>>         String event = tuple.getString(0);
>>
>>         System.out.println("##### event #####");
>>         System.out.println(event);
>>         System.out.println("#####       #####");
>>
>>         System.out.println("##### parse #####");
>>         System.out.println(new Values(splitLea(event)));
>>         System.out.println("#####       #####");
>>
>>         // Emit tuple to next bolt
>>         _collector.emit(tuple, new Values(splitLea(event)));
>>
>>         // Ack tuple to confirm processing
>>         _collector.ack(tuple);
>>     }
>>
>> The outcome of the println of the parsed event is: [{action=monitor,
>> loc=49, time=2015-04-29 00:02:19, orig=10.26.107.214, i/f_dir=inbound,
>> i/f_name=eth2}]
>>
>> So you were right about the serializable LinkedHashMap!
>>
>> Any other suggestion about the code?
>>
>> Regards,
>>
>> Bas
>>
>>
>> 2015-05-12 0:09 GMT+02:00 임정택 <[email protected]>:
>>
>>> Hi.
>>>
>>> Seems like you're confusing emit and ack.
>>> Ack is for guaranteeing message processing, not for sending something
>>> new.
>>>
>>> http://storm.apache.org/documentation/Guaranteeing-message-processing.html
>>>
>>> So when you convert something and want to send it to the next bolt, use emit.
>>> LinkedHashMap is serializable, so there would be no issue including it
>>> in Values.
>>>
>>> Hope this helps.
>>>
>>> Regards.
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>>
>>>
>>> 2015-05-12 1:40 GMT+09:00 Bas van de Lustgraaf <[email protected]
>>> >:
>>>
>>>> ---------- Forwarded message ----------
>>>> From: "Bas van de Lustgraaf" <[email protected]>
>>>> Date: 11 May 2015 16:46
>>>> Subject: Converting LinkedHashMap to a Tuple
>>>> To: <[email protected]>
>>>> Cc:
>>>>
>>>> Hello,
>>>>
>>>> I've created a custom bolt for parsing a specific log event format.
>>>> After writing the logic in a simple java class for testing, I have
>>>> converted this into a bolt.
>>>>
>>>> So far the bolt emits the tuple (just a long String), splits the log
>>>> event into key/value pairs using a LinkedHashMap (to preserve the order
>>>> of the fields), and prints each k/v pair for debugging.
>>>>
>>>> The next step is to ack the Tuple back to the next bolt. But I have no
>>>> idea how to convert the LinkedHashMap to a Tuple.
>>>>
>>>> What is the best approach here, keeping in mind that the information
>>>> should be written to Hive in the end?
>>>>
>>>> Below is the code:
>>>>
>>>> ###
>>>> ### Start Code ###
>>>> ###
>>>>
>>>> package storm.os.bolt;
>>>>
>>>> import java.util.LinkedHashMap;
>>>> import java.util.Map;
>>>>
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> import backtype.storm.task.OutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>>
>>>> public class LeaParsingBolt extends BaseRichBolt {
>>>>     private static final Logger LOG =
>>>> LoggerFactory.getLogger(LeaParsingBolt.class);
>>>>     OutputCollector _collector;
>>>>
>>>>     public static Map<String, String> splitLea(Object input) {
>>>>         // String lea = "loc=49|time=2015-04-29
>>>> 00:02:19|action=monitor|orig=192.168.40.40|i/f_dir=inbound|i/f_name=eth2|has_accounting=0|uuid=<00000000,00000000,00000000,00000000>|product=SmartDefense|__policy_id_tag=product=VPN-1
>>>> &
>>>> FireWall-1[db_tag={1580F896-C9E4-FF49-88AE-F5099E3E13B2};mgmt=Management_Server;date=1430221728;policy_name=Standard]|src=1.1.1.1|s_port=42041|dst=10.10.10.10|service=257|proto=tcp|attack=Geo-location
>>>> enforcement|Attack Info=Geo-location inbound enforcement";
>>>>         String lea = input.toString();
>>>>
>>>>         // LinkedHashMap to preserve order of the keys
>>>>         Map<String, String> leaEventMap = new LinkedHashMap<String,
>>>> String>();
>>>>         leaEventMap.put("action", null);
>>>>         leaEventMap.put("loc", null);
>>>>         leaEventMap.put("time", null);
>>>>         leaEventMap.put("orig", null);
>>>>         leaEventMap.put("i/f_dir", null);
>>>>         leaEventMap.put("i/f_name", null);
>>>>
>>>>         // Split lea string
>>>>         String[] leaStringSplit = lea.split("\\|");
>>>>
>>>>         for( int i = 0; i < leaStringSplit.length; i++) {
>>>>             // Split lea field on the first '=' only, so values
>>>>             // containing '=' stay intact
>>>>             String[] leaFieldSplit = leaStringSplit[i].split("=", 2);
>>>>
>>>>             // Skip fields
>>>>             if( leaFieldSplit[0].equals("__policy_id_tag")) {
>>>>                 continue;
>>>>             }
>>>>
>>>>             // If key exists, add value to key in the map
>>>>             if(leaFieldSplit.length == 2 &&
>>>> leaEventMap.containsKey(leaFieldSplit[0].toLowerCase()))
>>>>                 leaEventMap.put(leaFieldSplit[0].toLowerCase(),
>>>> leaFieldSplit[1]);
>>>>             else
>>>>                 System.out.println("Warning: Missing key, field will be
>>>> ignored: " + leaFieldSplit[0]);
>>>>         }
>>>>
>>>>         // Temporary print
>>>>         System.out.println("Total: " + leaEventMap.size());
>>>>         for(String key: leaEventMap.keySet())
>>>>             System.out.println(key + ": " + leaEventMap.get(key));
>>>>         System.out.println();
>>>>
>>>>         return leaEventMap;
>>>>     }
>>>>
>>>>     public void prepare(Map stormConf, TopologyContext context,
>>>>             OutputCollector collector) {
>>>>         LOG.info("Preparing Lea Parsing Bolt...");
>>>>
>>>>         _collector = collector;
>>>>     }
>>>>
>>>>     public void execute(Tuple tuple) {
>>>>         _collector.emit(tuple, new Values(tuple.getString(0)));
>>>>
>>>>         System.out.println("##########");
>>>>         System.out.println(tuple.getString(0));
>>>>         System.out.println("##########");
>>>>
>>>>         // TODO: just running the function and printing the outcome.
>>>>         // Some conversion magic should happen to get the
>>>> splitLea-outcome acked to the next Bolt.
>>>>         splitLea(tuple.getValue(0));
>>>>
>>>>         _collector.ack(tuple);
>>>>     }
>>>>
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("event"));
>>>>     }
>>>>
>>>> }
>>>>
>>>> ###
>>>> ### End Code ###
>>>> ###
>>>>
>>>
>>>
>>>
>>> --
>>> Name : 임 정택
>>> Blog : http://www.heartsavior.net / http://dev.heartsavior.net
>>> Twitter : http://twitter.com/heartsavior
>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>>
>>
>>
>
>
> --
> Name : 임 정택
> Blog : http://www.heartsavior.net / http://dev.heartsavior.net
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>
