I am going to try writing the generic input Record into a new Avro schema
that has some new fields plus the original input record as a JSON string. Then
I will dedup the records based on the grouping key and invoke another Fn
that rebuilds the record in its original Avro schema from the JSON value and
finally writes it out using FSDataOutputStream. I think that should work, but
it is a bit hacky.
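
A minimal, Crunch-free sketch of the envelope-and-dedup idea, assuming a hypothetical Envelope class and a string grouping key. It also keeps the original schema as JSON next to the payload, since decoding the JSON back into a GenericData.Record needs that schema. In a real job the strings would come from something like Avro's JSON encoder, and the dedup would be a groupByKey() followed by a reduce; neither is shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EnvelopeDedup {

    // Hypothetical wrapper: a few new fields plus the original record.
    // The original record is kept as a JSON string together with its
    // schema (also as JSON), so it can be rebuilt after the shuffle.
    static final class Envelope {
        final String dedupKey;    // hypothetical grouping key
        final String schemaJson;  // e.g. the source file's schema as a string
        final String payloadJson; // the original record encoded as JSON

        Envelope(String dedupKey, String schemaJson, String payloadJson) {
            this.dedupKey = dedupKey;
            this.schemaJson = schemaJson;
            this.payloadJson = payloadJson;
        }
    }

    // Group envelopes by key and keep the first one seen for each key,
    // mirroring groupByKey() followed by a "take one" reduce step.
    // LinkedHashMap preserves the order in which keys first appear.
    static List<Envelope> dedup(List<Envelope> input) {
        Map<String, Envelope> firstByKey = new LinkedHashMap<>();
        for (Envelope e : input) {
            firstByKey.putIfAbsent(e.dedupKey, e);
        }
        return new ArrayList<>(firstByKey.values());
    }

    public static void main(String[] args) {
        String schema = "{\"type\":\"record\",\"name\":\"Event\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";
        List<Envelope> in = List.of(
                new Envelope("a", schema, "{\"id\":\"a\"}"),
                new Envelope("b", schema, "{\"id\":\"b\"}"),
                new Envelope("a", schema, "{\"id\":\"a\"}"));
        for (Envelope e : dedup(in)) {
            System.out.println(e.dedupKey + " -> " + e.payloadJson);
        }
    }
}
```

Keeping the first record per key with putIfAbsent is an arbitrary choice; any deterministic rule (for example, the latest timestamp) fits the same shape.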

On Wed, Feb 24, 2016 at 7:52 AM, Josh Wills <[email protected]> wrote:

> In theory, any PType that supports GenericRecord will work, even a dummy
> one that defines a schema that isn't the same as the one you're using.
>
> I don't recommend doing that, of course, but it will work.
>
> On Wed, Feb 24, 2016 at 12:18 AM Marcin Michalski <[email protected]>
> wrote:
>
>> Hi, is there an easy way to pass GenericData.Record instances between Fns in
>> Crunch without explicitly stating the schema? I want to pass
>> multiple Avro files that have various schemas as input to a single DoFn,
>> which will enhance the data into a Pair, and later I want to run an
>> aggregation (deduping) Fn on that data without specifying the Schema
>> in between (I just want to work with GenericData.Record instances). Here is
>> an example:
>>
>> PCollection<Record> messages = pipeline.read(From.avroFile("/events/*/20160223/"));
>>
>> // I don't want to pass the schema instance but rather just work with
>> // GenericData.Record. Is that possible? Or do I need to use Avros.bytes
>> // instead and then reconstruct the Record later in the next Fn?
>> messages.parallelDo(new EventEnhancerDoFn(),
>>     Avros.generics(messageSchema)).groupByKey...
>>
>>
>> Thanks,
>> Marcin
>>
>>


-- 
Marcin Michalski | Big Data Engineer
[email protected] <[email protected]> | (917) 478-9422 (c)
<http://www.ifwe.co/>
Tagged, Inc. is now if(we). Learn more at ifwe.co
