I am working with Avro 1.9.1 C++. I have reviewed the simple examples and
unit tests. I am having a hard time figuring out how to build a record
datum to encode into a memory buffer to provide to Kafka. I understand the
memory writer part - it's the building of the record datum that is stumping
me.

Here's my scenario. I want to create a library that can be called from some
existing C/C++ code that generates CSV records. The requirement is that I
take as input a CSV header, a CSV record, and an Avro schema to map the CSV
into. The caller will provide the CSV header and a CSV data record. The CSV
header field names will exactly match the field names in an Avro JSON
schema designed for each CSV data type. However, the order of the fields in
the CSV might not match the order of the fields in the Avro schema and
there could be more or fewer fields in either. I need to validate all that
as part of my new library.
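
A minimal sketch of that header-versus-schema check, using only the
standard library. The names HeaderMapping and mapHeaderToSchema are
hypothetical, and the schema field names are assumed to have already been
pulled out of the Avro schema into a plain vector; nothing here calls the
Avro API.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Result of matching CSV header names against schema field names.
struct HeaderMapping {
    // For CSV column i, the position of the same-named schema field,
    // or -1 if the schema has no such field.
    std::vector<int> csvToSchema;
    // Schema field names that no CSV column supplies.
    std::vector<std::string> missingFromCsv;
    // CSV column names that the schema does not define.
    std::vector<std::string> unknownInCsv;
};

HeaderMapping mapHeaderToSchema(const std::vector<std::string>& csvHeader,
                                const std::vector<std::string>& schemaFields) {
    // Index the schema field names by position for fast lookup.
    std::map<std::string, int> schemaPos;
    for (std::size_t i = 0; i < schemaFields.size(); ++i) {
        schemaPos[schemaFields[i]] = static_cast<int>(i);
    }

    HeaderMapping m;
    std::vector<bool> supplied(schemaFields.size(), false);
    for (const std::string& name : csvHeader) {
        auto it = schemaPos.find(name);
        if (it == schemaPos.end()) {
            m.csvToSchema.push_back(-1);
            m.unknownInCsv.push_back(name);
        } else {
            m.csvToSchema.push_back(it->second);
            supplied[it->second] = true;
        }
    }
    for (std::size_t i = 0; i < schemaFields.size(); ++i) {
        if (!supplied[i]) m.missingFromCsv.push_back(schemaFields[i]);
    }
    return m;
}
```

Because the mapping is by name, the CSV column order does not need to match
the schema order. Duplicate CSV header names are not detected in this
sketch.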

I am trying the simple case here where the Avro schema is flat, like the
CSV, and only has a top-level record with a bunch of fields matching those
in the CSV header. However, the Avro schema "may" have more fields than the
CSV because, as the application creating the CSV evolves, its stats do as
well. For example, it might no longer use some fields and add new fields.
The next version of the schema must still reflect the deleted fields (with
defaults) as well as new fields to support backward/forward compatibility
from a consumer's perspective. We need to manage schema evolution in a
forward/backward compatible mode, following the Confluent.io approach for
Kafka.

So, those are the preconditions. What I propose to do in this sort of
"Avroizing Proxy" is as follows:

INITIALIZATION:

1. Load and validate the Avro schema from a JSON file as a one-time
initialization action
2. Load a CSV header and create a vector of each field name parsed from the
CSV header as a one-time initialization action
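
The header parsing in step 2 could be sketched as below. This is an
illustration under a stated assumption: fields contain no quoted commas or
embedded newlines, so a plain split on ',' is enough. The function name
splitCsvLine is hypothetical.

```cpp
#include <string>
#include <vector>

// Splits one CSV line on commas. Assumption: fields contain no quoted
// commas or embedded newlines; a trailing '\r' is dropped.
std::vector<std::string> splitCsvLine(const std::string& line) {
    std::string text = line;
    if (!text.empty() && text.back() == '\r') text.pop_back();

    std::vector<std::string> fields;
    std::string::size_type start = 0;
    while (true) {
        std::string::size_type comma = text.find(',', start);
        if (comma == std::string::npos) {
            // Last field: everything after the final comma.
            fields.push_back(text.substr(start));
            break;
        }
        fields.push_back(text.substr(start, comma - start));
        start = comma + 1;
    }
    return fields;
}
```

The same function can split each data record, so the value at index i lines
up with the header name at index i. Empty fields are kept as empty strings.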

FOR EACH CSV RECORD:

1. Create an empty record datum. I want to start with an empty one as a way
to ensure that, after the following steps complete, I end up with the
minimum record datum value needed to validate against the schema
2. Iterate over the CSV record, field by field. For each field value, use
its positional index to fetch the CSV header field name from the vector at
the same index
3. Using the same field name and the Avro schema, look up the data type in
the schema for that field (we will only be dealing with primitive types
like string, long, and float - no nested records, arrays, maps, enums, etc.)
4. Use the data type to determine which type of primitive encoder (string,
long, float, etc) to use and attempt to cast the CSV value string into the
appropriate type required by the encoder
5. Call some method to add this encoded value to a record datum
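
The cast in step 4 might look like the sketch below, which is standard
library only. FieldType, FieldValue, and convertField are hypothetical
stand-ins: the enum stands for the type looked up in the schema, and the
struct stands for whatever would eventually be placed in the record datum.
It does not show the Avro call from step 5.

```cpp
#include <cerrno>
#include <cstdlib>
#include <string>

// Hypothetical stand-in for the primitive Avro types mentioned above.
enum class FieldType { String, Long, Float };

// Holds one converted CSV value; only the member matching `type` is
// meaningful, the others are left at zero/empty.
struct FieldValue {
    FieldType type;
    std::string s;
    long long l;
    float f;
};

// Converts `text` to the requested type. Returns false if a numeric field
// is empty, has trailing characters, or is out of range for the type.
bool convertField(const std::string& text, FieldType type, FieldValue& out) {
    out = FieldValue{type, std::string(), 0, 0.0f};
    if (type == FieldType::String) {
        out.s = text;
        return true;
    }
    if (text.empty()) return false;
    errno = 0;
    char* end = nullptr;
    if (type == FieldType::Long) {
        out.l = std::strtoll(text.c_str(), &end, 10);
    } else {
        out.f = std::strtof(text.c_str(), &end);
    }
    // Reject partial parses such as "12abc" and overflow (ERANGE).
    return errno == 0 && end == text.c_str() + text.size();
}
```

Checking the end pointer matters here because strtoll and strtof stop at
the first character they cannot parse and would otherwise accept "12abc"
as 12.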

FWIW - This is basically how I had this working in the C Avro library but
I cannot seem to figure out the C++ equivalent.

AFTER EACH RECORD ITERATION COMPLETES:

It is possible that the CSV has 5 fields that all match fields in the
schema but the schema defines 7 fields due to backwards compatibility
requirements. So, now that the record datum is completely built up (with 5
fields from the CSV data), it will still NOT validate against the schema
that requires 7 fields. So, another step is required to deal with the 2
extra fields as follows:

AUTOMATIC DEFAULT HANDLING:

I don't want to make the caller have to provide all 7 fields. He has
evolved his app (and stats) and does not care about those "no longer used"
values in his code. He has, however, evolved his schema in a compatible
way, so those old fields MUST still be represented there. I want
the library to detect missing fields and fill in the defaults
automatically, iff defined.

1. For each field in the schema that is NOT in the record datum, check to
see if the field defines a default value
2. If a field has a default, then encode that field name into the record
datum and set its value to the default defined in the schema
3. If a field does NOT have a default, then the caller did not provide all
the required values and this would be treated as an error condition and the
record rejected
4. In the case of a fully valid and now defaulted record, use the
avro::memoryOutputStream() and avro::binaryEncoder() to build the binary
Avro message that I can then publish to Kafka.
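
Steps 1 to 3 of the default handling can be sketched without the Avro API
as follows. FieldSpec and fillDefaults are hypothetical names, the record
is modeled as a plain name-to-text map rather than an Avro datum, and the
default is assumed to have been read out of the schema as text beforehand.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical description of one schema field: its name and, if the
// schema declares one, the default value kept here as text.
struct FieldSpec {
    std::string name;
    bool hasDefault;
    std::string defaultText;
};

// Adds defaults for schema fields absent from `record`. Returns false and
// names the first offending field if one is missing and has no default.
bool fillDefaults(const std::vector<FieldSpec>& schema,
                  std::map<std::string, std::string>& record,
                  std::string& missingField) {
    for (const FieldSpec& field : schema) {
        if (record.count(field.name) != 0) continue;  // supplied by CSV
        if (!field.hasDefault) {
            missingField = field.name;
            return false;  // required value absent: reject the record
        }
        record[field.name] = field.defaultText;
    }
    return true;
}
```

On failure the record may already hold some defaults that were filled in
before the missing field was found, so a rejected record should be
discarded rather than reused.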

I know this is a lot of detail but I would appreciate any guidance on
specifically how to programmatically build up a datum that can then be run
through the appropriate Avro method, with a schema, to validate.

Thanks in advance.
