Hi Colin,
You are omitting some details from your mail (such as the actual schema)
but I am suspecting in your first email you tried to do something like this
Schema SCHEMA = SchemaBuilder
.record("test")
.namespace("foo")
.fields()
.name("foo").type().stringType().noDefault()
.name("bar").type().optional().stringType()
.endRecord();
GenericRecord castRecord = new GenericData.Record(SCHEMA);
castRecord.put("foo", "foo value");
castRecord.put("bar", "bar value");
Schema clonedSchema = Schema.createRecord("othertest",
"",
"bar",
false,
SCHEMA.getFields());
And this didn't work because as Ryan explained schema fields cannot be
reused. So he suggested that the fields should be copied first.
List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
.map(f -> new Schema.Field(f, f.schema()))
.collect(Collectors.toList());
I tested it and it works fine. You should use it like this:
Schema SCHEMA = SchemaBuilder
.record("test")
.namespace("foo")
.fields()
.name("foo").type().stringType().noDefault()
.name("bar").type().optional().stringType()
.endRecord();
GenericRecord castRecord = new GenericData.Record(SCHEMA);
castRecord.put("foo", "foo value");
castRecord.put("bar", "bar value");
List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
.map(f -> new Schema.Field(f, f.schema()))
.collect(Collectors.toList());
Schema clonedSchema = Schema.createRecord("othertest",
"",
"bar",
false,
clonedFields);
final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
for (int i = 0; i < clonedFields.size(); i++) {
clonedRecord.put(i, castRecord.get(i));
}
GenericDatumWriter<GenericRecord> writer = new
GenericDatumWriter<GenericRecord>(SCHEMA);
ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
Encoder binaryEncoder =
EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
writer.write(clonedRecord, binaryEncoder);
Also for more robustness against reordered fields in clonedSchema, you
should probably do the data copying something like this:
for (Schema.Field field : castRecord.getSchema().getFields()) {
clonedRecord.put(field.name(), castRecord.get(field.name()));
}
On Sep 20 2020, at 7:33 am, Colin Williams
<[email protected]> wrote:
> I found a way to get my record schema so I'm no longer throwing the
> same exception. I am now able to put the record indexes for the cloned
> record.
>
> GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> clonedRecord.getSchema().getFields();
> int length = SCHEMA.getFields().size();
> for (int i = 0; i < length; i++) {
> clonedRecord.put(i, castRecord.get(i));
> }
>
> GenericDatumWriter<GenericRecord> writer = new
> GenericDatumWriter<GenericRecord>(SCHEMA);
> ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> Encoder binaryEncoder =
> EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> writer.write(clonedRecord, binaryEncoder);
>
> ^^ Throws UnresolvedUnionException ```Caused by:
> org.apache.avro.UnresolvedUnionException: Not in union
> ["null",{"type":"record","name"```
>
> The schema is identical between the records, with the exception of the
> name and namespace. Then I don't understand why I'm getting the
> exception. I can write with the castRecord and it's schema. Is this a
> byte alignment issue caused by the name and namespace? I didn't see
> them in the record indexes.
>
>
> On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> <[email protected]> wrote:
>>
>> I gave it a shot but from a MapFunction that supposed to be
>> serializable. I got an NPE.
>>
>> I assume I can't create a new Schema.Field from within that
>> MapFunction . I didn't seem to have trouble accessing the existing
>> schema fields.
>>
>> > List<Schema.Field> clonedFields = existingFields.stream()
>> .map(f -> new Schema.Field(f, f.schema()))
>> .collect(Collectors.toList());
>>
>> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
>> <[email protected]> wrote:
>> >
>> > Hi Ryan, Thanks for your explanation. I am thinking now that the
>> > design of AVRO suggests that data and schemas are very planned things.
>> > That changes are planned through versioning and we don't like
>> > duplicated schemas (when the positioning makes sense).
>> >
>> > I have a round about way of learning. Sometimes I am working with data
>> > and I think it's convenient to transform my data programmatically and
>> > try to obtain a schema from that. Also I think that schemas can become
>> > cumbersome when many fields are involved in intricate patterns.
>> >
>> > I think maybe there are other forms maybe more well suited for that.
>> >
>> > Regarding your proposals 1,2 seem reasonable to me. But someone like
>> > myself might also not fully understand the design of AVRO.
>> > A better exception or some kind of lead for armchair programmers to
>> > better understand the exception. Thanks for mentioning the copy
>> > operation.
>> >
>> > Finally I do see something about aliases.
>> >
>> > Thanks,
>> >
>> > Colin
>> >
>> >
>> >
>> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <[email protected]> wrote:
>> > >
>> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
>> > > can't reuse a Schema.Field object in two Records, because a field
>> > > knows its own position in the record[1]. If a field were to
>> belong to
>> > > two records at different positions, this method would have an
>> > > ambiguous response.
>> > >
>> > > As a workaround, since Avro 1.9, there's a copy constructor that you
>> > > can use to clone the field:
>> > >
>> > > List<Schema.Field> clonedFields = existingFields.stream()
>> > > .map(f -> new Schema.Field(f, f.schema()))
>> > > .collect(Collectors.toList());
>> > >
>> > > That being said, I don't see any reason we MUST throw an exception.
>> > > There's a couple of alternative strategies we could use in the Java
>> > > SDK:
>> > >
>> > > 1. If the position is the same in both records, allow the field
>> to be
>> > > reused (which enables cloning use cases).
>> > >
>> > > 2. Make a copy of the field to reuse internally if the position is
>> > > already set (probably OK, since it's supposed to be immutable).
>> > >
>> > > 3. Allow the field to be reused, only throw the exception only if
>> > > someone calls the position() method later.
>> > >
>> > > Any of those sound like a useful change for your use case? Don't
>> > > hesitate to create a JIRA or contribution if you like!
>> > >
>> > > All my best, Ryan
>> > >
>> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
>> > > <[email protected]> wrote:
>> > > >
>> > > > Hello,
>> > > >
>> > > > I'm trying to understand working with Avro records and schemas,
>> > > > programmatically. Then I was first trying to create a new
>> schema and
>> > > > records based on existing records, but with a different name /
>> > > > namespace. It seems then I don't understand getFields() or
>> > > > createRecord(...). Why can't I use the fields obtained from
>> > > > getFields() in createRecord()? How would I go about this properly?
>> > > >
>> > > > // for an existing record already present
>> > > > GenericRecord someRecord
>> > > >
>> > > > // get a list of existing fields
>> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
>> > > >
>> > > > // schema for new record with existing fields
>> > > > Schema updatedSchema = createRecord("UpdatedName",
>> > > > "","avro.com.example.namespace" , false, existingFields);
>> > > >
>> > > > ^^ throws an exception ^^
>> > > >
>> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already
>> > > > used: eventMetadata type:UNION pos:0
>> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
>> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
>> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
>> > > > */
>> > > >
>> > > > final int length = fields.size();
>> > > >
>> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
>> > > > for (int i = 0; i < length; i++) {
>> > > > final Schema.Field field = existingFields.get(i);
>> > > > clonedRecord.put(i, someRecord.get(i));
>> > > > }
>> > > >
>> > > >
>> > > > Best Regards,
>> > > >
>> > > > Colin Williams
>