*I tried your copy also and got the same UnresolvedUnionException*

On Mon, Sep 21, 2020 at 9:57 PM Colin Williams
<[email protected]> wrote:
>
> Hi, Mika
>
> Sorry I wasn't more clear in my previous emails. For the time since
> after my first email because I was getting a NPE when working with the
> Schema, I was able to fetch the Schema elsewhere and apply it with
>
> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
>
> Then I put aside that particular issue for the time being.
>
>
> Then I provided part of the related exception when attempting to write
> the copy. Unfortunately I can't currently provide the schema but maybe
> I will figure out a way to provide a reproduction eventually.
>
> E.G. Throws UnresolvedUnionException ```Caused by:
> org.apache.avro.UnresolvedUnionException: Not in union
> ["null",{"type":"record","name"...
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
> at 
> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
> ```
>
> I tried your copy also and got an NPE. I thought that it might be
> somewhat obvious from the exception message and details what is
> causing the exception.
>
> It might be that I can't do this within a Map operation in a
> distributed platform like Spark?
>
>
> Thanks and Best Regards,
>
> Colin
>
>
>
> On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki <[email protected]> 
> wrote:
> >
> > Hi Colin,
> >
> > You are omitting some details from your mail (such as the actual schema)
> > but I am suspecting in your first email you tried to do something like this
> >
> >  Schema SCHEMA = SchemaBuilder
> >                 .record("test")
> >                 .namespace("foo")
> >                 .fields()
> >                     .name("foo").type().stringType().noDefault()
> >                     .name("bar").type().optional().stringType()
> >                 .endRecord();
> >
> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> > castRecord.put("foo", "foo value");
> > castRecord.put("bar", "bar value");
> >
> > Schema clonedSchema = Schema.createRecord("othertest",
> >                 "",
> >                 "bar",
> >                 false,
> >                 SCHEMA.getFields());
> >
> > And this didn't work because as Ryan explained schema fields cannot be
> > reused. So he suggested that the fields should be copied first.
> >
> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >                 .map(f -> new Schema.Field(f, f.schema()))
> >                 .collect(Collectors.toList());
> >
> > I tested it and it works fine. You should use it like this:
> >
> > Schema SCHEMA = SchemaBuilder
> >         .record("test")
> >         .namespace("foo")
> >         .fields()
> >             .name("foo").type().stringType().noDefault()
> >             .name("bar").type().optional().stringType()
> >         .endRecord();
> >
> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> > castRecord.put("foo", "foo value");
> > castRecord.put("bar", "bar value");
> >
> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >         .map(f -> new Schema.Field(f, f.schema()))
> >         .collect(Collectors.toList());
> > Schema clonedSchema = Schema.createRecord("othertest",
> >         "",
> >         "bar",
> >         false,
> >         clonedFields);
> >
> > final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
> > for (int i = 0; i < clonedFields.size(); i++) {
> >     clonedRecord.put(i, castRecord.get(i));
> > }
> >
> > GenericDatumWriter<GenericRecord> writer = new
> >         GenericDatumWriter<GenericRecord>(SCHEMA);
> > ByteArrayOutputStream clonedRecordOutputStream = new 
> > ByteArrayOutputStream();
> > Encoder binaryEncoder =
> >         EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> > writer.write(clonedRecord, binaryEncoder);
> >
> > Also for more robustness against reordered fields in clonedSchema, you
> > should probably do the data copying something like this:
> >
> > for (Schema.Field field : castRecord.getSchema().getFields()) {
> >     clonedRecord.put(field.name(), castRecord.get(field.name()));
> > }
> >
> >
> > On Sep 20 2020, at 7:33 am, Colin Williams
> > <[email protected]> wrote:
> >
> > > I found a way to get my record schema so I'm no longer throwing the
> > > same exception. I am now able to put the record indexes for the cloned
> > > record.
> > >
> > > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> > > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> > > clonedRecord.getSchema().getFields();
> > > int length = SCHEMA.getFields().size();
> > > for (int i = 0; i < length; i++) {
> > >    clonedRecord.put(i, castRecord.get(i));
> > > }
> > >
> > > GenericDatumWriter<GenericRecord> writer = new
> > > GenericDatumWriter<GenericRecord>(SCHEMA);
> > > ByteArrayOutputStream clonedRecordOutputStream = new 
> > > ByteArrayOutputStream();
> > > Encoder binaryEncoder =
> > > EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> > > writer.write(clonedRecord, binaryEncoder);
> > >
> > > ^^ Throws UnresolvedUnionException ```Caused by:
> > > org.apache.avro.UnresolvedUnionException: Not in union
> > > ["null",{"type":"record","name"```
> > >
> > > The schema is identical between the records, with the exception of the
> > > name and namespace. Then I don't understand why I'm getting the
> > > exception. I can write with the castRecord and it's schema. Is this a
> > > byte alignment issue caused by the name and namespace? I didn't see
> > > them in the record indexes.
> > >
> > >
> > > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> > > <[email protected]> wrote:
> > >>
> > >> I gave it a shot but from a MapFunction that supposed to be
> > >> serializable. I got an NPE.
> > >>
> > >> I assume I can't create a new Schema.Field from within that
> > >> MapFunction . I didn't seem to have trouble accessing the existing
> > >> schema fields.
> > >>
> > >> > List<Schema.Field> clonedFields = existingFields.stream()
> > >>         .map(f -> new Schema.Field(f, f.schema()))
> > >>         .collect(Collectors.toList());
> > >>
> > >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
> > >> <[email protected]> wrote:
> > >> >
> > >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
> > >> > design of AVRO suggests that data and schemas are very planned things.
> > >> > That changes are planned through versioning and we don't like
> > >> > duplicated schemas (when the positioning makes sense).
> > >> >
> > >> > I have a round about way of learning. Sometimes I am working with data
> > >> > and I think it's convenient to transform my data programmatically and
> > >> > try to obtain a schema from that. Also I think that schemas can become
> > >> > cumbersome when many fields are involved in intricate patterns.
> > >> >
> > >> > I think maybe there are other forms maybe more well suited for that.
> > >> >
> > >> > Regarding your proposals 1,2 seem reasonable to me. But someone like
> > >> > myself might also not fully understand the design of AVRO.
> > >> > A better exception or some kind of lead for armchair programmers to
> > >> > better understand the exception. Thanks for mentioning the copy
> > >> > operation.
> > >> >
> > >> > Finally I do see something about aliases.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Colin
> > >> >
> > >> >
> > >> >
> > >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <[email protected]> wrote:
> > >> > >
> > >> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> > >> > > can't reuse a Schema.Field object in two Records, because a field
> > >> > > knows its own position in the record[1].  If a field were to
> > >> belong to
> > >> > > two records at different positions, this method would have an
> > >> > > ambiguous response.
> > >> > >
> > >> > > As a workaround, since Avro 1.9, there's a copy constructor that you
> > >> > > can use to clone the field:
> > >> > >
> > >> > > List<Schema.Field> clonedFields = existingFields.stream()
> > >> > >         .map(f -> new Schema.Field(f, f.schema()))
> > >> > >         .collect(Collectors.toList());
> > >> > >
> > >> > > That being said, I don't see any reason we MUST throw an exception.
> > >> > > There's a couple of alternative strategies we could use in the Java
> > >> > > SDK:
> > >> > >
> > >> > > 1. If the position is the same in both records, allow the field
> > >> to be
> > >> > > reused (which enables cloning use cases).
> > >> > >
> > >> > > 2. Make a copy of the field to reuse internally if the position is
> > >> > > already set (probably OK, since it's supposed to be immutable).
> > >> > >
> > >> > > 3. Allow the field to be reused, only throw the exception only if
> > >> > > someone calls the position() method later.
> > >> > >
> > >> > > Any of those sound like a useful change for your use case?  Don't
> > >> > > hesitate to create a JIRA or contribution if you like!
> > >> > >
> > >> > > All my best, Ryan
> > >> > >
> > >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> > >> > > <[email protected]> wrote:
> > >> > > >
> > >> > > > Hello,
> > >> > > >
> > >> > > > I'm trying to understand working with Avro records and schemas,
> > >> > > > programmatically. Then I was first trying to create a new
> > >> schema and
> > >> > > > records based on existing records, but with a different name /
> > >> > > > namespace. It seems then I don't understand getFields() or
> > >> > > > createRecord(...). Why can't I use the fields obtained from
> > >> > > > getFields() in createRecord()?  How would I go about this properly?
> > >> > > >
> > >> > > > // for an existing record already present
> > >> > > > GenericRecord someRecord
> > >> > > >
> > >> > > > // get a list of existing fields
> > >> > > > List<Schema.Field> existingFields = 
> > >> > > > someRecord.getSchema().getFields();
> > >> > > >
> > >> > > > // schema for new record with existing fields
> > >> > > > Schema updatedSchema = createRecord("UpdatedName",
> > >> > > > "","avro.com.example.namespace" , false, existingFields);
> > >> > > >
> > >> > > > ^^ throws an exception ^^
> > >> > > >
> > >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> > >> > > > used: eventMetadata type:UNION pos:0
> > >> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> > >> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> > >> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
> > >> > > > */
> > >> > > >
> > >> > > > final int length = fields.size();
> > >> > > >
> > >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> > >> > > > for (int i = 0; i < length; i++) {
> > >> > > >     final Schema.Field field = existingFields.get(i);
> > >> > > >     clonedRecord.put(i, someRecord.get(i));
> > >> > > > }
> > >> > > >
> > >> > > >
> > >> > > > Best Regards,
> > >> > > >
> > >> > > > Colin Williams
> > >

Reply via email to