Hi all,
I think the method AvroUtils.toBeamSchema has an unexpected side effect.
I found that if you invoke it and then run a pipeline of GenericRecords
containing a timestamp (I tried the logical type timestamp-millis), Beam
converts that timestamp from long to org.joda.time.DateTime, even if the
pipeline applies no transformation to the data.
Do you think it's a bug?
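One plausible mechanism for a side effect like this is that loading a utility class registers a logical-type conversion on a process-wide shared instance (for example Avro's GenericData.get()) that the coder later reads through. This is an assumption and has not been checked against the Beam source here. The stdlib-only Java sketch below models that pattern. SharedData, SchemaUtils and their methods are hypothetical stand-ins, and java.time.Instant stands in for org.joda.time.DateTime.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class SharedConversionSideEffect {

  // Hypothetical stand-in for a process-wide singleton such as Avro's GenericData.get().
  public static final class SharedData {
    private static final SharedData INSTANCE = new SharedData();
    private final Map<String, Function<Long, Object>> conversions = new ConcurrentHashMap<>();

    public static SharedData get() {
      return INSTANCE;
    }

    public void addLogicalTypeConversion(String logicalType, Function<Long, Object> conversion) {
      conversions.put(logicalType, conversion);
    }

    // Models a reader decoding a long field: apply a registered conversion if one exists,
    // otherwise return the raw Long.
    public Object decodeLong(String logicalType, long raw) {
      Function<Long, Object> conversion = conversions.get(logicalType);
      return conversion == null ? Long.valueOf(raw) : conversion.apply(raw);
    }
  }

  // Hypothetical utility whose static initializer mutates the shared singleton.
  // This is the assumed analogue of the suspected AvroUtils behavior, not its actual code.
  public static final class SchemaUtils {
    static {
      SharedData.get().addLogicalTypeConversion("timestamp-millis", Instant::ofEpochMilli);
    }

    public static String toOtherSchema(String schema) {
      return schema; // the return value is irrelevant; only class initialization matters
    }
  }

  public static void main(String[] args) {
    long raw = 1563926400000L;
    Object before = SharedData.get().decodeLong("timestamp-millis", raw);
    SchemaUtils.toOtherSchema("record"); // first use runs SchemaUtils' static initializer
    Object after = SharedData.get().decodeLong("timestamp-millis", raw);
    System.out.println(before.getClass().getSimpleName()); // Long
    System.out.println(after.getClass().getSimpleName());  // Instant
  }
}
```

In this model the registration outlives the call that triggered it, which would explain why no transformation on the records is needed to observe the change.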
Below is a simple test class I wrote to reproduce the problem.
The first test passes, while the second fails.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.junit.Rule;
import org.junit.Test;

import java.sql.Timestamp;

import static org.junit.Assert.assertEquals;

public class AvroUtilsSideEffect {

  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Rule
  public final transient TestPipeline pipeline2 = TestPipeline.create();

  // A record with a single long field annotated with the timestamp-millis logical type.
  public final Schema testSchema = SchemaBuilder
      .record("record").namespace("test")
      .fields()
      .name("timestamp").type().longBuilder()
          .prop("logicalType", "timestamp-millis").endLong().noDefault()
      .endRecord();

  public final GenericRecord record = new GenericRecordBuilder(testSchema)
      .set("timestamp", new Timestamp(1563926400000L).getTime())
      .build();

  // Passes: the field is still a Long after going through AvroCoder.
  @Test
  public void test() {
    pipeline.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
        .apply(Combine.globally(new TestFn()));
    pipeline.run().waitUntilFinish();
  }

  // Fails: same pipeline, but AvroUtils.toBeamSchema is called first (its result is
  // discarded), and the field comes back as org.joda.time.DateTime instead of Long.
  @Test
  public void test2() {
    AvroUtils.toBeamSchema(testSchema);
    pipeline2.apply(Create.of(record).withCoder(AvroCoder.of(testSchema)))
        .apply(Combine.globally(new TestFn()));
    pipeline2.run().waitUntilFinish();
  }

  // Checks that the timestamp field is still the raw long written into the record.
  public static class TestFn implements
      SerializableFunction<Iterable<GenericRecord>, GenericRecord> {
    @Override
    public GenericRecord apply(Iterable<GenericRecord> input) {
      for (GenericRecord item : input) {
        if (item != null) {
          assertEquals(Long.class, item.get("timestamp").getClass());
          assertEquals(1563926400000L, item.get("timestamp"));
        }
        return item;
      }
      return null;
    }
  }
}
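For reference, the value written into the record is a plain epoch-millisecond count. Wrapping it in java.sql.Timestamp and calling getTime() returns the same long, which is the value the assertions expect. A quick stdlib-only check (the class name TimestampMillisCheck is just for illustration):

```java
import java.sql.Timestamp;
import java.time.Instant;

public class TimestampMillisCheck {
  public static void main(String[] args) {
    long millis = 1563926400000L;
    // Timestamp(long) keeps the epoch-millisecond value; getTime() returns it unchanged.
    long viaTimestamp = new Timestamp(millis).getTime();
    System.out.println(viaTimestamp == millis);       // true
    // The same count interpreted as a UTC instant.
    System.out.println(Instant.ofEpochMilli(millis)); // 2019-07-24T00:00:00Z
  }
}
```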
Thanks,
Paolo
--
Paolo Tomeo, PhD
Big Data and Machine Learning Engineer
linkedin.com/in/ptomeo <https://www.linkedin.com/in/ptomeo>