*I tried your copy also and got the same UnresolvedUnionException*
On Mon, Sep 21, 2020 at 9:57 PM Colin Williams <[email protected]> wrote:
>
> Hi, Mika
>
> Sorry I wasn't more clear in my previous emails. For the time since
> after my first email because I was getting a NPE when working with the
> Schema, I was able to fetch the Schema elsewhere and apply it with
>
> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
>
> Then I put aside that particular issue for the time being.
>
> Then I provided part of the related exception when attempting to write
> the copy. Unfortunately I can't currently provide the schema but maybe
> I will figure out a way to provide a reproduction eventually.
>
> E.G. Throws UnresolvedUnionException
> ```
> Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name"...
>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
>   at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
>   at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>   at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>   at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>   at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
> ```
>
> I tried your copy also and got an NPE. I thought that it might be
> somewhat obvious from the exception message and details what is
> causing the exception.
>
> It might be that I can't do this within a Map operation in a
> distributed platform like Spark?
>
> Thanks and Best Regards,
>
> Colin
>
> On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki <[email protected]> wrote:
> >
> > Hi Colin,
> >
> > You are omitting some details from your mail (such as the actual schema)
> > but I am suspecting in your first email you tried to do something like this
> >
> > Schema SCHEMA = SchemaBuilder
> >     .record("test")
> >     .namespace("foo")
> >     .fields()
> >     .name("foo").type().stringType().noDefault()
> >     .name("bar").type().optional().stringType()
> >     .endRecord();
> >
> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> > castRecord.put("foo", "foo value");
> > castRecord.put("bar", "bar value");
> >
> > Schema clonedSchema = Schema.createRecord("othertest",
> >     "",
> >     "bar",
> >     false,
> >     SCHEMA.getFields());
> >
> > And this didn't work because as Ryan explained schema fields cannot be
> > reused. So he suggested that the fields should be copied first.
> >
> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >     .map(f -> new Schema.Field(f, f.schema()))
> >     .collect(Collectors.toList());
> >
> > I tested it and it works fine.
> > You should use it like this:
> >
> > Schema SCHEMA = SchemaBuilder
> >     .record("test")
> >     .namespace("foo")
> >     .fields()
> >     .name("foo").type().stringType().noDefault()
> >     .name("bar").type().optional().stringType()
> >     .endRecord();
> >
> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> > castRecord.put("foo", "foo value");
> > castRecord.put("bar", "bar value");
> >
> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >     .map(f -> new Schema.Field(f, f.schema()))
> >     .collect(Collectors.toList());
> > Schema clonedSchema = Schema.createRecord("othertest",
> >     "",
> >     "bar",
> >     false,
> >     clonedFields);
> >
> > final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
> > for (int i = 0; i < clonedFields.size(); i++) {
> >     clonedRecord.put(i, castRecord.get(i));
> > }
> >
> > GenericDatumWriter<GenericRecord> writer =
> >     new GenericDatumWriter<GenericRecord>(SCHEMA);
> > ByteArrayOutputStream clonedRecordOutputStream =
> >     new ByteArrayOutputStream();
> > Encoder binaryEncoder =
> >     EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> > writer.write(clonedRecord, binaryEncoder);
> >
> > Also for more robustness against reordered fields in clonedSchema, you
> > should probably do the data copying something like this:
> >
> > for (Schema.Field field : castRecord.getSchema().getFields()) {
> >     clonedRecord.put(field.name(), castRecord.get(field.name()));
> > }
> >
> > On Sep 20 2020, at 7:33 am, Colin Williams
> > <[email protected]> wrote:
> >
> > > I found a way to get my record schema so I'm no longer throwing the
> > > same exception. I am now able to put the record indexes for the cloned
> > > record.
> > >
> > > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> > > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> > > clonedRecord.getSchema().getFields();
> > > int length = SCHEMA.getFields().size();
> > > for (int i = 0; i < length; i++) {
> > >     clonedRecord.put(i, castRecord.get(i));
> > > }
> > >
> > > GenericDatumWriter<GenericRecord> writer =
> > >     new GenericDatumWriter<GenericRecord>(SCHEMA);
> > > ByteArrayOutputStream clonedRecordOutputStream =
> > >     new ByteArrayOutputStream();
> > > Encoder binaryEncoder =
> > >     EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> > > writer.write(clonedRecord, binaryEncoder);
> > >
> > > ^^ Throws UnresolvedUnionException
> > > ```
> > > Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name"
> > > ```
> > >
> > > The schema is identical between the records, with the exception of the
> > > name and namespace. So I don't understand why I'm getting the
> > > exception. I can write with the castRecord and its schema. Is this a
> > > byte alignment issue caused by the name and namespace? I didn't see
> > > them in the record indexes.
> > >
> > > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> > > <[email protected]> wrote:
> > >>
> > >> I gave it a shot but from a MapFunction that's supposed to be
> > >> serializable. I got an NPE.
> > >>
> > >> I assume I can't create a new Schema.Field from within that
> > >> MapFunction. I didn't seem to have trouble accessing the existing
> > >> schema fields.
> > >>
> > >> > List<Schema.Field> clonedFields = existingFields.stream()
> > >> >     .map(f -> new Schema.Field(f, f.schema()))
> > >> >     .collect(Collectors.toList());
> > >>
> > >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
> > >> <[email protected]> wrote:
> > >> >
> > >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
> > >> > design of AVRO suggests that data and schemas are very planned things.
> > >> > That changes are planned through versioning and we don't like
> > >> > duplicated schemas (when the positioning makes sense).
> > >> >
> > >> > I have a roundabout way of learning. Sometimes I am working with data
> > >> > and I think it's convenient to transform my data programmatically and
> > >> > try to obtain a schema from that. Also I think that schemas can become
> > >> > cumbersome when many fields are involved in intricate patterns.
> > >> >
> > >> > I think maybe there are other forms more well suited for that.
> > >> >
> > >> > Regarding your proposals, 1 and 2 seem reasonable to me. But someone like
> > >> > myself might also not fully understand the design of AVRO.
> > >> > A better exception or some kind of lead would help armchair programmers
> > >> > better understand the exception. Thanks for mentioning the copy
> > >> > operation.
> > >> >
> > >> > Finally I do see something about aliases.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Colin
> > >> >
> > >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <[email protected]> wrote:
> > >> > >
> > >> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> > >> > > can't reuse a Schema.Field object in two Records, because a field
> > >> > > knows its own position in the record[1]. If a field were to belong to
> > >> > > two records at different positions, this method would have an
> > >> > > ambiguous response.
> > >> > >
> > >> > > As a workaround, since Avro 1.9, there's a copy constructor that you
> > >> > > can use to clone the field:
> > >> > >
> > >> > > List<Schema.Field> clonedFields = existingFields.stream()
> > >> > >     .map(f -> new Schema.Field(f, f.schema()))
> > >> > >     .collect(Collectors.toList());
> > >> > >
> > >> > > That being said, I don't see any reason we MUST throw an exception.
> > >> > > There's a couple of alternative strategies we could use in the Java
> > >> > > SDK:
> > >> > >
> > >> > > 1. If the position is the same in both records, allow the field to be
> > >> > > reused (which enables cloning use cases).
> > >> > >
> > >> > > 2. Make a copy of the field to reuse internally if the position is
> > >> > > already set (probably OK, since it's supposed to be immutable).
> > >> > >
> > >> > > 3. Allow the field to be reused, and throw the exception only if
> > >> > > someone calls the position() method later.
> > >> > >
> > >> > > Any of those sound like a useful change for your use case? Don't
> > >> > > hesitate to create a JIRA or contribution if you like!
> > >> > >
> > >> > > All my best, Ryan
> > >> > >
> > >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> > >> > > <[email protected]> wrote:
> > >> > > >
> > >> > > > Hello,
> > >> > > >
> > >> > > > I'm trying to understand working with Avro records and schemas,
> > >> > > > programmatically. I was first trying to create a new schema and
> > >> > > > records based on existing records, but with a different name /
> > >> > > > namespace. It seems I don't understand getFields() or
> > >> > > > createRecord(...). Why can't I use the fields obtained from
> > >> > > > getFields() in createRecord()? How would I go about this properly?
> > >> > > >
> > >> > > > // for an existing record already present
> > >> > > > GenericRecord someRecord
> > >> > > >
> > >> > > > // get a list of existing fields
> > >> > > > List<Schema.Field> existingFields =
> > >> > > >     someRecord.getSchema().getFields();
> > >> > > >
> > >> > > > // schema for new record with existing fields
> > >> > > > Schema updatedSchema = createRecord("UpdatedName",
> > >> > > >     "", "avro.com.example.namespace", false, existingFields);
> > >> > > >
> > >> > > > ^^ throws an exception ^^
> > >> > > >
> > >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already used: eventMetadata type:UNION pos:0
> > >> > > >   at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> > >> > > >   at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> > >> > > >   at org.apache.avro.Schema.createRecord(Schema.java:217)
> > >> > > > */
> > >> > > >
> > >> > > > final int length = existingFields.size();
> > >> > > >
> > >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> > >> > > > for (int i = 0; i < length; i++) {
> > >> > > >     final Schema.Field field = existingFields.get(i);
> > >> > > >     clonedRecord.put(i, someRecord.get(i));
> > >> > > > }
> > >> > > >
> > >> > > > Best Regards,
> > >> > > >
> > >> > > > Colin Williams
> > >
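For anyone reading this thread later, here is a small sketch of the behaviour Ryan describes in his Sep 18 reply: a field that remembers its position cannot be attached to a second record, while a copy of it can. This is a toy model in plain Java and not Avro's implementation. The names FieldReuseSketch, ToyField, ToyRecord, reuseFails and renameWithCopies are all hypothetical, and the real classes (Schema.Field, Schema.createRecord) carry much more state than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FieldReuseSketch {

    /** Toy stand-in for a schema field: it remembers its position once attached. */
    public static final class ToyField {
        public final String name;
        private int pos = -1; // -1 means "not attached to any record yet"

        public ToyField(String name) { this.name = name; }

        /** Copy constructor: keeps the name but starts unattached again. */
        public ToyField(ToyField other) { this(other.name); }

        public int pos() { return pos; }
    }

    /** Toy stand-in for a record schema: attaching a field fixes that field's position. */
    public static final class ToyRecord {
        public final String name;
        public final List<ToyField> fields = new ArrayList<>();

        public ToyRecord(String name, List<ToyField> newFields) {
            this.name = name;
            int i = 0;
            for (ToyField f : newFields) {
                if (f.pos != -1) {
                    // Assumption: mirrors the spirit of "Field already used", not Avro's code.
                    throw new IllegalStateException("Field already used: " + f.name);
                }
                f.pos = i++;
                fields.add(f);
            }
        }
    }

    /** Returns true if building a second record from the same field objects is rejected. */
    public static boolean reuseFails(ToyRecord original) {
        try {
            new ToyRecord("renamed", original.fields);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Builds a renamed record from copies of the fields, leaving the original untouched. */
    public static ToyRecord renameWithCopies(ToyRecord original, String newName) {
        List<ToyField> copies = original.fields.stream()
                .map(f -> new ToyField(f))
                .collect(Collectors.toList());
        return new ToyRecord(newName, copies);
    }

    public static void main(String[] args) {
        ToyRecord original = new ToyRecord("test",
                Arrays.asList(new ToyField("foo"), new ToyField("bar")));
        System.out.println("reuse fails: " + reuseFails(original));
        ToyRecord renamed = renameWithCopies(original, "othertest");
        System.out.println(renamed.name + " has " + renamed.fields.size() + " fields");
    }
}
```

The copy step in renameWithCopies plays the same role as the new Schema.Field(f, f.schema()) call quoted above. The sketch only covers the "Field already used" error; it says nothing about the later UnresolvedUnionException, which depends on the actual schema.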
