Yes, looks more fine when you remove debug log or introduce local variable to refer splitLea(event). :) And if splitLea() can throw any Throwable, you may want to handle it with try-catch and in finally call collector's ack or fail.
Happy to help. Jungtaek Lim (HeartSaVioR) 2015-05-12 16:58 GMT+09:00 Bas van de Lustgraaf <[email protected]>: > Hi, > > Thanks for your reply. I've checked the page you suggested. Based on that > I've changed the execute function. Below the new version, can you confirm > if I've implemented the emit and ack function the right way? > > public void execute(Tuple tuple) { > String event = tuple.getString(0); > > System.out.println("##### event #####"); > System.out.println(event); > System.out.println("##### #####"); > > System.out.println("##### parse #####"); > *System.out.println(new Values(splitLea(event)));* > System.out.println("##### #####"); > > // Emit tuple to next bolt > _collector.emit(tuple, new Values(splitLea(event))); > > // Ack tuple to confirm processing > _collector.ack(tuple); > } > > The out come of the println displayed in bold is: [{action=monitor, > loc=49, time=2015-04-29 00:02:19, orig=10.26.107.214, i/f_dir=inbound, > i/f_name=eth2}] > > So you were right about the serializable LinkedHashMap! > > Any other suggestion about the code? > > Regards, > > Bas > > > 2015-05-12 0:09 GMT+02:00 임정택 <[email protected]>: > >> Hi. >> >> Seems like you're confusing emit and ack. >> Ack is for guaranteeing message processing, not for sending something new. >> http://storm.apache.org/documentation/Guaranteeing-message-processing.html >> >> So when you converted sth. and want to send to next bolt, use emit. >> LinkedHashMap is serializable so there would be no issue to include it >> into Values. >> >> Hope this helps. >> >> Regards. >> Jungtaek Lim (HeartSaVioR) >> >> >> >> 2015-05-12 1:40 GMT+09:00 Bas van de Lustgraaf <[email protected]> >> : >> >>> ---------- Doorgestuurd bericht ---------- >>> Van: "Bas van de Lustgraaf" <[email protected]> >>> Datum: 11 mei 2015 16:46 >>> Onderwerp: Converting LinkedHashMap to a Tuple >>> Aan: <[email protected]> >>> Cc: >>> >>> Hello, >>> >>> I've created a custom bolt for parsing a specific log event format. >>> After writing the logic in a simple java class for testing, I have >>> converted this into a bolt. >>> >>> So far the bolt emits the tuple (just a long String), Splits the log >>> event into an key/value pair using a (LinkedHashMap to preserve the order >>> of the fields and prints each k/v pair for debugging. >>> >>> The next step is to ack the Tuple back to the next bolt. But I have no >>> idea how to convert the LinkedHashMap to a Tuple. >>> >>> What is the best approach here? Keeping in mind that the information >>> should be written to Hive in the end. >>> >>> Below the code >>> >>> ### >>> ### Start Code ### >>> ### >>> >>> package storm.os.bolt; >>> >>> import java.util.LinkedHashMap; >>> import java.util.Map; >>> >>> import org.apache.storm.hdfs.bolt.HdfsBolt; >>> >>> import org.slf4j.Logger; >>> import org.slf4j.LoggerFactory; >>> >>> import backtype.storm.task.OutputCollector; >>> import backtype.storm.task.TopologyContext; >>> import backtype.storm.topology.OutputFieldsDeclarer; >>> import backtype.storm.topology.base.BaseRichBolt; >>> import backtype.storm.tuple.Fields; >>> import backtype.storm.tuple.Tuple; >>> import backtype.storm.tuple.Values; >>> >>> public class LeaParsingBolt extends BaseRichBolt { >>> private static final Logger LOG = >>> LoggerFactory.getLogger(LeaParsingBolt.class); >>> OutputCollector _collector; >>> >>> public static Map<String, String> splitLea(Object input) { >>> // String lea = "loc=49|time=2015-04-29 >>> 00:02:19|action=monitor|orig=192.168.40.40|i/f_dir=inbound|i/f_name=eth2|has_accounting=0|uuid=<00000000,00000000,00000000,00000000>|product=SmartDefense|__policy_id_tag=product=VPN-1 >>> & >>> FireWall-1[db_tag={1580F896-C9E4-FF49-88AE-F5099E3E13B2};mgmt=Management_Server;date=1430221728;policy_name=Standard]|src=1.1.1.1|s_port=42041|dst=10.10.10.10|service=257|proto=tcp|attack=Geo-location >>> enforcement|Attack Info=Geo-location inbound enforcement"; >>> String lea = input.toString(); >>> >>> // LinkedHashMap to preserve order of the keys >>> Map<String, String> leaEventMap = new LinkedHashMap<String, >>> String>(); >>> leaEventMap.put("action", null); >>> leaEventMap.put("loc", null); >>> leaEventMap.put("time", null); >>> leaEventMap.put("orig", null); >>> leaEventMap.put("i/f_dir", null); >>> leaEventMap.put("i/f_name", null); >>> >>> // Split lea string >>> String[] leaStringSplit = lea.split("\\|"); >>> >>> for( int i = 0; i < leaStringSplit.length; i++) { >>> // Split lea field >>> String[] leaFieldSplit = leaStringSplit[i].split("="); >>> >>> // Skip fields >>> if( leaFieldSplit[0].equals("__policy_id_tag")) { >>> continue; >>> } >>> >>> // If key exists, add value to key in the map >>> if(leaEventMap.containsKey(leaFieldSplit[0].toLowerCase())) >>> leaEventMap.put(leaFieldSplit[0].toLowerCase(), >>> leaFieldSplit[1]); >>> else >>> System.out.println("Warning: Missing key, field will be >>> ignored." + leaFieldSplit[0]); >>> } >>> >>> // Temporary print >>> System.out.println("Total: " + leaEventMap.size()); >>> for(String key: leaEventMap.keySet()) >>> System.out.println(key + ": " + leaEventMap.get(key)); >>> System.out.println(); >>> >>> return leaEventMap; >>> } >>> >>> public void prepare(Map stormConf, TopologyContext context, >>> OutputCollector collector) { >>> LOG.info("Preparing Lea Parsing Bolt..."); >>> >>> _collector = collector; >>> } >>> >>> public void execute(Tuple tuple) { >>> _collector.emit(tuple, new Values(tuple.getString(0))); >>> >>> System.out.println("##########"); >>> System.out.println(tuple.getString(0)); >>> System.out.println("##########"); >>> >>> // TODO: just running the function and printing the outcome. >>> // Some conversion magic should happen to get the >>> splitLea-outcome acked to the next Bolt. >>> splitLea(tuple.getValue(0)); >>> >>> _collector.ack(tuple); >>> } >>> >>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>> declarer.declare(new Fields("event")); >>> } >>> >>> } >>> >>> ### >>> ### End Code ### >>> ### >>> >> >> >> >> -- >> Name : 임 정택 >> Blog : http://www.heartsavior.net / http://dev.heartsavior.net >> Twitter : http://twitter.com/heartsavior >> LinkedIn : http://www.linkedin.com/in/heartsavior >> > > -- Name : 임 정택 Blog : http://www.heartsavior.net / http://dev.heartsavior.net Twitter : http://twitter.com/heartsavior LinkedIn : http://www.linkedin.com/in/heartsavior
