---------- Doorgestuurd bericht ----------
Van: "Bas van de Lustgraaf" <[email protected]>
Datum: 11 mei 2015 16:46
Onderwerp: Converting LinkedHashMap to a Tuple
Aan: <[email protected]>
Cc:
Hello,
I've created a custom bolt for parsing a specific log event format. After
writing the logic in a simple java class for testing, I have converted this
into a bolt.
So far the bolt emits the tuple (just a long String), splits the log event
into key/value pairs using a LinkedHashMap (to preserve the order of the
fields), and prints each k/v pair for debugging.
The next step is to emit the parsed result as a Tuple to the next bolt
(and ack the input). But I have no idea how to convert the LinkedHashMap
to a Tuple.
What is the best approach here? Keeping in mind that the information should
be written to Hive in the end.
Below the code
###
### Start Code ###
###
package storm.os.bolt;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LeaParsingBolt extends BaseRichBolt {
private static final Logger LOG =
LoggerFactory.getLogger(LeaParsingBolt.class);
OutputCollector _collector;
public static Map<String, String> splitLea(Object input) {
// String lea = "loc=49|time=2015-04-29
00:02:19|action=monitor|orig=192.168.40.40|i/f_dir=inbound|i/f_name=eth2|has_accounting=0|uuid=<00000000,00000000,00000000,00000000>|product=SmartDefense|__policy_id_tag=product=VPN-1
&
FireWall-1[db_tag={1580F896-C9E4-FF49-88AE-F5099E3E13B2};mgmt=Management_Server;date=1430221728;policy_name=Standard]|src=1.1.1.1|s_port=42041|dst=10.10.10.10|service=257|proto=tcp|attack=Geo-location
enforcement|Attack Info=Geo-location inbound enforcement";
String lea = input.toString();
// LinkedHashMap to preserve order of the keys
Map<String, String> leaEventMap = new LinkedHashMap<String,
String>();
leaEventMap.put("action", null);
leaEventMap.put("loc", null);
leaEventMap.put("time", null);
leaEventMap.put("orig", null);
leaEventMap.put("i/f_dir", null);
leaEventMap.put("i/f_name", null);
// Split lea string
String[] leaStringSplit = lea.split("\\|");
for( int i = 0; i < leaStringSplit.length; i++) {
// Split lea field
String[] leaFieldSplit = leaStringSplit[i].split("=");
// Skip fields
if( leaFieldSplit[0].equals("__policy_id_tag")) {
continue;
}
// If key exists, add value to key in the map
if(leaEventMap.containsKey(leaFieldSplit[0].toLowerCase()))
leaEventMap.put(leaFieldSplit[0].toLowerCase(),
leaFieldSplit[1]);
else
System.out.println("Warning: Missing key, field will be
ignored." + leaFieldSplit[0]);
}
// Temporary print
System.out.println("Total: " + leaEventMap.size());
for(String key: leaEventMap.keySet())
System.out.println(key + ": " + leaEventMap.get(key));
System.out.println();
return leaEventMap;
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
LOG.info("Preparing Lea Parsing Bolt...");
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0)));
System.out.println("##########");
System.out.println(tuple.getString(0));
System.out.println("##########");
// TODO: just running the function and printing the outcome.
// Some conversion magic should happen to get the splitLea-outcome
acked to the next Bolt.
splitLea(tuple.getValue(0));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("event"));
}
}
###
### End Code ###
###