Hi,
In Pig 0.12.0, I have the following data stored in Avro format:
grunt> describe all_contacts;
all_contacts: {
    account_id: long, contact_id: chararray, sequence_id: int, state: chararray,
    name: chararray, kind: chararray, prefix_name: chararray, first_name: chararray,
    middle_name: chararray, company_name: chararray, job_title: chararray,
    source_name: chararray, source_details: chararray, provider_name: chararray,
    provider_details: chararray, created_at: long, create_source: chararray,
    updated_at: long, update_source: chararray, accessed_at: long,
    deleted_at: long, delta: boolean,
    contact_fields: {ARRAY_ELEM: (custom_field_id: chararray, value: chararray,
        date_value: long)},
    related_contacts: {ARRAY_ELEM: (related_contact_id: chararray, kind: chararray,
        role: chararray, created_at: long, updated_at: long, deleted_at: long)},
    contact_channels: {ARRAY_ELEM: (channel_id: chararray, address: chararray,
        state: chararray, create_source: chararray, update_source: chararray,
        created_at: long, updated_at: long, deleted_at: long,
        description: chararray, history: chararray, source_ip: long,
        optin_at: long, optout_at: long, confirmed: chararray)},
    contact_notes: {ARRAY_ELEM: (note_id: chararray, content: chararray,
        created_at: long, updated_at: long)},
    contact_service_addresses: {ARRAY_ELEM: (service_address_id: chararray,
        type: chararray, address: chararray, sort_order: int, kind: chararray,
        created_at: long, updated_at: long, create_source: chararray,
        update_source: chararray, deleted_at: long)},
    contact_street_addresses: {ARRAY_ELEM: (street_address_id: chararray,
        kind: chararray, street: chararray, city: chararray, state: chararray,
        postal_code: chararray, country: chararray, sort_order: int,
        created_at: long, create_source: chararray, updated_at: long,
        update_source: chararray, deleted_at: long)}
}
As you can see, each contact row has an array of associated channels, and each channel is a record (struct) in Avro, which Pig represents as a tuple. The same applies to contact_service_addresses. Both the contact_channels and the contact_service_addresses tuples have an "address" field. My requirement is to compare each channel's address with the contact's service addresses: if it matches, count the channel as one; otherwise, ignore the channel.
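To pin down the requirement, here is a small Python model of the intended per-contact result. It only sketches the logic and is not Pig code: a contact is assumed to be a dict holding lists of channel and service-address dicts (standing in for the Pig bags of tuples), and the helper name count_matching_channels is hypothetical.

```python
from typing import Any


def count_matching_channels(contact: dict[str, Any]) -> int:
    """Count channels whose address equals at least one service address.

    'contact_channels' and 'contact_service_addresses' stand in for the
    Pig bags of tuples on a single contact row (hypothetical layout).
    """
    # Build the set of service-address values once per contact.
    service_addresses = {
        sa["address"]
        for sa in contact["contact_service_addresses"]
        if sa["address"] is not None
    }
    # Each channel stays whole; only its 'address' field is compared.
    # Null addresses are skipped, since comparing with null is never true in Pig.
    return sum(
        1
        for ch in contact["contact_channels"]
        if ch["address"] is not None and ch["address"] in service_addresses
    )


contact = {
    "contact_id": "c1",
    "contact_channels": [
        {"channel_id": "ch1", "address": "a@example.com"},
        {"channel_id": "ch2", "address": "b@example.com"},
    ],
    "contact_service_addresses": [
        {"service_address_id": "s1", "address": "a@example.com"},
        {"service_address_id": "s2", "address": "c@example.com"},
    ],
}
print(count_matching_channels(contact))  # 1
```

Only ch1 counts here, because its address is one of the service addresses while ch2's is not.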
I have tried several approaches in Pig, but I am running into problems. The main one is how to keep each tuple together while comparing only one of its fields with a field of another tuple.
I'll use the following example to show what I did and where I am stuck.
-- load all the contacts
all_contacts = LOAD 'data' USING org.apache.pig.piggybank.storage.avro.AvroStorage();

-- generate contact, channels and service_addresses with the needed fields
non_deleted_channels_contacts = FOREACH all_contacts {
    csa_addresses = FOREACH contact_service_addresses GENERATE address;
    channels = FOREACH contact_channels GENERATE channel_id, address, deleted_at, state;
    non_deleted_channels = FILTER channels BY state != 'D';
    GENERATE account_id, contact_id, deleted_at as contact_deleted_at,
             csa_addresses, non_deleted_channels;
};
-- now I'm not sure how to do it; here is what I tried:
match_address_non_deleted_channels_contacts = FOREACH non_deleted_channels_contacts GENERATE
    account_id, contact_id, contact_deleted_at,
    FLATTEN(csa_addresses.address) as csa_address,
    FLATTEN(non_deleted_channels.address) as cc_address,
    FLATTEN(non_deleted_channels.channel_id) as cc_channel_id;
I don't want to flatten the data, but I cannot do the comparison in a nested FOREACH, because Pig only supports two levels of FOREACH nesting. To compare a channel's address with a service address's address, I would need three levels, like this:

FOREACH contacts {
    FOREACH service_addresses {
        FOREACH channels { }
    }
}

which Pig 0.12 does not support, so I gave up on that approach.
But if I use FLATTEN, I have to flatten both non_deleted_channels.address and non_deleted_channels.channel_id separately, which expands the data in a way I don't want. For example, if one contact has 4 service addresses and 2 channels, I expect 8 rows for that contact after FLATTEN (4 x 2 = 8). But Pig gives me back 16 rows (4 service addresses x 2 channel addresses x 2 channel IDs = 16). Pig treats the address and channel_id from the channel tuple as independent elements in the flatten operation, so the row count grows far too fast: every additional field I need from the channel tuple multiplies it again. I don't know how to flatten the array of tuples as whole tuples; right now I can only flatten each field of the tuple individually.
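The row counts follow from how FLATTEN behaves when several bags are flattened in the same GENERATE: the output is their cross product. The Python sketch below only models that arithmetic (it is not Pig), using made-up IDs and addresses for one contact with 4 service addresses and 2 channels.

```python
from itertools import product

# One contact: 4 service addresses and 2 (channel_id, address) channel tuples.
service_addresses = ["s1", "s2", "s3", "s4"]
channels = [("ch1", "a1"), ("ch2", "a2")]

# Flattening each projected field separately: every flattened bag in the
# same GENERATE is crossed, so channel_id and address lose their pairing.
cc_addresses = [address for _, address in channels]
cc_channel_ids = [channel_id for channel_id, _ in channels]
per_field_rows = list(product(service_addresses, cc_addresses, cc_channel_ids))

# Flattening each channel tuple as a whole: a channel stays one element.
whole_tuple_rows = list(product(service_addresses, channels))

print(len(per_field_rows), len(whole_tuple_rows))  # 16 8
```

Besides the extra rows, the per-field version also produces mismatched combinations such as ('s1', 'a1', 'ch2'), where the address of one channel is paired with the ID of the other.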
If I do this:

match_address_non_deleted_channels_contacts = FOREACH non_deleted_channels_contacts GENERATE
    account_id, contact_id, contact_deleted_at,
    FLATTEN(csa_addresses.address) as csa_address,
    FLATTEN(non_deleted_channels) as channels;

it gives me this error:

2014-04-17 15:02:16,384 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1031: Incompatable schema: left is "channels:NULL", right is "non_deleted_channels::channel_id:chararray,non_deleted_channels::address:chararray,non_deleted_channels::deleted_at:long,non_deleted_channels::state:chararray"
Is there a way to flatten the array of tuples as whole tuples, so I can use the individual fields later, instead of flattening individual fields and making the row count explode incorrectly?
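Assuming the channel tuples can be flattened whole, the rest of the computation is a flatten, a filter on matching addresses, and a count per contact. This Python model sketches that pipeline for illustration only; the function name matching_channel_counts and the dict layout are hypothetical, and it does not claim any particular Pig syntax.

```python
from collections import defaultdict
from itertools import product


def matching_channel_counts(contacts):
    """Flatten (service address x whole channel tuple), filter, then count."""
    matched = defaultdict(set)
    for c in contacts:
        csa_addresses = [sa["address"] for sa in c["contact_service_addresses"]]
        # One row per (service address, channel) pair; the channel dict is
        # kept intact, so its channel_id and address stay together.
        for csa_address, ch in product(csa_addresses, c["contact_channels"]):
            # Keep only rows whose addresses match (nulls never match).
            if ch["address"] is not None and ch["address"] == csa_address:
                # Collect channel_ids in a set so a channel that matches two
                # service addresses with the same value is counted once.
                matched[c["contact_id"]].add(ch["channel_id"])
    # Contacts with no matching channel do not appear in the result.
    return {contact_id: len(ids) for contact_id, ids in matched.items()}


contacts = [
    {
        "contact_id": "c1",
        "contact_channels": [
            {"channel_id": "ch1", "address": "a@example.com"},
            {"channel_id": "ch2", "address": "b@example.com"},
        ],
        "contact_service_addresses": [
            {"address": "a@example.com"},
            {"address": "a@example.com"},
            {"address": "c@example.com"},
            {"address": "d@example.com"},
        ],
    },
]
print(matching_channel_counts(contacts))  # {'c1': 1}
```

Contact c1 yields the expected 4 x 2 = 8 intermediate rows; ch1 matches two of them but is counted once, which is why the sketch deduplicates by channel_id before counting.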
I googled this case, and the only suggestion I could find is to NOT flatten the array; but then, in my case, I cannot compare a field in one tuple with a field in another tuple.
I hope I have explained my problem clearly. Any suggestion is appreciated.
Thanks,
Yong