Hi, Mika

Sorry I wasn't clearer in my previous emails. Since my first email,
because I was getting an NPE when working with the Schema, I fetched
the Schema elsewhere and applied it with

final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);

Then I put aside that particular issue for the time being.


Then I provided part of the related exception when attempting to write
the copy. Unfortunately I can't currently provide the schema but maybe
I will figure out a way to provide a reproduction eventually.

E.g. Throws UnresolvedUnionException:
```
Caused by: org.apache.avro.UnresolvedUnionException: Not in union
["null",{"type":"record","name"...
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
```
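
One possible reading of that trace (an assumption, since the schema
isn't shown): as far as I can tell, GenericData.resolveUnion picks the
union branch for a record datum by the full name (namespace plus name)
of the datum's own schema. If the values copied by index still carry
nested records built against the original schema, and the nested record
type in the new schema ended up under a different namespace, the lookup
would miss. The toy model below only illustrates that kind of lookup.
The class, the method names, and the "old.ns" / "new.ns" record names
are hypothetical, and it is not Avro's implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical class, not Avro's implementation.
public class UnionByNameSketch {

    // Maps each branch's full name to its index in the union, in declaration order.
    static Map<String, Integer> indexByFullName(List<String> branchFullNames) {
        Map<String, Integer> index = new LinkedHashMap<>();
        for (int i = 0; i < branchFullNames.size(); i++) {
            index.put(branchFullNames.get(i), i);
        }
        return index;
    }

    // Returns the branch index for a datum whose schema has the given full name,
    // or throws when no branch carries that name.
    static int resolve(Map<String, Integer> union, String datumFullName) {
        Integer i = union.get(datumFullName);
        if (i == null) {
            throw new IllegalStateException(
                    "Not in union " + union.keySet() + ": " + datumFullName);
        }
        return i;
    }

    public static void main(String[] args) {
        // Union as declared in the renamed schema (hypothetical names).
        Map<String, Integer> union =
                indexByFullName(Arrays.asList("null", "new.ns.EventMetadata"));

        // A nested record built against the new schema resolves.
        System.out.println(resolve(union, "new.ns.EventMetadata"));

        // A nested record still carrying the old namespace does not.
        try {
            resolve(union, "old.ns.EventMetadata");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If that is what is happening, the nested records would also need to be
rebuilt against the nested schemas of the new record schema, not just
the top-level record.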

I also tried your copy and got an NPE. I thought the cause of the
exception might be fairly obvious from its message and details.

It might be that I can't do this within a Map operation in a
distributed platform like Spark?
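
On the Spark question, one generic Java pitfall that may be worth ruling
out (an assumption, not a diagnosis of this job): a function object
shipped to executors goes through Java serialization, and any transient
field arrives as null on the other side unless it is re-initialized
there. A minimal sketch with hypothetical names, using only the JDK:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a map function; not Spark's or Avro's API.
public class TransientFieldSketch {

    static class RenameFn implements Serializable {
        private static final long serialVersionUID = 1L;

        // Skipped by Java serialization, so it is null after deserialization.
        transient StringBuilder helper = new StringBuilder("schema-holder");

        // Serialized normally.
        final String schemaJson;

        RenameFn(String schemaJson) {
            this.schemaJson = schemaJson;
        }
    }

    // Serializes and deserializes a value in memory.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        RenameFn copy = roundTrip(new RenameFn("{\"type\":\"string\"}"));
        System.out.println(copy.schemaJson);      // the serialized field survives
        System.out.println(copy.helper == null);  // the transient field was not restored
    }
}
```

If something like this turns out to be the cause, one option is to
carry the schema as its JSON string and parse it inside the function.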


Thanks and Best Regards,

Colin



On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki <[email protected]> wrote:
>
> Hi Colin,
>
> You are omitting some details from your mail (such as the actual schema),
> but I suspect that in your first email you tried to do something like this:
>
>  Schema SCHEMA = SchemaBuilder
>                 .record("test")
>                 .namespace("foo")
>                 .fields()
>                     .name("foo").type().stringType().noDefault()
>                     .name("bar").type().optional().stringType()
>                 .endRecord();
>
> GenericRecord castRecord = new GenericData.Record(SCHEMA);
> castRecord.put("foo", "foo value");
> castRecord.put("bar", "bar value");
>
> Schema clonedSchema = Schema.createRecord("othertest",
>                 "",
>                 "bar",
>                 false,
>                 SCHEMA.getFields());
>
> And this didn't work because, as Ryan explained, schema fields cannot be
> reused. So he suggested that the fields should be copied first.
>
> List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
>                 .map(f -> new Schema.Field(f, f.schema()))
>                 .collect(Collectors.toList());
>
> I tested it and it works fine. You should use it like this:
>
> Schema SCHEMA = SchemaBuilder
>         .record("test")
>         .namespace("foo")
>         .fields()
>             .name("foo").type().stringType().noDefault()
>             .name("bar").type().optional().stringType()
>         .endRecord();
>
> GenericRecord castRecord = new GenericData.Record(SCHEMA);
> castRecord.put("foo", "foo value");
> castRecord.put("bar", "bar value");
>
> List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
>         .map(f -> new Schema.Field(f, f.schema()))
>         .collect(Collectors.toList());
> Schema clonedSchema = Schema.createRecord("othertest",
>         "",
>         "bar",
>         false,
>         clonedFields);
>
> final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
> for (int i = 0; i < clonedFields.size(); i++) {
>     clonedRecord.put(i, castRecord.get(i));
> }
>
> GenericDatumWriter<GenericRecord> writer = new
>         GenericDatumWriter<GenericRecord>(SCHEMA);
> ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> Encoder binaryEncoder =
>         EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> writer.write(clonedRecord, binaryEncoder);
>
> Also for more robustness against reordered fields in clonedSchema, you
> should probably do the data copying something like this:
>
> for (Schema.Field field : castRecord.getSchema().getFields()) {
>     clonedRecord.put(field.name(), castRecord.get(field.name()));
> }
>
>
> On Sep 20 2020, at 7:33 am, Colin Williams
> <[email protected]> wrote:
>
> > I found a way to get my record schema so I'm no longer throwing the
> > same exception. I am now able to put the record indexes for the cloned
> > record.
> >
> > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> > clonedRecord.getSchema().getFields();
> > int length = SCHEMA.getFields().size();
> > for (int i = 0; i < length; i++) {
> >    clonedRecord.put(i, castRecord.get(i));
> > }
> >
> > GenericDatumWriter<GenericRecord> writer = new
> > GenericDatumWriter<GenericRecord>(SCHEMA);
> > ByteArrayOutputStream clonedRecordOutputStream = new 
> > ByteArrayOutputStream();
> > Encoder binaryEncoder =
> > EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> > writer.write(clonedRecord, binaryEncoder);
> >
> > ^^ Throws UnresolvedUnionException ```Caused by:
> > org.apache.avro.UnresolvedUnionException: Not in union
> > ["null",{"type":"record","name"```
> >
> > The schema is identical between the records, with the exception of the
> > name and namespace. Then I don't understand why I'm getting the
> > exception. I can write with the castRecord and its schema. Is this a
> > byte alignment issue caused by the name and namespace? I didn't see
> > them in the record indexes.
> >
> >
> > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> > <[email protected]> wrote:
> >>
> >> I gave it a shot, but from a MapFunction that's supposed to be
> >> serializable. I got an NPE.
> >>
> >> I assume I can't create a new Schema.Field from within that
> >> MapFunction . I didn't seem to have trouble accessing the existing
> >> schema fields.
> >>
> >> > List<Schema.Field> clonedFields = existingFields.stream()
> >>         .map(f -> new Schema.Field(f, f.schema()))
> >>         .collect(Collectors.toList());
> >>
> >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
> >> <[email protected]> wrote:
> >> >
> >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
> >> > design of AVRO suggests that data and schemas are very planned things.
> >> > That changes are planned through versioning and we don't like
> >> > duplicated schemas (when the positioning makes sense).
> >> >
> >> > I have a roundabout way of learning. Sometimes I am working with data
> >> > and I think it's convenient to transform my data programmatically and
> >> > try to obtain a schema from that. Also I think that schemas can become
> >> > cumbersome when many fields are involved in intricate patterns.
> >> >
> >> > I think there may be other forms better suited for that.
> >> >
> >> > Regarding your proposals, 1 and 2 seem reasonable to me. But someone
> >> > like myself might also not fully understand the design of AVRO.
> >> > A better exception message, or some kind of lead for armchair
> >> > programmers to better understand the exception, would help. Thanks
> >> > for mentioning the copy operation.
> >> >
> >> > Finally I do see something about aliases.
> >> >
> >> > Thanks,
> >> >
> >> > Colin
> >> >
> >> >
> >> >
> >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <[email protected]> wrote:
> >> > >
> >> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> >> > > can't reuse a Schema.Field object in two Records, because a field
> >> > > knows its own position in the record[1].  If a field were to belong to
> >> > > two records at different positions, this method would have an
> >> > > ambiguous response.
> >> > >
> >> > > As a workaround, since Avro 1.9, there's a copy constructor that you
> >> > > can use to clone the field:
> >> > >
> >> > > List<Schema.Field> clonedFields = existingFields.stream()
> >> > >         .map(f -> new Schema.Field(f, f.schema()))
> >> > >         .collect(Collectors.toList());
> >> > >
> >> > > That being said, I don't see any reason we MUST throw an exception.
> >> > > There's a couple of alternative strategies we could use in the Java
> >> > > SDK:
> >> > >
> >> > > 1. If the position is the same in both records, allow the field to be
> >> > > reused (which enables cloning use cases).
> >> > >
> >> > > 2. Make a copy of the field to reuse internally if the position is
> >> > > already set (probably OK, since it's supposed to be immutable).
> >> > >
> >> > > 3. Allow the field to be reused, and throw the exception only if
> >> > > someone calls the position() method later.
> >> > >
> >> > > Any of those sound like a useful change for your use case?  Don't
> >> > > hesitate to create a JIRA or contribution if you like!
> >> > >
> >> > > All my best, Ryan
> >> > >
> >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> >> > > <[email protected]> wrote:
> >> > > >
> >> > > > Hello,
> >> > > >
> >> > > > I'm trying to understand how to work with Avro records and schemas
> >> > > > programmatically. I was first trying to create a new schema and
> >> > > > records based on existing records, but with a different name /
> >> > > > namespace. It seems I don't understand getFields() or
> >> > > > createRecord(...). Why can't I use the fields obtained from
> >> > > > getFields() in createRecord()?  How would I go about this properly?
> >> > > >
> >> > > > // for an existing record already present
> >> > > > GenericRecord someRecord
> >> > > >
> >> > > > // get a list of existing fields
> >> > > > List<Schema.Field> existingFields = 
> >> > > > someRecord.getSchema().getFields();
> >> > > >
> >> > > > // schema for new record with existing fields
> >> > > > Schema updatedSchema = Schema.createRecord("UpdatedName",
> >> > > >         "", "avro.com.example.namespace", false, existingFields);
> >> > > >
> >> > > > ^^ throws an exception ^^
> >> > > >
> >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
> >> > > > used: eventMetadata type:UNION pos:0
> >> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> >> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> >> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
> >> > > > */
> >> > > >
> >> > > > final int length = existingFields.size();
> >> > > >
> >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> >> > > > for (int i = 0; i < length; i++) {
> >> > > >     final Schema.Field field = existingFields.get(i);
> >> > > >     clonedRecord.put(i, someRecord.get(i));
> >> > > > }
> >> > > >
> >> > > >
> >> > > > Best Regards,
> >> > > >
> >> > > > Colin Williams
> >
