Hi, 
Using Pig 0.12.0, I have the following data stored in Avro format:
grunt> describe all_contacts;
all_contacts: {account_id: long,contact_id: chararray,sequence_id: int,
    state: chararray,name: chararray,kind: chararray,prefix_name: chararray,
    first_name: chararray,middle_name: chararray,company_name: chararray,
    job_title: chararray,source_name: chararray,source_details: chararray,
    provider_name: chararray,provider_details: chararray,created_at: long,
    create_source: chararray,updated_at: long,update_source: chararray,
    accessed_at: long,deleted_at: long,delta: boolean,
    contact_fields: {ARRAY_ELEM: (custom_field_id: chararray,value: chararray,
        date_value: long)},
    related_contacts: {ARRAY_ELEM: (related_contact_id: chararray,
        kind: chararray,role: chararray,created_at: long,updated_at: long,
        deleted_at: long)},
    contact_channels: {ARRAY_ELEM: (channel_id: chararray,address: chararray,
        state: chararray,create_source: chararray,update_source: chararray,
        created_at: long,updated_at: long,deleted_at: long,
        description: chararray,history: chararray,source_ip: long,
        optin_at: long,optout_at: long,confirmed: chararray)},
    contact_notes: {ARRAY_ELEM: (note_id: chararray,content: chararray,
        created_at: long,updated_at: long)},
    contact_service_addresses: {ARRAY_ELEM: (service_address_id: chararray,
        type: chararray,address: chararray,sort_order: int,kind: chararray,
        created_at: long,updated_at: long,create_source: chararray,
        update_source: chararray,deleted_at: long)},
    contact_street_addresses: {ARRAY_ELEM: (street_address_id: chararray,
        kind: chararray,street: chararray,city: chararray,state: chararray,
        postal_code: chararray,country: chararray,sort_order: int,
        created_at: long,create_source: chararray,updated_at: long,
        update_source: chararray,deleted_at: long)}}

As you can see, each contact row has an array of associated channels, and
each channel is a struct in Avro, or a tuple in Pig. The same applies to
contact_service_addresses. Both the contact_channels and
contact_service_addresses tuples have an "address" field. My requirement is
to compare the address from contact_channels against the addresses in
contact_service_addresses: if it matches, count this channel as one;
otherwise, ignore this channel.
I tried different ways to do this in Pig, but I am running into some
problems. The biggest one is how to keep the tuple together and compare only
one of its subfields to a subfield of another tuple.
I will use the following example to describe what I did and what my current
problem is.

-- load all the contacts
all_contacts = LOAD 'data' USING org.apache.pig.piggybank.storage.avro.AvroStorage();

-- generate contact, channels and service_addresses with needed fields
non_deleted_channels_contacts = FOREACH all_contacts {
    csa_addresses = FOREACH contact_service_addresses GENERATE address;
    channels = FOREACH contact_channels GENERATE channel_id, address, deleted_at, state;
    non_deleted_channels = FILTER channels BY state != 'D';
    GENERATE account_id, contact_id, deleted_at as contact_deleted_at, csa_addresses, non_deleted_channels;
};

-- now I am not sure how to proceed; here is what I tried:
match_address_non_deleted_channels_contacts = FOREACH non_deleted_channels_contacts
    GENERATE account_id, contact_id, contact_deleted_at,
    FLATTEN(csa_addresses.address) as csa_address,
    FLATTEN(non_deleted_channels.address) as cc_address,
    FLATTEN(non_deleted_channels.channel_id) as cc_channel_id;

I don't want to flatten the data, but I cannot do the comparison in a nested
FOREACH, because FOREACH can only be nested 2 levels deep in Pig. To compare
the address of a channel to the address of a service_address, I would need a
3-level FOREACH, as follows:

FOREACH contacts {
    FOREACH service_addresses {
        FOREACH channels {
        }
    }
}

which is not supported in Pig 0.12. So I gave up on this approach.
But if I use FLATTEN, I have to FLATTEN both (channels.address) and
(channels.channel_id), which expands the data in a way I don't want. For
example, if one contact has 4 service_addresses and 2 channels, I would
expect 8 rows for this contact after FLATTEN (4 x 2 = 8). But what Pig gives
me back is 16 rows (4 service_address.address x 2 channels.address x 2
channels.channel_id = 16 rows). Pig treats the address and channel_id from
the channel tuple as separate elements in the flatten operation, which makes
the rows grow far too fast. If I need additional fields from the channel
tuple, the row count explodes even further, unnecessarily. But I don't know
how to flatten the array of tuples as whole tuples; right now I can only
flatten one field of the tuple at a time.
If I do this:

match_address_non_deleted_channels_contacts = FOREACH non_deleted_channels_contacts
    GENERATE account_id, contact_id, contact_deleted_at,
    FLATTEN(csa_addresses.address) as csa_address,
    FLATTEN(non_deleted_channels) as channels;

it gives me this error:

2014-04-17 15:02:16,384 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1031: Incompatable schema: left is "channels:NULL", right is "non_deleted_channels::channel_id:chararray,non_deleted_channels::address:chararray,non_deleted_channels::deleted_at:long,non_deleted_channels::state:chararray"

Is there a way to flatten the array of tuples as whole tuples, so that I can
use the individual fields later, instead of flattening the fields
individually, which makes the row count explode incorrectly?
I googled this case, and the only suggestion I could find is to NOT flatten
the array. But then, in my case, I cannot compare a field from one tuple to
a field in another tuple.
I hope I have explained my problem clearly. Any suggestions are appreciated.
Thanks
Yong

                                          
