Again, thank you! I've modified the code and implemented some checks and the
fail().
Regarding the suggestion to let splitLea() throw and wrap it in a try/catch:
should the try/catch surround `Map<String, String> eventParsed = splitLea(event);`?
public void execute(Tuple tuple) {
    String event = tuple.getString(0);

    // TODO add try/catch
    Map<String, String> eventParsed = splitLea(event);

    if (Collections.frequency(eventParsed.values(), null) == eventParsed.size()) {
        LOG.warn("Tuple is empty and will be dropped");
        _collector.fail(tuple);
    } else {
        // Emit tuple to next bolt
        _collector.emit(tuple, new Values(eventParsed));

        // Ack tuple to confirm processing
        _collector.ack(tuple);
    }
}
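
Would something like the sketch below match your suggestion? The try surrounds
the splitLea() call, and the finally block calls exactly one of ack or fail;
the success flag and catching Exception are assumptions of this sketch:

public void execute(Tuple tuple) {
    String event = tuple.getString(0);
    boolean success = false;
    try {
        Map<String, String> eventParsed = splitLea(event);
        if (Collections.frequency(eventParsed.values(), null) == eventParsed.size()) {
            // Nothing was parsed: leave success false so the tuple is failed
            LOG.warn("Tuple is empty and will be dropped");
        } else {
            // Emit the parsed map to the next bolt, anchored to the input tuple
            _collector.emit(tuple, new Values(eventParsed));
            success = true;
        }
    } catch (Exception e) {
        // A parse error also leaves success false, so the tuple can be replayed
        LOG.error("Failed to parse event", e);
    } finally {
        if (success) {
            _collector.ack(tuple);
        } else {
            _collector.fail(tuple);
        }
    }
}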
2015-05-13 1:31 GMT+02:00 임정택 <[email protected]>:
> Yes, it looks better when you remove the debug log or introduce a local
> variable to hold the result of splitLea(event). :)
> And if splitLea() can throw any Throwable, you may want to handle it with a
> try/catch and call the collector's ack or fail in a finally block.
>
> Happy to help.
>
> Jungtaek Lim (HeartSaVioR)
>
> 2015-05-12 16:58 GMT+09:00 Bas van de Lustgraaf <[email protected]>
> :
>
>> Hi,
>>
>> Thanks for your reply. I've checked the page you suggested and changed the
>> execute function based on it. Below is the new version; can you confirm
>> whether I've implemented emit and ack the right way?
>>
>> public void execute(Tuple tuple) {
>>     String event = tuple.getString(0);
>>
>>     System.out.println("##### event #####");
>>     System.out.println(event);
>>     System.out.println("##### #####");
>>
>>     System.out.println("##### parse #####");
>>     System.out.println(new Values(splitLea(event))); // <-- output shown below
>>     System.out.println("##### #####");
>>
>>     // Emit tuple to next bolt
>>     _collector.emit(tuple, new Values(splitLea(event)));
>>
>>     // Ack tuple to confirm processing
>>     _collector.ack(tuple);
>> }
>>
>> The outcome of the println marked above is: [{action=monitor,
>> loc=49, time=2015-04-29 00:02:19, orig=10.26.107.214, i/f_dir=inbound,
>> i/f_name=eth2}]
>>
>> So you were right about the serializable LinkedHashMap!
>>
>> Any other suggestion about the code?
>>
>> Regards,
>>
>> Bas
>>
>>
>> 2015-05-12 0:09 GMT+02:00 임정택 <[email protected]>:
>>
>>> Hi.
>>>
>>> Seems like you're confusing emit and ack.
>>> Ack is for guaranteeing message processing, not for sending something
>>> new.
>>>
>>> http://storm.apache.org/documentation/Guaranteeing-message-processing.html
>>>
>>> So when you have converted something and want to send it to the next
>>> bolt, use emit. LinkedHashMap is serializable, so there should be no
>>> issue including it in Values.
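>>>
>>> A minimal sketch of the distinction, reusing your splitLea() (the local
>>> variable is only for illustration):
>>>
>>> // emit sends the converted data downstream, anchored to the input tuple
>>> Map<String, String> leaEventMap = splitLea(tuple.getString(0));
>>> _collector.emit(tuple, new Values(leaEventMap));
>>>
>>> // ack sends nothing; it only tells Storm the input tuple is fully processed
>>> _collector.ack(tuple);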
>>>
>>> Hope this helps.
>>>
>>> Regards.
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>>
>>>
>>> 2015-05-12 1:40 GMT+09:00 Bas van de Lustgraaf <[email protected]
>>> >:
>>>
>>>> ---------- Forwarded message ----------
>>>> From: "Bas van de Lustgraaf" <[email protected]>
>>>> Date: 11 May 2015 16:46
>>>> Subject: Converting LinkedHashMap to a Tuple
>>>> To: <[email protected]>
>>>> Cc:
>>>>
>>>> Hello,
>>>>
>>>> I've created a custom bolt for parsing a specific log event format.
>>>> After writing the logic in a simple Java class for testing, I have
>>>> converted it into a bolt.
>>>>
>>>> So far the bolt emits the tuple (just a long String), splits the log
>>>> event into key/value pairs using a LinkedHashMap (to preserve the order
>>>> of the fields), and prints each k/v pair for debugging.
>>>>
>>>> The next step is to ack the Tuple back to the next bolt, but I have no
>>>> idea how to convert the LinkedHashMap to a Tuple.
>>>>
>>>> What is the best approach here, keeping in mind that the information
>>>> should be written to Hive in the end? (A sketch of how I imagine the
>>>> next bolt consuming the map follows the code below.)
>>>>
>>>> Below is the code:
>>>>
>>>> ###
>>>> ### Start Code ###
>>>> ###
>>>>
>>>> package storm.os.bolt;
>>>>
>>>> import java.util.LinkedHashMap;
>>>> import java.util.Map;
>>>>
>>>> import org.apache.storm.hdfs.bolt.HdfsBolt;
>>>>
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> import backtype.storm.task.OutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>>
>>>> public class LeaParsingBolt extends BaseRichBolt {
>>>>     private static final Logger LOG = LoggerFactory.getLogger(LeaParsingBolt.class);
>>>>     OutputCollector _collector;
>>>>
>>>>     public static Map<String, String> splitLea(Object input) {
>>>>         // String lea = "loc=49|time=2015-04-29 00:02:19|action=monitor|orig=192.168.40.40|i/f_dir=inbound|i/f_name=eth2|has_accounting=0|uuid=<00000000,00000000,00000000,00000000>|product=SmartDefense|__policy_id_tag=product=VPN-1 & FireWall-1[db_tag={1580F896-C9E4-FF49-88AE-F5099E3E13B2};mgmt=Management_Server;date=1430221728;policy_name=Standard]|src=1.1.1.1|s_port=42041|dst=10.10.10.10|service=257|proto=tcp|attack=Geo-location enforcement|Attack Info=Geo-location inbound enforcement";
>>>>         String lea = input.toString();
>>>>
>>>>         // LinkedHashMap to preserve the order of the keys
>>>>         Map<String, String> leaEventMap = new LinkedHashMap<String, String>();
>>>>         leaEventMap.put("action", null);
>>>>         leaEventMap.put("loc", null);
>>>>         leaEventMap.put("time", null);
>>>>         leaEventMap.put("orig", null);
>>>>         leaEventMap.put("i/f_dir", null);
>>>>         leaEventMap.put("i/f_name", null);
>>>>
>>>>         // Split lea string into fields
>>>>         String[] leaStringSplit = lea.split("\\|");
>>>>
>>>>         for (int i = 0; i < leaStringSplit.length; i++) {
>>>>             // Split lea field into key and value; limit to 2 parts so
>>>>             // values that themselves contain '=' stay intact
>>>>             String[] leaFieldSplit = leaStringSplit[i].split("=", 2);
>>>>
>>>>             // Skip fields
>>>>             if (leaFieldSplit[0].equals("__policy_id_tag")) {
>>>>                 continue;
>>>>             }
>>>>
>>>>             // If key exists, add value to key in the map
>>>>             if (leaEventMap.containsKey(leaFieldSplit[0].toLowerCase())) {
>>>>                 leaEventMap.put(leaFieldSplit[0].toLowerCase(), leaFieldSplit[1]);
>>>>             } else {
>>>>                 System.out.println("Warning: Missing key, field will be ignored: " + leaFieldSplit[0]);
>>>>             }
>>>>         }
>>>>
>>>>         // Temporary print
>>>>         System.out.println("Total: " + leaEventMap.size());
>>>>         for (String key : leaEventMap.keySet()) {
>>>>             System.out.println(key + ": " + leaEventMap.get(key));
>>>>         }
>>>>         System.out.println();
>>>>
>>>>         return leaEventMap;
>>>>     }
>>>>
>>>>     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>>>         LOG.info("Preparing Lea Parsing Bolt...");
>>>>
>>>>         _collector = collector;
>>>>     }
>>>>
>>>>     public void execute(Tuple tuple) {
>>>>         _collector.emit(tuple, new Values(tuple.getString(0)));
>>>>
>>>>         System.out.println("##########");
>>>>         System.out.println(tuple.getString(0));
>>>>         System.out.println("##########");
>>>>
>>>>         // TODO: just running the function and printing the outcome.
>>>>         // Some conversion magic should happen to get the splitLea
>>>>         // outcome acked to the next Bolt.
>>>>         splitLea(tuple.getValue(0));
>>>>
>>>>         _collector.ack(tuple);
>>>>     }
>>>>
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("event"));
>>>>     }
>>>> }
>>>>
>>>> ###
>>>> ### End Code ###
>>>> ###
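>>>>
>>>> For context, this is roughly how I imagine the next bolt consuming the
>>>> map, if it can be passed through at all (the class name LeaConsumerBolt
>>>> and the unchecked cast are guesses on my part):
>>>>
>>>> package storm.os.bolt;
>>>>
>>>> import java.util.Map;
>>>>
>>>> import backtype.storm.task.OutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>> import backtype.storm.tuple.Tuple;
>>>>
>>>> public class LeaConsumerBolt extends BaseRichBolt {
>>>>     private OutputCollector _collector;
>>>>
>>>>     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>>>         _collector = collector;
>>>>     }
>>>>
>>>>     public void execute(Tuple tuple) {
>>>>         // The parsing bolt emits the map as its single "event" field,
>>>>         // so read it back as a Map (unchecked cast)
>>>>         @SuppressWarnings("unchecked")
>>>>         Map<String, String> eventParsed = (Map<String, String>) tuple.getValueByField("event");
>>>>
>>>>         // ... the fields would eventually be written towards Hive here ...
>>>>
>>>>         _collector.ack(tuple);
>>>>     }
>>>>
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         // This sketch emits nothing further
>>>>     }
>>>> }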
>>>>
>>>
>>>
>>>
>>> --
>>> Name : 임 정택
>>> Blog : http://www.heartsavior.net / http://dev.heartsavior.net
>>> Twitter : http://twitter.com/heartsavior
>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>>
>>
>>
>
>
> --
> Name : 임 정택
> Blog : http://www.heartsavior.net / http://dev.heartsavior.net
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>