Hi,
I am struggling to familiarise myself with schema evolution and schema
projection using the avro-c implementation.
There doesn't seem to be much information available on how to perform these
tasks. The examples on the C API page confusingly mix the old datum API
with the new value API.
I have built what I think is a really simple example of testing schema
projection but it does not work the way I think it should work - more than
likely my understanding is wrong.
Where I ask for one particular field (by specifying the field name) of a
record to be retrieved I instead get every field that matches the request
type.
The file projection_01.c (attached and at
https://gist.github.com/claws/5056626) defines a really simple record with
two int fields, Field_1 and Field_2, and writes five such records to a
container file.
If I avrocat the container file I see:
{"Field_1": 1, "Field_2": 1}
{"Field_1": 2, "Field_2": 2}
{"Field_1": 3, "Field_2": 3}
{"Field_1": 4, "Field_2": 4}
{"Field_1": 5, "Field_2": 5}
The projection schema being used is a record only containing Field_2 of
type int. I only expected that field to be returned by the reader yet I
receive every int type field, confusingly labelled as "Field_2".
When I run projection_01.c I see:
{"Field_2": 1}
{"Field_2": 1}
{"Field_2": 2}
{"Field_2": 2}
{"Field_2": 3}
{"Field_2": 3}
{"Field_2": 4}
{"Field_2": 4}
{"Field_2": 5}
{"Field_2": 5}
Is this how schema projection is supposed to work? Does it just return
items of the same type irrespective of the field name specified?
I think I am missing something about how this is supposed to work. Perhaps
my example record is too simple.
So, I then created a slightly more complex schema that contained a
sub-record and the projection seems to work how I think it should work.
This can be seen in the output from projection_02.c (attached and at
https://gist.github.com/claws/5056643) which returns:
{"Field_2": {"SubField_1": 1, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
{"Field_2": {"SubField_1": 2, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
{"Field_2": {"SubField_1": 3, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
{"Field_2": {"SubField_1": 4, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
{"Field_2": {"SubField_1": 5, "SubField_2": 42}}
{"Field_2": {"SubField_1": 24, "SubField_2": 3}}
From this trial and error it appears that the projection returns values
that match the projection schema's types, but does not take the field
names into account. Would that be an accurate assessment?
Can anyone provide some more information on schema projection?
Is there a good example anywhere?
Regards,
Chris
/*
* Build using:
* gcc -Wall -std=c99 projection_01.c -o projection_01_test -I/usr/include -I$AVRO_INCLUDE_DIR -L$AVRO_LIB_DIR -lavro
*/
#include <stdlib.h>
#include <stdio.h>
#include <avro.h>
/*
 * Writer schema for this test: a flat record with two int fields.
 * Written as adjacent string literals (concatenated by the compiler)
 * rather than backslash line-splices; the resulting JSON text is the same.
 */
const char SIMPLE_SCHEMA[] =
    "{\"type\":\"record\","
    "\"name\":\"SimpleScehma\","
    "\"fields\":["
    "{\"name\": \"Field_1\", \"type\": \"int\"},"
    "{\"name\": \"Field_2\", \"type\": \"int\"}]}";
/*
 * Reader (projection) schema: the same record name, keeping only Field_2.
 * Adjacent literals are concatenated into one JSON string at compile time.
 */
const char PROJECTED_SCHEMA[] =
    "{\"type\":\"record\","
    "\"name\":\"SimpleScehma\","
    "\"fields\":["
    "{\"name\": \"Field_2\", \"type\": \"int\"}]}";
/* Container file written by create_archive_test() and read by both read passes. */
const char *archive_file = "archive_file.avro";

/* Counters for generated field values (explicitly zero before the first record). */
int field_1_val = 0;
int field_2_val = 0;

/* Writer schema, parsed from SIMPLE_SCHEMA in main() and released before exit. */
avro_schema_t schema = NULL;
/*
 * Build one record of SIMPLE_SCHEMA ({Field_1: int, Field_2: int}) and
 * append it to `writer`. Both fields are set from pre-incremented global
 * counters, so successive calls produce 1, 2, 3, ... in each field.
 *
 * Child values obtained with avro_value_get_by_name() are views owned by
 * the parent record, not separately reference-counted values, so they must
 * not be passed to avro_value_decref(). Only the top-level value and its
 * iface are released here.
 */
void add_item(avro_file_writer_t writer) {
    avro_value_iface_t *iface = avro_generic_class_from_schema(schema);
    if (iface == NULL) {
        printf("Error creating value class: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }
    avro_value_t value;
    if (avro_generic_value_new(iface, &value)) {
        printf("Error creating value: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    avro_value_t field;
    size_t index = 0;
    if (avro_value_get_by_name(&value, "Field_1", &field, &index) == 0) {
        avro_value_set_int(&field, ++field_1_val);
    }
    /* In this schema Field_2 is a plain int, not a sub-record. */
    if (avro_value_get_by_name(&value, "Field_2", &field, &index) == 0) {
        avro_value_set_int(&field, ++field_2_val);
    }

    if (avro_file_writer_append_value(writer, &value)) {
        printf("Error appending item to archive: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    /* The value holds its own reference to iface, so release the value first. */
    avro_value_decref(&value);
    avro_value_iface_decref(iface);
}
void create_archive_test() {
remove(archive_file);
avro_file_writer_t writer;
if (avro_file_writer_create(archive_file, schema, &writer)) {
printf("Error creating file writer: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
for (int i=0; i<5; i++) {
add_item(writer);
}
avro_file_writer_flush(writer);
avro_file_writer_close(writer);
}
void read_archive_test() {
avro_file_reader_t reader;
if (avro_file_reader(archive_file, &reader)) {
printf("Error creating reader for test: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
avro_schema_t writer_schema;
writer_schema = avro_file_reader_get_writer_schema(reader);
avro_value_t value;
avro_value_iface_t *iface = avro_generic_class_from_schema(writer_schema);
avro_generic_value_new(iface, &value);
while(avro_file_reader_read_value(reader, &value)) {
char *json;
if (avro_value_to_json(&value, 1, &json)) {
printf("Problem converting value to JSON: %s\n", avro_strerror());
} else {
printf("%s\n", json);
}
free(json);
avro_value_reset(&value);
}
avro_value_decref(&value);
avro_value_iface_decref(iface);
avro_file_reader_close(reader);
avro_schema_decref(writer_schema);
}
void projection_test() {
avro_schema_t projection_schema;
if (avro_schema_from_json_literal(PROJECTED_SCHEMA, &projection_schema)) {
printf("Error loading projection schema from file: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
avro_value_t value;
avro_value_iface_t *iface = avro_generic_class_from_schema(projection_schema);
avro_generic_value_new(iface, &value);
avro_file_reader_t reader;
if (avro_file_reader(archive_file, &reader)) {
printf("Error creating reader for projection test: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
while(avro_file_reader_read_value(reader, &value)==0) {
char *json;
if (avro_value_to_json(&value, 1, &json)) {
printf("Problem converting projected value to JSON: %s\n", avro_strerror());
} else {
printf("%s\n", json);
}
free(json);
avro_value_reset(&value);
}
avro_value_decref(&value);
avro_value_iface_decref(iface);
avro_file_reader_close(reader);
avro_schema_decref(projection_schema);
}
/*
 * Parse the writer schema, write the archive, then read it back twice:
 * once with the writer schema and once through the projection schema.
 */
int main(int argc, char *argv[]) {
    (void)argc;  /* no command-line options are used */
    (void)argv;

    if (avro_schema_from_json_literal(SIMPLE_SCHEMA, &schema) != 0) {
        printf("Error loading schema from file: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    puts("Creating archive...");
    create_archive_test();

    puts("Reading archive...");
    read_archive_test();

    puts("Reading archive using a projection schema...");
    projection_test();

    avro_schema_decref(schema);
    return 0;
}
/*
* Build using:
* gcc -Wall -std=c99 projection_02.c -o projection_02_test -I/usr/include -I$AVRO_INCLUDE_DIR -L$AVRO_LIB_DIR -lavro
*/
#include <stdlib.h>
#include <stdio.h>
#include <avro.h>
/*
 * Writer schema for this test: Field_1 (int), Field_2 (a nested SubRecord
 * of a long and an int) and Field_3 (int).
 *
 * A field default must be a JSON value of the field's own type. For long
 * and int fields that is a bare number (0), not an array of strings (["0"]).
 * Adjacent literals are concatenated into one JSON string at compile time.
 */
const char SIMPLE_SCHEMA[] =
    "{\"type\":\"record\","
    "\"name\":\"SimpleScehma\","
    "\"fields\":["
    "{\"name\": \"Field_1\", \"type\": \"int\"},"
    "{\"name\": \"Field_2\", \"type\": {"
    "\"name\": \"SubRecord\","
    "\"type\": \"record\","
    "\"fields\": ["
    "{\"name\": \"SubField_1\", \"type\": \"long\", \"default\": 0},"
    "{\"name\": \"SubField_2\", \"type\": \"int\", \"default\": 0}]}},"
    "{\"name\": \"Field_3\", \"type\": \"int\"}]}";
const char PROJECTED_SCHEMA[] =
"{\"type\":\"record\",\
\"name\":\"SimpleScehma\",\
\"fields\":[\
{\"name\": \"Field_2\", \"type\": {\
\"name\": \"SubRecord\",\
\"type\": \"record\",\
\"fields\": [\
{\"name\": \"SubField_1\", \"type\": \"long\", \"default\": [\"0\"]},\
{\"name\": \"SubField_2\", \"type\": \"int\", \"default\": [\"0\"]}]}}]}";
/* Container file written by create_archive_test() and read by both read passes. */
const char *archive_file = "archive_file.avro";

/* Counters for generated field values (explicitly zero before the first record). */
int field_1_val = 0;
int field_2_val = 0;

/* Writer schema, parsed from SIMPLE_SCHEMA in main() and released before exit. */
avro_schema_t schema = NULL;
/*
 * Build one record of SIMPLE_SCHEMA and append it to `writer`:
 *   Field_1 = next value of the field_1_val counter (1, 2, 3, ...)
 *   Field_2 = {SubField_1: 42, SubField_2: 24}
 *   Field_3 = 3
 * Any field the schema does not contain is skipped.
 *
 * Values returned by avro_value_get_by_name() are views owned by their
 * parent, not separately reference-counted values. They must not be passed
 * to avro_value_decref(), which is presumably why decref'ing field_2 crashed.
 * Only the top-level value and its iface are released here.
 */
void add_item(avro_file_writer_t writer) {
    avro_value_iface_t *iface = avro_generic_class_from_schema(schema);
    if (iface == NULL) {
        printf("Error creating value class: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }
    avro_value_t value;
    if (avro_generic_value_new(iface, &value)) {
        printf("Error creating value: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    avro_value_t field_1;
    avro_value_t field_2;
    avro_value_t field_3;
    avro_value_t sub_field_1;
    avro_value_t sub_field_2;
    size_t index = 0;
    if (avro_value_get_by_name(&value, "Field_1", &field_1, &index) == 0) {
        avro_value_set_int(&field_1, ++field_1_val);
    }
    if (avro_value_get_by_name(&value, "Field_2", &field_2, &index) == 0) {
        if (avro_value_get_by_name(&field_2, "SubField_1", &sub_field_1, &index) == 0) {
            avro_value_set_long(&sub_field_1, 42);
        }
        if (avro_value_get_by_name(&field_2, "SubField_2", &sub_field_2, &index) == 0) {
            avro_value_set_int(&sub_field_2, 24);
        }
    }
    if (avro_value_get_by_name(&value, "Field_3", &field_3, &index) == 0) {
        avro_value_set_int(&field_3, 3);
    }

    if (avro_file_writer_append_value(writer, &value)) {
        printf("Error appending item to archive: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    /* The value holds its own reference to iface, so release the value first. */
    avro_value_decref(&value);
    avro_value_iface_decref(iface);
}
void create_archive_test() {
remove(archive_file);
avro_file_writer_t writer;
if (avro_file_writer_create(archive_file, schema, &writer)) {
printf("Error creating file writer: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
for (int i=0; i<5; i++) {
add_item(writer);
}
avro_file_writer_flush(writer);
avro_file_writer_close(writer);
}
void read_archive_test() {
avro_file_reader_t reader;
if (avro_file_reader(archive_file, &reader)) {
printf("Error creating reader for test: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
avro_schema_t writer_schema;
writer_schema = avro_file_reader_get_writer_schema(reader);
avro_value_t value;
avro_value_iface_t *iface = avro_generic_class_from_schema(writer_schema);
avro_generic_value_new(iface, &value);
while(avro_file_reader_read_value(reader, &value)) {
char *json;
if (avro_value_to_json(&value, 1, &json)) {
printf("Problem converting value to JSON: %s\n", avro_strerror());
} else {
printf("%s\n", json);
}
free(json);
avro_value_reset(&value);
}
avro_value_decref(&value);
avro_value_iface_decref(iface);
avro_file_reader_close(reader);
avro_schema_decref(writer_schema);
}
void projection_test() {
avro_schema_t projection_schema;
if (avro_schema_from_json_literal(PROJECTED_SCHEMA, &projection_schema)) {
printf("Error loading projection schema from file: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
avro_value_t value;
avro_value_iface_t *iface = avro_generic_class_from_schema(projection_schema);
avro_generic_value_new(iface, &value);
avro_file_reader_t reader;
if (avro_file_reader(archive_file, &reader)) {
printf("Error creating reader for projection test: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
while(avro_file_reader_read_value(reader, &value)==0) {
char *json;
if (avro_value_to_json(&value, 1, &json)) {
printf("Problem converting projected value to JSON: %s\n", avro_strerror());
} else {
printf("%s\n", json);
}
free(json);
avro_value_reset(&value);
}
avro_value_decref(&value);
avro_value_iface_decref(iface);
avro_file_reader_close(reader);
avro_schema_decref(projection_schema);
}
/*
 * Parse the writer schema, write the archive, then read it back twice:
 * once with the writer schema and once through the projection schema.
 */
int main(int argc, char *argv[]) {
    (void)argc;  /* no command-line options are used */
    (void)argv;

    if (avro_schema_from_json_literal(SIMPLE_SCHEMA, &schema) != 0) {
        printf("Error loading schema from file: %s\n", avro_strerror());
        exit(EXIT_FAILURE);
    }

    puts("Creating archive...");
    create_archive_test();

    puts("Reading archive...");
    read_archive_test();

    puts("Reading archive using a projection schema...");
    projection_test();

    avro_schema_decref(schema);
    return 0;
}