Hi.

Seems like you're confusing emit and ack.
Ack is for guaranteeing message processing, not for sending something new.
http://storm.apache.org/documentation/Guaranteeing-message-processing.html

So when you converted sth. and want to send to next bolt, use emit.
LinkedHashMap is serializable so there would be no issue to include it into
Values.

Hope this helps.

Regards.
Jungtaek Lim (HeartSaVioR)



2015-05-12 1:40 GMT+09:00 Bas van de Lustgraaf <[email protected]>:

> ---------- Doorgestuurd bericht ----------
> Van: "Bas van de Lustgraaf" <[email protected]>
> Datum: 11 mei 2015 16:46
> Onderwerp: Converting LinkedHashMap to a Tuple
> Aan: <[email protected]>
> Cc:
>
> Hello,
>
> I've created a custom bolt for parsing a specific log event format. After
> writing the logic in a simple java class for testing, I have converted this
> into a bolt.
>
> So far the bolt emits the tuple (just a long String), Splits the log event
> into an key/value pair  using a (LinkedHashMap to preserve the order of the
> fields and prints each k/v pair for debugging.
>
> The next step is to ack the Tuple back to the next bolt. But I have no
> idea how to convert the LinkedHashMap to a Tuple.
>
> What is the best approach here? Keeping in mind that the information
> should be written to Hive in the end.
>
> Below the code
>
> ###
> ### Start Code ###
> ###
>
> package storm.os.bolt;
>
> import java.util.LinkedHashMap;
> import java.util.Map;
>
> import org.apache.storm.hdfs.bolt.HdfsBolt;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
>
> public class LeaParsingBolt extends BaseRichBolt {
>     private static final Logger LOG =
> LoggerFactory.getLogger(LeaParsingBolt.class);
>     OutputCollector _collector;
>
>     public static Map<String, String> splitLea(Object input) {
>         // String lea = "loc=49|time=2015-04-29
> 00:02:19|action=monitor|orig=192.168.40.40|i/f_dir=inbound|i/f_name=eth2|has_accounting=0|uuid=<00000000,00000000,00000000,00000000>|product=SmartDefense|__policy_id_tag=product=VPN-1
> &
> FireWall-1[db_tag={1580F896-C9E4-FF49-88AE-F5099E3E13B2};mgmt=Management_Server;date=1430221728;policy_name=Standard]|src=1.1.1.1|s_port=42041|dst=10.10.10.10|service=257|proto=tcp|attack=Geo-location
> enforcement|Attack Info=Geo-location inbound enforcement";
>         String lea = input.toString();
>
>         // LinkedHashMap to preserve order of the keys
>         Map<String, String> leaEventMap = new LinkedHashMap<String,
> String>();
>         leaEventMap.put("action", null);
>         leaEventMap.put("loc", null);
>         leaEventMap.put("time", null);
>         leaEventMap.put("orig", null);
>         leaEventMap.put("i/f_dir", null);
>         leaEventMap.put("i/f_name", null);
>
>         // Split lea string
>         String[] leaStringSplit = lea.split("\\|");
>
>         for( int i = 0; i < leaStringSplit.length; i++) {
>             // Split lea field
>             String[] leaFieldSplit = leaStringSplit[i].split("=");
>
>             // Skip fields
>             if( leaFieldSplit[0].equals("__policy_id_tag")) {
>                 continue;
>             }
>
>             // If key exists, add value to key in the map
>             if(leaEventMap.containsKey(leaFieldSplit[0].toLowerCase()))
>                 leaEventMap.put(leaFieldSplit[0].toLowerCase(),
> leaFieldSplit[1]);
>             else
>                 System.out.println("Warning: Missing key, field will be
> ignored." + leaFieldSplit[0]);
>         }
>
>         // Temporary print
>         System.out.println("Total: " + leaEventMap.size());
>         for(String key: leaEventMap.keySet())
>             System.out.println(key + ": " + leaEventMap.get(key));
>             System.out.println();
>
>         return leaEventMap;
>     }
>
>     public void prepare(Map stormConf, TopologyContext context,
>             OutputCollector collector) {
>         LOG.info("Preparing Lea Parsing Bolt...");
>
>         _collector = collector;
>     }
>
>     public void execute(Tuple tuple) {
>         _collector.emit(tuple, new Values(tuple.getString(0)));
>
>         System.out.println("##########");
>         System.out.println(tuple.getString(0));
>         System.out.println("##########");
>
>         // TODO: just running the function and printing the outcome.
>         // Some conversion magic should happen to get the splitLea-outcome
> acked to the next Bolt.
>         splitLea(tuple.getValue(0));
>
>         _collector.ack(tuple);
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("event"));
>     }
>
> }
>
> ###
> ### End Code ###
> ###
>



-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Reply via email to