Pablo,
For your first question, what you want to do is called a projection of your
"grouped" relation. Something like this should work:

grouped = foreach (group cleaned by (timestamp, sensor_name, sig_generator, sig_id))
          generate flatten(group) as (timestamp, sensor_name, sig_generator, sig_id);
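
With your sample data that should leave you with plain four-field tuples instead
of the nested groups you're seeing now, e.g.:

```
(1374818400,sensor-produccion,1,402)
```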
The flatten(...) takes your group key (a tuple with four fields) and flattens it
into a single tuple in the grouped relation. What I don't understand is why
you'd want to do it that way rather than simply doing a projection and a
distinct, since you're not doing anything with the grouped data (no counting,
summing, or other operations on the json data for a single sensor and
timestamp, etc). That distinct would look like:
distinct_data = distinct (foreach cleaned generate timestamp..sig_id);
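
If you really do want the whole first tuple from each group rather than just
the distinct keys, a nested foreach with LIMIT should also work (nested limit
needs a reasonably recent Pig, 0.9 or so if I remember right):

```pig
firsts = foreach grouped {
    one = limit cleaned 1;  -- keep a single tuple per group
    generate flatten(group) as (timestamp, sensor_name, sig_generator, sig_id),
             flatten(one.data) as data;
};
```

That gives you one row per key with the json map from one (arbitrary) record
attached; without an order inside the foreach there's no guarantee which record
you get.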
Finally, for the last question, Pig has a little-known (and therefore little-
used) StoreFunc called "MultiStorage". If you want to store separate files, one
per timestamp-sensor_name-sig_generator-sig_id combination, then I'd first
build a new field with Pig's CONCAT function to use as the file name, since
storing files with parentheses in their names is probably a bad idea:
with_filename = foreach distinct_data generate
    CONCAT((chararray)timestamp,
      CONCAT('-', CONCAT(sensor_name,
        CONCAT('-', CONCAT((chararray)sig_generator,
          CONCAT('-', (chararray)sig_id)))))) as filename,
    timestamp..sig_id;

(Note the casts: CONCAT wants chararray arguments, and per your describe output
timestamp, sig_generator, and sig_id are ints.)
store with_filename into '/data' using
org.apache.pig.piggybank.storage.MultiStorage('/data', '0');
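
Also, since your example path ends in data.gz: if I remember the piggybank
source right, MultiStorage has a four-argument constructor taking a compression
type ('gz' or 'bz2') and a field delimiter, so you can get compressed output
per key with something like:

```pig
store with_filename into '/data' using
    org.apache.pig.piggybank.storage.MultiStorage('/data', '0', 'gz', '\\t');
```

Each distinct value of the filename field should end up in its own subdirectory
under /data.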
where '/data' is the directory on HDFS where the files will go and '0' is the
numerical index of the field to use as the file name (called filename in this
case). I would warn you though: this opens many output streams at once and
could effectively ddos your Hadoop datanodes if the number of unique filenames
is large.
--jacob
@thedatachef
On Jul 30, 2013, at 5:21 AM, Pablo Nebrera wrote:
> Hello
>
> I have this pig script:
>
> register '/path_to_jars/elephant-bird-pig-3.0.7.jar';
> register '/path_to_jars/json-simple-1.1.1.jar';
> register '/path_to_jars/redBorder-pig.jar';
>
> data = load '/data/events/2013/07/29/16h03/part-00001.gz' using
> com.twitter.elephantbird.pig.load.JsonLoader() as (json: map[]);
> cleaned = foreach data generate json#'timestamp'/3600*3600 as timestamp,
> (chararray) json#'sensor_name' as sensor_name, (int) json#'sig_generator'
> as sig_generator, (int) json#'sig_id' as sig_id, json as data;
> grouped = GROUP cleaned BY (timestamp, sensor_name, sig_generator, sig_id);
>
>
>
>
>
> The input file is json file something like:
>
> {"timestamp":1374820560, "sensor_id":2, "sensor_name":"sensor-produccion",
> "sig_generator":1, "sig_id":402, "rev":11, "priority":3,
> "classification":"Misc activity", "msg":"Snort Alert [1:402:11]",
> "payload":"XXXXXXXXX", "proto":"icmp", "proto_id":1, "src":3232287141,
> "src_str":"192.168.201.165", "src_name":"192.168.201.165", "src_net":"
> 0.0.0.0/0", "src_net_name":"0.0.0.0/0", "dst_name":"192.168.201.254",
> "dst_str":"192.168.201.254", "dst_net":"0.0.0.0/0", "dst_net_name":"
> 0.0.0.0/0", "src_country":"N/A", "dst_country":"N/A",
> "src_country_code":"N/A", "dst_country_code":"N/A", "srcport":0,
> "dst":3232287230, "dstport":0, "ethsrc":"0:25:90:56:91:2d",
> "ethdst":"6c:62:6d:42:46:c3", "ethlength":594, "vlan":201,
> "vlan_name":"interna", "vlan_priority":0, "vlan_drop":0, "ttl":64,
> "tos":192, "id":53186, "dgmlen":576, "iplen":65544, "icmptype":3,
> "icmpcode":3, "icmpid":0, "icmpseq":0}
> {"timestamp":1374820618, "sensor_id":2, "sensor_id_snort":0,
> "sensor_name":"sensor-produccion", "sig_generator":1, "sig_id":402,
> "rev":11, "priority":3, "classification":"Misc activity", "msg":"Snort
> Alert [1:402:11]", "payload":"XXXXX2", "proto":"icmp", "proto_id":1,
> "src":3232261121, "src_str":"192.168.100.1", "src_name":"192.168.100.1",
> "src_net":"0.0.0.0/0", "src_net_name":"0.0.0.0/0",
> "dst_name":"192.168.100.125", "dst_str":"192.168.100.125", "dst_net":"
> 0.0.0.0/0", "dst_net_name":"0.0.0.0/0", "src_country":"N/A",
> "dst_country":"N/A", "src_country_code":"N/A", "dst_country_code":"N/A",
> "srcport":0, "dst":3232261245, "dstport":0, "ethsrc":"6c:62:6d:42:46:c3",
> "ethdst":"0:1e:c9:ef:85:fd", "ethlength":105, "vlan":100,
> "vlan_name":"100", "vlan_priority":0, "vlan_drop":0, "ttl":64, "tos":192,
> "id":30974, "dgmlen":87, "iplen":89088, "icmptype":3, "icmpcode":3,
> "icmpid":0, "icmpseq":0}
>
>
> The describe of grouped variable is:
>
> grunt> describe grouped
> 2013-07-30 10:11:58,834 [main] WARN org.apache.pig.PigServer - Encountered
> Warning IMPLICIT_CAST_TO_INT 1 time(s).
> grouped: {group: (timestamp: int,sensor_name: chararray,sig_generator:
> int,sig_id: int),cleaned: {(timestamp: int,sensor_name:
> chararray,sig_generator: int,sig_id: int,data: map[])}}
>
>
>
> And a dump example is:
>
> ((1374818400,sensor-produccion,1,402),{(1374818400,sensor-produccion,1,402,[dst_country_code#N/A,rev#11,sig_id#402,proto_id#1,src_net_name#
> 0.0.0.0/0,ethlength#105,payload#45003bd9d5400401117dc0a8647dc0a864141403502745aa4b2e1001000000d726564426f7264657244454c4c00101,dst#3232261245,dstport#0,timestamp#1374820435,sensor_id_snort#0,id#30968,vlan_name#100,tos#192,src_net#0.0.0.0/0,priority#3,src_name#192.168.100.1,dgmlen#87,ethsrc#6c:62:6d:42:46:c3,src#3232261121,icmpcode#3,src_str#192.168.100.1,srcport#0,sensor_id#2,dst_net#0.0.0.0/0,ttl#64,msg#SnortAlert
> [1:402:11],proto#icmp,vlan_priority#0,dst_country#N/A,dst_name#192.168.100.125,dst_net_name#
> 0.0.0.0/0,ethdst#0:1e:c9:ef:85:fd,iplen#89088,src_country_code#N/A,sensor_name#sensor-produccion,sig_generator#1,dst_str#192.168.100.125,class......})
>
>
> I have some questions:
>
> 1.- When I do the group by I would like to get an entry with the first
> entry only. Something like:
>
> ((1374818400,sensor-produccion,1,402), { only the first
> tuple that matches the group by })
>
> 2.- I would like to store this information in multiple files. Something
> like:
>
> /data/(1374818400,sensor-produccion,1,402)/data.gz
>
>
> How could I do this ?
>
> Thanks
>
>
>
>
> Pablo Nebrera