---------- Forwarded message ----------
From: "Bas van de Lustgraaf" <[email protected]>
Date: 11 May 2015 16:46
Subject: Converting LinkedHashMap to a Tuple
To: <[email protected]>
Cc:

Hello,

I've created a custom bolt for parsing a specific log event format. After
writing the logic in a simple Java class for testing, I have converted it
into a bolt.

So far the bolt emits the tuple (just a long String), splits the log event
into key/value pairs using a LinkedHashMap (to preserve the order of the
fields), and prints each k/v pair for debugging.

The next step is to pass the parsed result on to the next bolt, but I have
no idea how to convert the LinkedHashMap into a Tuple.

What is the best approach here, keeping in mind that the information should
be written to Hive in the end?
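
The only rough idea I have so far (untested, and it assumes the set of keys
is fixed and known up front) is to declare one output field per key and emit
the map values in insertion order, roughly like this:

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // one field per key, in the same order as the LinkedHashMap
        declarer.declare(new Fields("action", "loc", "time", "orig", "i/f_dir", "i/f_name"));
    }

    public void execute(Tuple tuple) {
        Map<String, String> leaEventMap = splitLea(tuple.getValue(0));

        // Values is just a list of objects, so the map values can be passed in insertion order
        _collector.emit(tuple, new Values(leaEventMap.values().toArray()));
        _collector.ack(tuple);
    }

But I'm not sure this is the right way to do it, or whether I should emit the
whole map as a single field instead (the i/f_* names would probably have to
be renamed before they can become Hive columns anyway).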

Below is the code.

###
### Start Code ###
###

package storm.os.bolt;

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.storm.hdfs.bolt.HdfsBolt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LeaParsingBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LeaParsingBolt.class);
    OutputCollector _collector;

    public static Map<String, String> splitLea(Object input) {
        // String lea = "loc=49|time=2015-04-29 00:02:19|action=monitor|orig=192.168.40.40|i/f_dir=inbound|i/f_name=eth2|has_accounting=0|uuid=<00000000,00000000,00000000,00000000>|product=SmartDefense|__policy_id_tag=product=VPN-1 & FireWall-1[db_tag={1580F896-C9E4-FF49-88AE-F5099E3E13B2};mgmt=Management_Server;date=1430221728;policy_name=Standard]|src=1.1.1.1|s_port=42041|dst=10.10.10.10|service=257|proto=tcp|attack=Geo-location enforcement|Attack Info=Geo-location inbound enforcement";
        String lea = input.toString();

        // LinkedHashMap to preserve order of the keys
        Map<String, String> leaEventMap = new LinkedHashMap<String, String>();
        leaEventMap.put("action", null);
        leaEventMap.put("loc", null);
        leaEventMap.put("time", null);
        leaEventMap.put("orig", null);
        leaEventMap.put("i/f_dir", null);
        leaEventMap.put("i/f_name", null);

        // Split lea string
        String[] leaStringSplit = lea.split("\\|");

        for (int i = 0; i < leaStringSplit.length; i++) {
            // Split lea field into key and value (limit 2, so values containing '=' stay intact)
            String[] leaFieldSplit = leaStringSplit[i].split("=", 2);

            // Skip the policy tag and any field without a value
            if (leaFieldSplit[0].equals("__policy_id_tag") || leaFieldSplit.length < 2) {
                continue;
            }

            // If key exists, add value to key in the map
            if (leaEventMap.containsKey(leaFieldSplit[0].toLowerCase()))
                leaEventMap.put(leaFieldSplit[0].toLowerCase(), leaFieldSplit[1]);
            else
                System.out.println("Warning: Missing key, field will be ignored: " + leaFieldSplit[0]);
        }

        // Temporary print
        System.out.println("Total: " + leaEventMap.size());
        for (String key : leaEventMap.keySet()) {
            System.out.println(key + ": " + leaEventMap.get(key));
        }
        System.out.println();
        return leaEventMap;
    }

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        LOG.info("Preparing Lea Parsing Bolt...");

        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0)));

        System.out.println("##########");
        System.out.println(tuple.getString(0));
        System.out.println("##########");

        // TODO: just running the function and printing the outcome.
        // Some conversion magic should happen to get the splitLea outcome
        // passed on to the next bolt.
        splitLea(tuple.getValue(0));

        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }

}

###
### End Code ###
###
