Hi,

I am struggling to familiarise myself with schema evolution and schema
projection using the avro-c implementation.

There doesn't seem to be much information available on how to perform these
tasks. The examples on the C API page confusingly mix the old datum API
with the new value API.

I have built what I think is a really simple example of testing schema
projection but it does not work the way I think it should work - more than
likely my understanding is wrong.

Where I ask for one particular field (by specifying the field name) of a
record to be retrieved I instead get every field that matches the request
type.

The attached file projection_01.c (attached and at
https://gist.github.com/claws/5056626) defines a really simple record with
If I avrocat the container file I see:
{"Field_1": 1, "Field_2": 1}
{"Field_1": 2, "Field_2": 2}
{"Field_1": 3, "Field_2": 3}
{"Field_1": 4, "Field_2": 4}
{"Field_1": 5, "Field_2": 5}

The projection schema being used is a record only containing Field_2 of
type int. I only expected that field to be returned by the reader yet I
receive every int type field, confusingly labelled as "Field_2".

When I run projection_01.c I see:
{"Field_2": 1}
{"Field_2": 1}
{"Field_2": 2}
{"Field_2": 2}
{"Field_2": 3}
{"Field_2": 3}
{"Field_2": 4}
{"Field_2": 4}
{"Field_2": 5}
{"Field_2": 5}

Is this how schema projection is supposed to work? Does it just return
items of the same type irrespective of the field name specified?

I think I am missing something about how this is supposed to work. Perhaps
my example record is too simple.

So, I then created a slightly more complex schema that contained a
sub-record and the projection seems to work how I think it should work.
This can be seen in the output from projection_02.c (attached and at
https://gist.github.com/claws/5056643) which returns:
{"Field_2": {"SubField_1": 1, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
{"Field_2": {"SubField_1": 2, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
{"Field_2": {"SubField_1": 3, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
{"Field_2": {"SubField_1": 4, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
{"Field_2": {"SubField_1": 5, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}

>From this trial and error it appears that the projection will return me
values that match the projection schema's types - but does not take into
account any 'name' fields. Would that be an accurate assessment?

Can anyone provide some more information on schema projection?
Is there a good example anywhere?

Regards,
Chris
/*
* Build using:
* gcc -Wall -std=c99 projection_01.c -o projection_01_test -I/usr/include -I$AVRO_INCLUDE_DIR -L$AVRO_LIB_DIR -lavro
*/


#include <stdlib.h>
#include <stdio.h>
#include <avro.h>

/* Simple schema for this test */

const char  SIMPLE_SCHEMA[] =
"{\"type\":\"record\",\
  \"name\":\"SimpleScehma\",\
  \"fields\":[\
     {\"name\": \"Field_1\", \"type\": \"int\"},\
     {\"name\": \"Field_2\", \"type\": \"int\"}]}";


const char  PROJECTED_SCHEMA[] =
"{\"type\":\"record\",\
  \"name\":\"SimpleScehma\",\
  \"fields\":[\
     {\"name\": \"Field_2\", \"type\": \"int\"}]}";



const char *archive_file = "archive_file.avro";

int field_1_val = 0;
int field_2_val = 0;

avro_schema_t schema;



void add_item(avro_file_writer_t writer) {

    avro_value_t value;
    avro_value_iface_t *iface = avro_generic_class_from_schema(schema);
    avro_generic_value_new(iface, &value);

    avro_value_t field_1;
    avro_value_t field_2;
    avro_value_t field_3;
    avro_value_t sub_field_1;
    avro_value_t sub_field_2;
    size_t index = 0;

    if (avro_value_get_by_name(&value, "Field_1", &field_1, &index) == 0) {
        avro_value_set_int(&field_1, ++field_1_val);
    }

    if (avro_value_get_by_name(&value, "Field_2", &field_2, &index) == 0) {

        if (avro_value_get_by_name(&field_2, "SubField_1", &sub_field_1, &index) == 0) {
            avro_value_set_long(&sub_field_1, 42);
        }
        if (avro_value_get_by_name(&field_2, "SubField_2", &sub_field_2, &index) == 0) {
            avro_value_set_int(&sub_field_2, 24);
        }
    }

    if (avro_value_get_by_name(&value, "Field_3", &field_3, &index) == 0) {
        avro_value_set_int(&field_3, 3);
    }

    if (avro_file_writer_append_value(writer, &value)) {
        printf("Error appending item to archive: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    avro_value_decref(&field_1);
    //avro_value_decref(&field_2);   // causes segfault!?
    avro_value_iface_decref(iface);
    avro_value_decref(&value);
}


void create_archive_test() {

    remove(archive_file);

    avro_file_writer_t writer;
    if (avro_file_writer_create(archive_file, schema, &writer)) {
        printf("Error creating file writer: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    for (int i=0; i<5; i++) {
        add_item(writer);
    }

    avro_file_writer_flush(writer);
    avro_file_writer_close(writer);

}


void read_archive_test() {

    avro_file_reader_t reader;
    if (avro_file_reader(archive_file, &reader)) {
        printf("Error creating reader for test: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    avro_schema_t writer_schema;
    writer_schema = avro_file_reader_get_writer_schema(reader);

    avro_value_t value;
    avro_value_iface_t *iface = avro_generic_class_from_schema(writer_schema);
    avro_generic_value_new(iface, &value);

    while(avro_file_reader_read_value(reader, &value)) {
        char *json;
        if (avro_value_to_json(&value, 1, &json)) {
            printf("Problem converting value to JSON: %s\n", avro_strerror());
        } else {
            printf("%s\n", json);
        }
        free(json);
        avro_value_reset(&value);
    }

    avro_value_decref(&value);
    avro_value_iface_decref(iface);
    avro_file_reader_close(reader);
    avro_schema_decref(writer_schema);
}


void projection_test() {

    avro_schema_t projection_schema;
    if (avro_schema_from_json_literal(PROJECTED_SCHEMA, &projection_schema)) {
        printf("Error loading projection schema from file: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    avro_value_t value;
    avro_value_iface_t *iface = avro_generic_class_from_schema(projection_schema);
    avro_generic_value_new(iface, &value);

    avro_file_reader_t reader;
    if (avro_file_reader(archive_file, &reader)) {
        printf("Error creating reader for projection test: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    while(avro_file_reader_read_value(reader, &value)==0) {
        char *json;
        if (avro_value_to_json(&value, 1, &json)) {
            printf("Problem converting projected value to JSON: %s\n", avro_strerror());
        } else {
            printf("%s\n", json);
        }
        free(json);
        avro_value_reset(&value);
    }

    avro_value_decref(&value);
    avro_value_iface_decref(iface);
    avro_file_reader_close(reader);
    avro_schema_decref(projection_schema);
}




int main(int argc, char *argv[]) {

    if (avro_schema_from_json_literal(SIMPLE_SCHEMA, &schema)) {
        printf("Error loading schema from file: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    printf("Creating archive...\n");
    create_archive_test();
    printf("Reading archive...\n");
    read_archive_test();
    printf("Reading archive using a projection schema...\n");
    projection_test();

    avro_schema_decref(schema);
    return 0;
}
/*
* Build using:
* gcc -Wall -std=c99 projection_02.c -o projection_02_test -I/usr/include -I$AVRO_INCLUDE_DIR -L$AVRO_LIB_DIR -lavro
*/

#include <stdlib.h>
#include <stdio.h>
#include <avro.h>

/* Simple schema for this test */

const char  SIMPLE_SCHEMA[] =
"{\"type\":\"record\",\
  \"name\":\"SimpleScehma\",\
  \"fields\":[\
     {\"name\": \"Field_1\", \"type\": \"int\"},\
     {\"name\": \"Field_2\", \"type\": {\
       \"name\": \"SubRecord\",\
       \"type\": \"record\",\
       \"fields\": [\
        {\"name\": \"SubField_1\", \"type\": \"long\", \"default\": [\"0\"]},\
        {\"name\": \"SubField_2\", \"type\": \"int\", \"default\": [\"0\"]}]}},\
     {\"name\": \"Field_3\", \"type\": \"int\"}]}";


const char  PROJECTED_SCHEMA[] =
"{\"type\":\"record\",\
  \"name\":\"SimpleScehma\",\
  \"fields\":[\
     {\"name\": \"Field_2\", \"type\": {\
       \"name\": \"SubRecord\",\
       \"type\": \"record\",\
       \"fields\": [\
        {\"name\": \"SubField_1\", \"type\": \"long\", \"default\": [\"0\"]},\
        {\"name\": \"SubField_2\", \"type\": \"int\", \"default\": [\"0\"]}]}}]}";


const char *archive_file = "archive_file.avro";

int field_1_val = 0;
int field_2_val = 0;

avro_schema_t schema;



void add_item(avro_file_writer_t writer) {

    avro_value_t value;
    avro_value_iface_t *iface = avro_generic_class_from_schema(schema);
    avro_generic_value_new(iface, &value);

    avro_value_t field_1;
    avro_value_t field_2;
    avro_value_t field_3;
    avro_value_t sub_field_1;
    avro_value_t sub_field_2;
    size_t index = 0;

    if (avro_value_get_by_name(&value, "Field_1", &field_1, &index) == 0) {
        avro_value_set_int(&field_1, ++field_1_val);
    }

    if (avro_value_get_by_name(&value, "Field_2", &field_2, &index) == 0) {

        if (avro_value_get_by_name(&field_2, "SubField_1", &sub_field_1, &index) == 0) {
            avro_value_set_long(&sub_field_1, 42);
        }
        if (avro_value_get_by_name(&field_2, "SubField_2", &sub_field_2, &index) == 0) {
            avro_value_set_int(&sub_field_2, 24);
        }
    }

    if (avro_value_get_by_name(&value, "Field_3", &field_3, &index) == 0) {
        avro_value_set_int(&field_3, 3);
    }

    if (avro_file_writer_append_value(writer, &value)) {
        printf("Error appending item to archive: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    avro_value_decref(&field_1);
    //avro_value_decref(&field_2);   // causes segfault!?
    avro_value_iface_decref(iface);
    avro_value_decref(&value);
}


void create_archive_test() {

    remove(archive_file);

    avro_file_writer_t writer;
    if (avro_file_writer_create(archive_file, schema, &writer)) {
        printf("Error creating file writer: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    for (int i=0; i<5; i++) {
        add_item(writer);
    }

    avro_file_writer_flush(writer);
    avro_file_writer_close(writer);

}


void read_archive_test() {

    avro_file_reader_t reader;
    if (avro_file_reader(archive_file, &reader)) {
        printf("Error creating reader for test: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    avro_schema_t writer_schema;
    writer_schema = avro_file_reader_get_writer_schema(reader);

    avro_value_t value;
    avro_value_iface_t *iface = avro_generic_class_from_schema(writer_schema);
    avro_generic_value_new(iface, &value);

    while(avro_file_reader_read_value(reader, &value)) {
        char *json;
        if (avro_value_to_json(&value, 1, &json)) {
            printf("Problem converting value to JSON: %s\n", avro_strerror());
        } else {
            printf("%s\n", json);
        }
        free(json);
        avro_value_reset(&value);
    }

    avro_value_decref(&value);
    avro_value_iface_decref(iface);
    avro_file_reader_close(reader);
    avro_schema_decref(writer_schema);
}


void projection_test() {

    avro_schema_t projection_schema;
    if (avro_schema_from_json_literal(PROJECTED_SCHEMA, &projection_schema)) {
        printf("Error loading projection schema from file: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    avro_value_t value;
    avro_value_iface_t *iface = avro_generic_class_from_schema(projection_schema);
    avro_generic_value_new(iface, &value);

    avro_file_reader_t reader;
    if (avro_file_reader(archive_file, &reader)) {
        printf("Error creating reader for projection test: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    while(avro_file_reader_read_value(reader, &value)==0) {
        char *json;
        if (avro_value_to_json(&value, 1, &json)) {
            printf("Problem converting projected value to JSON: %s\n", avro_strerror());
        } else {
            printf("%s\n", json);
        }
        free(json);
        avro_value_reset(&value);
    }

    avro_value_decref(&value);
    avro_value_iface_decref(iface);
    avro_file_reader_close(reader);
    avro_schema_decref(projection_schema);
}



int main(int argc, char *argv[]) {

    if (avro_schema_from_json_literal(SIMPLE_SCHEMA, &schema)) {
        printf("Error loading schema from file: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    printf("Creating archive...\n");
    create_archive_test();
    printf("Reading archive...\n");
    read_archive_test();
    printf("Reading archive using a projection schema...\n");
    projection_test();

    avro_schema_decref(schema);
    return 0;
}

Reply via email to