Yes, looks more fine when you remove debug log or introduce local variable
to refer splitLea(event). :)
And if splitLea() can throw any Throwable, you may want to handle it with
try-catch and in finally call collector's ack or fail.

Happy to help.

Jungtaek Lim (HeartSaVioR)

2015-05-12 16:58 GMT+09:00 Bas van de Lustgraaf <[email protected]>:

> Hi,
>
> Thanks for your reply. I've checked the page you suggested. Based on that
> I've changed the execute function. Below the new version, can you confirm
> if I've implemented the emit and ack function the right way?
>
>     public void execute(Tuple tuple) {
>         String event = tuple.getString(0);
>
>         System.out.println("##### event #####");
>         System.out.println(event);
>         System.out.println("#####       #####");
>
>         System.out.println("##### parse #####");
>         *System.out.println(new Values(splitLea(event)));*
>         System.out.println("#####       #####");
>
>         // Emit tuple to next bolt
>         _collector.emit(tuple, new Values(splitLea(event)));
>
>         // Ack tuple to confirm processing
>         _collector.ack(tuple);
>     }
>
> The out come of the println displayed in bold is: [{action=monitor,
> loc=49, time=2015-04-29 00:02:19, orig=10.26.107.214, i/f_dir=inbound,
> i/f_name=eth2}]
>
> So you were right about the serializable LinkedHashMap!
>
> Any other suggestion about the code?
>
> Regards,
>
> Bas
>
>
> 2015-05-12 0:09 GMT+02:00 임정택 <[email protected]>:
>
>> Hi.
>>
>> Seems like you're confusing emit and ack.
>> Ack is for guaranteeing message processing, not for sending something new.
>> http://storm.apache.org/documentation/Guaranteeing-message-processing.html
>>
>> So when you converted sth. and want to send to next bolt, use emit.
>> LinkedHashMap is serializable so there would be no issue to include it
>> into Values.
>>
>> Hope this helps.
>>
>> Regards.
>> Jungtaek Lim (HeartSaVioR)
>>
>>
>>
>> 2015-05-12 1:40 GMT+09:00 Bas van de Lustgraaf <[email protected]>
>> :
>>
>>> ---------- Doorgestuurd bericht ----------
>>> Van: "Bas van de Lustgraaf" <[email protected]>
>>> Datum: 11 mei 2015 16:46
>>> Onderwerp: Converting LinkedHashMap to a Tuple
>>> Aan: <[email protected]>
>>> Cc:
>>>
>>> Hello,
>>>
>>> I've created a custom bolt for parsing a specific log event format.
>>> After writing the logic in a simple java class for testing, I have
>>> converted this into a bolt.
>>>
>>> So far the bolt emits the tuple (just a long String), Splits the log
>>> event into an key/value pair  using a (LinkedHashMap to preserve the order
>>> of the fields and prints each k/v pair for debugging.
>>>
>>> The next step is to ack the Tuple back to the next bolt. But I have no
>>> idea how to convert the LinkedHashMap to a Tuple.
>>>
>>> What is the best approach here? Keeping in mind that the information
>>> should be written to Hive in the end.
>>>
>>> Below the code
>>>
>>> ###
>>> ### Start Code ###
>>> ###
>>>
>>> package storm.os.bolt;
>>>
>>> import java.util.LinkedHashMap;
>>> import java.util.Map;
>>>
>>> import org.apache.storm.hdfs.bolt.HdfsBolt;
>>>
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import backtype.storm.task.OutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>>
>>> public class LeaParsingBolt extends BaseRichBolt {
>>>     private static final Logger LOG =
>>> LoggerFactory.getLogger(LeaParsingBolt.class);
>>>     OutputCollector _collector;
>>>
>>>     public static Map<String, String> splitLea(Object input) {
>>>         // String lea = "loc=49|time=2015-04-29
>>> 00:02:19|action=monitor|orig=192.168.40.40|i/f_dir=inbound|i/f_name=eth2|has_accounting=0|uuid=<00000000,00000000,00000000,00000000>|product=SmartDefense|__policy_id_tag=product=VPN-1
>>> &
>>> FireWall-1[db_tag={1580F896-C9E4-FF49-88AE-F5099E3E13B2};mgmt=Management_Server;date=1430221728;policy_name=Standard]|src=1.1.1.1|s_port=42041|dst=10.10.10.10|service=257|proto=tcp|attack=Geo-location
>>> enforcement|Attack Info=Geo-location inbound enforcement";
>>>         String lea = input.toString();
>>>
>>>         // LinkedHashMap to preserve order of the keys
>>>         Map<String, String> leaEventMap = new LinkedHashMap<String,
>>> String>();
>>>         leaEventMap.put("action", null);
>>>         leaEventMap.put("loc", null);
>>>         leaEventMap.put("time", null);
>>>         leaEventMap.put("orig", null);
>>>         leaEventMap.put("i/f_dir", null);
>>>         leaEventMap.put("i/f_name", null);
>>>
>>>         // Split lea string
>>>         String[] leaStringSplit = lea.split("\\|");
>>>
>>>         for( int i = 0; i < leaStringSplit.length; i++) {
>>>             // Split lea field
>>>             String[] leaFieldSplit = leaStringSplit[i].split("=");
>>>
>>>             // Skip fields
>>>             if( leaFieldSplit[0].equals("__policy_id_tag")) {
>>>                 continue;
>>>             }
>>>
>>>             // If key exists, add value to key in the map
>>>             if(leaEventMap.containsKey(leaFieldSplit[0].toLowerCase()))
>>>                 leaEventMap.put(leaFieldSplit[0].toLowerCase(),
>>> leaFieldSplit[1]);
>>>             else
>>>                 System.out.println("Warning: Missing key, field will be
>>> ignored." + leaFieldSplit[0]);
>>>         }
>>>
>>>         // Temporary print
>>>         System.out.println("Total: " + leaEventMap.size());
>>>         for(String key: leaEventMap.keySet())
>>>             System.out.println(key + ": " + leaEventMap.get(key));
>>>             System.out.println();
>>>
>>>         return leaEventMap;
>>>     }
>>>
>>>     public void prepare(Map stormConf, TopologyContext context,
>>>             OutputCollector collector) {
>>>         LOG.info("Preparing Lea Parsing Bolt...");
>>>
>>>         _collector = collector;
>>>     }
>>>
>>>     public void execute(Tuple tuple) {
>>>         _collector.emit(tuple, new Values(tuple.getString(0)));
>>>
>>>         System.out.println("##########");
>>>         System.out.println(tuple.getString(0));
>>>         System.out.println("##########");
>>>
>>>         // TODO: just running the function and printing the outcome.
>>>         // Some conversion magic should happen to get the
>>> splitLea-outcome acked to the next Bolt.
>>>         splitLea(tuple.getValue(0));
>>>
>>>         _collector.ack(tuple);
>>>     }
>>>
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("event"));
>>>     }
>>>
>>> }
>>>
>>> ###
>>> ### End Code ###
>>> ###
>>>
>>
>>
>>
>> --
>> Name : 임 정택
>> Blog : http://www.heartsavior.net / http://dev.heartsavior.net
>> Twitter : http://twitter.com/heartsavior
>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>
>
>


-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Reply via email to