Oh, you are running this in Spark. Even the newest version of Spark Core
(3.0.1) depends on Avro 1.8.2, and the copy constructor that Ryan was
talking about was only added in Avro 1.9. What I don't understand is how this

List<Schema.Field> clonedFields = existingFields.stream()
     .map(f -> new Schema.Field(f, f.schema()))
     .collect(Collectors.toList());

throws an NPE: against Avro 1.8.2 it shouldn't even compile, because that
constructor doesn't exist there.
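
As background on why the copy is needed in the first place: further down
the thread Ryan explains that a Schema.Field knows its own position in its
record, so it cannot be attached to a second record. The following is a
minimal plain-Java sketch of that rule, not Avro's actual implementation.
The classes FieldReuseSketch, Field and Record and the methods
reuseIsRejected and cloneWithNewName are hypothetical stand-ins, written so
that the sketch runs without any Avro dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FieldReuseSketch {

    // Hypothetical stand-in for Schema.Field: it remembers its position
    // once it has been attached to a record.
    static final class Field {
        final String name;
        int pos = -1; // -1 means "not attached to any record yet"

        Field(String name) {
            this.name = name;
        }

        // Copy constructor: same name, but the position starts unset again.
        Field(Field other) {
            this(other.name);
        }
    }

    // Hypothetical stand-in for a record schema.
    static final class Record {
        final String name;
        final List<Field> fields = new ArrayList<>();

        Record(String name, List<Field> fields) {
            this.name = name;
            int i = 0;
            for (Field f : fields) {
                // A field that already has a position belongs to another record.
                if (f.pos != -1) {
                    throw new IllegalStateException("Field already used: " + f.name);
                }
                f.pos = i++;
                this.fields.add(f);
            }
        }
    }

    // Builds one record, then tries to build a second one from the very same
    // Field objects. Returns true if the second attempt is rejected.
    static boolean reuseIsRejected() {
        Record first = new Record("test",
                Arrays.asList(new Field("foo"), new Field("bar")));
        try {
            new Record("othertest", first.fields);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Copies every field before building the renamed record, so no Field
    // object is shared between the two records.
    static Record cloneWithNewName(Record source, String newName) {
        List<Field> copies = new ArrayList<>();
        for (Field f : source.fields) {
            copies.add(new Field(f));
        }
        return new Record(newName, copies);
    }

    public static void main(String[] args) {
        System.out.println("reuse rejected: " + reuseIsRejected());
        Record original = new Record("test",
                Arrays.asList(new Field("foo"), new Field("bar")));
        Record renamed = cloneWithNewName(original, "othertest");
        System.out.println(renamed.name + " has " + renamed.fields.size() + " fields");
    }
}
```

In this sketch, copying each field before building the second record plays
the role that the Avro 1.9 copy constructor plays in Ryan's snippet.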

-Mika 

On Sep 22 2020, at 7:59 am, Colin Williams
<[email protected]> wrote:

> *I tried your copy also and got the same UnresolvedUnionException*
> 
> On Mon, Sep 21, 2020 at 9:57 PM Colin Williams
> <[email protected]> wrote:
>> 
>> Hi, Mika
>> 
>> Sorry I wasn't clearer in my previous emails. After my first email,
>> because I was getting an NPE when working with the Schema, I fetched
>> the Schema from elsewhere and applied it with
>> 
>> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
>> 
>> Then I put aside that particular issue for the time being.
>> 
>> 
>> Then I provided part of the related exception when attempting to write
>> the copy. Unfortunately I can't currently provide the schema but maybe
>> I will figure out a way to provide a reproduction eventually.
>> 
>> E.g. it throws UnresolvedUnionException:
>> ```
>> Caused by: org.apache.avro.UnresolvedUnionException: Not in union
>> ["null",{"type":"record","name"...
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
>> at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
>> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>> at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>> ```
>> 
>> I tried your copy also and got an NPE. I thought that it might be
>> somewhat obvious from the exception message and details what is
>> causing the exception.
>> 
>> It might be that I can't do this within a Map operation in a
>> distributed platform like Spark?
>> 
>> 
>> Thanks and Best Regards,
>> 
>> Colin
>> 
>> 
>> 
>> On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki
>> <[email protected]> wrote:
>> >
>> > Hi Colin,
>> >
>> > You are omitting some details from your mail (such as the actual schema),
>> > but I suspect that in your first email you tried to do something like this:
>> >
>> >  Schema SCHEMA = SchemaBuilder
>> >                 .record("test")
>> >                 .namespace("foo")
>> >                 .fields()
>> >                     .name("foo").type().stringType().noDefault()
>> >                     .name("bar").type().optional().stringType()
>> >                 .endRecord();
>> >
>> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
>> > castRecord.put("foo", "foo value");
>> > castRecord.put("bar", "bar value");
>> >
>> > Schema clonedSchema = Schema.createRecord("othertest",
>> >                 "",
>> >                 "bar",
>> >                 false,
>> >                 SCHEMA.getFields());
>> >
>> > And this didn't work because, as Ryan explained, schema fields cannot be
>> > reused. So he suggested that the fields should be copied first.
>> >
>> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
>> >                 .map(f -> new Schema.Field(f, f.schema()))
>> >                 .collect(Collectors.toList());
>> >
>> > I tested it and it works fine. You should use it like this:
>> >
>> > Schema SCHEMA = SchemaBuilder
>> >         .record("test")
>> >         .namespace("foo")
>> >         .fields()
>> >             .name("foo").type().stringType().noDefault()
>> >             .name("bar").type().optional().stringType()
>> >         .endRecord();
>> >
>> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
>> > castRecord.put("foo", "foo value");
>> > castRecord.put("bar", "bar value");
>> >
>> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
>> >         .map(f -> new Schema.Field(f, f.schema()))
>> >         .collect(Collectors.toList());
>> > Schema clonedSchema = Schema.createRecord("othertest",
>> >         "",
>> >         "bar",
>> >         false,
>> >         clonedFields);
>> >
>> > final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
>> > for (int i = 0; i < clonedFields.size(); i++) {
>> >     clonedRecord.put(i, castRecord.get(i));
>> > }
>> >
>> > GenericDatumWriter<GenericRecord> writer =
>> >         new GenericDatumWriter<GenericRecord>(SCHEMA);
>> > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
>> > Encoder binaryEncoder =
>> >         EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
>> > writer.write(clonedRecord, binaryEncoder);
>> >
>> > Also for more robustness against reordered fields in clonedSchema, you
>> > should probably do the data copying something like this:
>> >
>> > for (Schema.Field field : castRecord.getSchema().getFields()) {
>> >     clonedRecord.put(field.name(), castRecord.get(field.name()));
>> > }
>> >
>> >
>> > On Sep 20 2020, at 7:33 am, Colin Williams
>> > <[email protected]> wrote:
>> >
>> > > I found a way to get my record schema so I'm no longer throwing the
>> > > same exception. I am now able to put the record indexes for the cloned
>> > > record.
>> > >
>> > > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
>> > > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
>> > > clonedRecord.getSchema().getFields();
>> > > int length = SCHEMA.getFields().size();
>> > > for (int i = 0; i < length; i++) {
>> > >    clonedRecord.put(i, castRecord.get(i));
>> > > }
>> > >
>> > > GenericDatumWriter<GenericRecord> writer =
>> > >         new GenericDatumWriter<GenericRecord>(SCHEMA);
>> > > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
>> > > Encoder binaryEncoder =
>> > >         EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
>> > > writer.write(clonedRecord, binaryEncoder);
>> > >
>> > > ^^ Throws UnresolvedUnionException ```Caused by:
>> > > org.apache.avro.UnresolvedUnionException: Not in union
>> > > ["null",{"type":"record","name"```
>> > >
>> > > The schema is identical between the records, with the exception of the
>> > > name and namespace, so I don't understand why I'm getting the
>> > > exception. I can write with the castRecord and its schema. Is this a
>> > > byte alignment issue caused by the name and namespace? I didn't see
>> > > them in the record indexes.
>> > >
>> > >
>> > > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
>> > > <[email protected]> wrote:
>> > >>
>> > >> I gave it a shot, but from a MapFunction that's supposed to be
>> > >> serializable. I got an NPE.
>> > >>
>> > >> I assume I can't create a new Schema.Field from within that
>> > >> MapFunction. I didn't seem to have trouble accessing the existing
>> > >> schema fields.
>> > >>
>> > >> > List<Schema.Field> clonedFields = existingFields.stream()
>> > >>         .map(f -> new Schema.Field(f, f.schema()))
>> > >>         .collect(Collectors.toList());
>> > >>
>> > >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
>> > >> <[email protected]> wrote:
>> > >> >
>> > >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
>> > >> > design of AVRO suggests that data and schemas are very planned things.
>> > >> > That changes are planned through versioning and we don't like
>> > >> > duplicated schemas (when the positioning makes sense).
>> > >> >
>> > >> > I have a roundabout way of learning. Sometimes I am working with data
>> > >> > and I think it's convenient to transform my data programmatically and
>> > >> > try to obtain a schema from that. Also I think that schemas can become
>> > >> > cumbersome when many fields are involved in intricate patterns.
>> > >> >
>> > >> > I think maybe there are other forms better suited to that.
>> > >> >
>> > >> > Regarding your proposals, 1 and 2 seem reasonable to me. But someone
>> > >> > like myself might also not fully understand the design of AVRO.
>> > >> > A better exception message, or some kind of lead to help armchair
>> > >> > programmers understand the exception, would also help. Thanks for
>> > >> > mentioning the copy operation.
>> > >> >
>> > >> > Finally I do see something about aliases.
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> > Colin
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <[email protected]> wrote:
>> > >> > >
>> > >> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
>> > >> > > can't reuse a Schema.Field object in two Records, because a field
>> > >> > > knows its own position in the record[1].  If a field were to belong to
>> > >> > > two records at different positions, this method would have an
>> > >> > > ambiguous response.
>> > >> > >
>> > >> > > As a workaround, since Avro 1.9, there's a copy constructor that you
>> > >> > > can use to clone the field:
>> > >> > >
>> > >> > > List<Schema.Field> clonedFields = existingFields.stream()
>> > >> > >         .map(f -> new Schema.Field(f, f.schema()))
>> > >> > >         .collect(Collectors.toList());
>> > >> > >
>> > >> > > That being said, I don't see any reason we MUST throw an exception.
>> > >> > > There's a couple of alternative strategies we could use in the Java
>> > >> > > SDK:
>> > >> > >
>> > >> > > 1. If the position is the same in both records, allow the field to be
>> > >> > > reused (which enables cloning use cases).
>> > >> > >
>> > >> > > 2. Make a copy of the field to reuse internally if the position is
>> > >> > > already set (probably OK, since it's supposed to be immutable).
>> > >> > >
>> > >> > > 3. Allow the field to be reused, and throw the exception only if
>> > >> > > someone calls the position() method later.
>> > >> > >
>> > >> > > Any of those sound like a useful change for your use case?  Don't
>> > >> > > hesitate to create a JIRA or contribution if you like!
>> > >> > >
>> > >> > > All my best, Ryan
>> > >> > >
>> > >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
>> > >> > > <[email protected]> wrote:
>> > >> > > >
>> > >> > > > Hello,
>> > >> > > >
>> > >> > > > I'm trying to understand how to work with Avro records and schemas
>> > >> > > > programmatically. As a first step I was trying to create a new
>> > >> > > > schema and records based on existing records, but with a different
>> > >> > > > name / namespace. It seems I don't understand getFields() or
>> > >> > > > createRecord(...). Why can't I use the fields obtained from
>> > >> > > > getFields() in createRecord()?  How would I go about this properly?
>> > >> > > >
>> > >> > > > // for an existing record already present
>> > >> > > > GenericRecord someRecord
>> > >> > > >
>> > >> > > > // get a list of existing fields
>> > >> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
>> > >> > > >
>> > >> > > > // schema for new record with existing fields
>> > >> > > > Schema updatedSchema = Schema.createRecord("UpdatedName",
>> > >> > > >         "", "avro.com.example.namespace", false, existingFields);
>> > >> > > >
>> > >> > > > ^^ throws an exception ^^
>> > >> > > >
>> > >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
>> > >> > > > used: eventMetadata type:UNION pos:0
>> > >> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
>> > >> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
>> > >> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
>> > >> > > > */
>> > >> > > >
>> > >> > > > final int length = existingFields.size();
>> > >> > > >
>> > >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
>> > >> > > > for (int i = 0; i < length; i++) {
>> > >> > > >     clonedRecord.put(i, someRecord.get(i));
>> > >> > > > }
>> > >> > > >
>> > >> > > >
>> > >> > > > Best Regards,
>> > >> > > >
>> > >> > > > Colin Williams
>> > >
> 
