Pablo,

For your first question, what you want to do is called a projection of your 
"grouped" relation. Something like this should work:

grouped = foreach (group cleaned by (timestamp, sensor_name, sig_generator, 
sig_id)) generate flatten(group) as (timestamp, sensor_name, sig_generator, 
sig_id);

The flatten(...) takes your group key (a tuple with four fields) and flattens 
it back into four top-level fields in the grouped relation. What I don't 
understand, though, is why you'd want to do it that way rather than simply a 
projection followed by a distinct (since you're not doing anything with the 
grouped data: counting, summing, or other operations on the json data for a 
single sensor and timestamp, etc.). That distinct would look like:

distinct_data = distinct (foreach cleaned generate timestamp..sig_id);
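
If what you're actually after is the whole first tuple (json map and all) for 
each group, rather than just the distinct keys, a nested limit inside a 
foreach should also work. A sketch, assuming Pig 0.9 or later (which added 
nested limit); note that *which* tuple you get is arbitrary unless you add a 
nested order by before the limit:

```pig
-- sketch: keep only one tuple from each group (nested limit, Pig 0.9+)
grouped = group cleaned by (timestamp, sensor_name, sig_generator, sig_id);
firsts  = foreach grouped {
    one = limit cleaned 1;    -- take one arbitrary tuple per group
    generate flatten(one);    -- back to a flat schema, data map included
};
```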


Finally, for the last question, pig has a little known (and therefore little 
used) storefunc called "MultiStorage". If you want to store separate files, one 
per timestamp-sensor_name-sig_generator-sig_id combination, then I'd first 
build a new field with pig's CONCAT function to use as the file name, since 
storing files with parentheses in the name is probably a bad idea:

with_filename = foreach distinct_data generate 
CONCAT((chararray)timestamp, CONCAT('-', CONCAT(sensor_name, CONCAT('-', 
CONCAT((chararray)sig_generator, CONCAT('-', (chararray)sig_id)))))) 
as filename, timestamp..sig_id;

(Note the casts: CONCAT wants chararray arguments, and timestamp, 
sig_generator, and sig_id are ints in your schema.)

register '/path_to_jars/piggybank.jar'; -- MultiStorage lives in piggybank

store with_filename into '/data' using 
org.apache.pig.piggybank.storage.MultiStorage('/data', '0');

where '/data' is the directory on hdfs where the files will go and '0' is the 
numerical index of the field to use as the file name (filename in this case, 
since it's the first field). I would warn you though: this will open many 
output streams at once and could ddos your hadoop data nodes if the number of 
unique file names is large.
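
Also, since your example path ends in data.gz: if I remember right, 
MultiStorage's constructor takes two more optional arguments, a compression 
scheme ('none', 'gz', or 'bz2') and a field delimiter, so something like this 
should get you gzipped output (double-check the argument order against the 
piggybank javadocs for your version):

```pig
-- sketch: MultiStorage with gzip compression and tab-delimited fields
store with_filename into '/data' using 
org.apache.pig.piggybank.storage.MultiStorage('/data', '0', 'gz', '\t');
```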

--jacob
@thedatachef

On Jul 30, 2013, at 5:21 AM, Pablo Nebrera wrote:

> Hello
> 
> I have this pig script:
> 
> register '/path_to_jars/elephant-bird-pig-3.0.7.jar';
> register '/path_to_jars/json-simple-1.1.1.jar';
> register '/path_to_jars/redBorder-pig.jar';
> 
> data = load '/data/events/2013/07/29/16h03/part-00001.gz' using
> com.twitter.elephantbird.pig.load.JsonLoader() as (json: map[]);
> cleaned = foreach data generate json#'timestamp'/3600*3600 as timestamp,
> (chararray) json#'sensor_name' as sensor_name, (int) json#'sig_generator'
> as sig_generator, (int) json#'sig_id' as sig_id, json as data;
> grouped = GROUP cleaned BY (timestamp, sensor_name, sig_generator, sig_id);
> 
> 
> 
> 
> 
> The input file is json file something like:
> 
> {"timestamp":1374820560, "sensor_id":2, "sensor_name":"sensor-produccion",
> "sig_generator":1, "sig_id":402, "rev":11, "priority":3,
> "classification":"Misc activity", "msg":"Snort Alert [1:402:11]",
> "payload":"XXXXXXXXX", "proto":"icmp", "proto_id":1, "src":3232287141,
> "src_str":"192.168.201.165", "src_name":"192.168.201.165", "src_net":"
> 0.0.0.0/0", "src_net_name":"0.0.0.0/0", "dst_name":"192.168.201.254",
> "dst_str":"192.168.201.254", "dst_net":"0.0.0.0/0", "dst_net_name":"
> 0.0.0.0/0", "src_country":"N/A", "dst_country":"N/A",
> "src_country_code":"N/A", "dst_country_code":"N/A", "srcport":0,
> "dst":3232287230, "dstport":0, "ethsrc":"0:25:90:56:91:2d",
> "ethdst":"6c:62:6d:42:46:c3", "ethlength":594, "vlan":201,
> "vlan_name":"interna", "vlan_priority":0, "vlan_drop":0, "ttl":64,
> "tos":192, "id":53186, "dgmlen":576, "iplen":65544, "icmptype":3,
> "icmpcode":3, "icmpid":0, "icmpseq":0}
> {"timestamp":1374820618, "sensor_id":2, "sensor_id_snort":0,
> "sensor_name":"sensor-produccion", "sig_generator":1, "sig_id":402,
> "rev":11, "priority":3, "classification":"Misc activity", "msg":"Snort
> Alert [1:402:11]", "payload":"XXXXX2", "proto":"icmp", "proto_id":1,
> "src":3232261121, "src_str":"192.168.100.1", "src_name":"192.168.100.1",
> "src_net":"0.0.0.0/0", "src_net_name":"0.0.0.0/0",
> "dst_name":"192.168.100.125", "dst_str":"192.168.100.125", "dst_net":"
> 0.0.0.0/0", "dst_net_name":"0.0.0.0/0", "src_country":"N/A",
> "dst_country":"N/A", "src_country_code":"N/A", "dst_country_code":"N/A",
> "srcport":0, "dst":3232261245, "dstport":0, "ethsrc":"6c:62:6d:42:46:c3",
> "ethdst":"0:1e:c9:ef:85:fd", "ethlength":105, "vlan":100,
> "vlan_name":"100", "vlan_priority":0, "vlan_drop":0, "ttl":64, "tos":192,
> "id":30974, "dgmlen":87, "iplen":89088, "icmptype":3, "icmpcode":3,
> "icmpid":0, "icmpseq":0}
> 
> 
> The describe of grouped variable is:
> 
> grunt> describe grouped
> 2013-07-30 10:11:58,834 [main] WARN  org.apache.pig.PigServer - Encountered
> Warning IMPLICIT_CAST_TO_INT 1 time(s).
> grouped: {group: (timestamp: int,sensor_name: chararray,sig_generator:
> int,sig_id: int),cleaned: {(timestamp: int,sensor_name:
> chararray,sig_generator: int,sig_id: int,data: map[])}}
> 
> 
> 
> And a dump example is:
> 
> ((1374818400,sensor-produccion,1,402),{(1374818400,sensor-produccion,1,402,[dst_country_code#N/A,rev#11,sig_id#402,proto_id#1,src_net_name#
> 0.0.0.0/0,ethlength#105,payload#45003bd9d5400401117dc0a8647dc0a864141403502745aa4b2e1001000000d726564426f7264657244454c4c00101,dst#3232261245,dstport#0,timestamp#1374820435,sensor_id_snort#0,id#30968,vlan_name#100,tos#192,src_net#0.0.0.0/0,priority#3,src_name#192.168.100.1,dgmlen#87,ethsrc#6c:62:6d:42:46:c3,src#3232261121,icmpcode#3,src_str#192.168.100.1,srcport#0,sensor_id#2,dst_net#0.0.0.0/0,ttl#64,msg#SnortAlert
> [1:402:11],proto#icmp,vlan_priority#0,dst_country#N/A,dst_name#192.168.100.125,dst_net_name#
> 0.0.0.0/0,ethdst#0:1e:c9:ef:85:fd,iplen#89088,src_country_code#N/A,sensor_name#sensor-produccion,sig_generator#1,dst_str#192.168.100.125,class......})
> 
> 
> I have some questions:
> 
> 1.- When I do the group by I would like to get an entry with the first
> entry only. Something like:
> 
>                  ((1374818400,sensor-produccion,1,402), { only the first
> tuple match the groupby })
> 
> 2.- I would like to store this information in multiple files. Something
> like:
> 
>                   /data/(1374818400,sensor-produccion,1,402)/data.gz
> 
> 
> How could I do this ?
> 
> Thanks
> 
> 
> 
> 
> Pablo Nebrera
