[jira] [Commented] (FLINK-24379) Support AWS Glue Schema Registry Avro for Table API
[ https://issues.apache.org/jira/browse/FLINK-24379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801142#comment-17801142 ] Lorenzo Nicora commented on FLINK-24379: I picked up the work and submitted a PR to flink-connector-aws repo. [https://github.com/apache/flink-connector-aws/pull/122] [~dannycranmer] would you please have a look? > Support AWS Glue Schema Registry Avro for Table API > --- > > Key: FLINK-24379 > URL: https://issues.apache.org/jira/browse/FLINK-24379 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / API >Affects Versions: aws-connector-4.2.0 >Reporter: Brad Davis >Assignee: Lorenzo Nicora >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: aws-connector-4.3.0 > > > Unlike most (all?) of the other Avro formats, the AWS Glue Schema Registry > version doesn't include a > META-INF/services/org.apache.flink.table.factories.Factory resource or a > class implementing > org.apache.flink.table.factories.DeserializationFormatFactory and > org.apache.flink.table.factories.SerializationFormatFactory. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
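For readers unfamiliar with the factory discovery mechanism referenced above: Flink's Table API discovers formats via Java SPI, so the format jar must ship a META-INF/services resource naming a factory class that implements both org.apache.flink.table.factories.DeserializationFormatFactory and org.apache.flink.table.factories.SerializationFormatFactory. A minimal sketch of the missing resource (the class name below is hypothetical, for illustration only; it is not necessarily the name used in the PR):

```
# META-INF/services/org.apache.flink.table.factories.Factory
# (hypothetical factory class name, for illustration only)
org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroFormatFactory
```

With such an entry on the classpath, the ServiceLoader-based factory lookup can resolve the format from a table's `'format'` option, which is what the other Avro formats already provide.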
[jira] [Created] (FLINK-33152) Prometheus Sink Connector - Integration tests
Lorenzo Nicora created FLINK-33152: -- Summary: Prometheus Sink Connector - Integration tests Key: FLINK-33152 URL: https://issues.apache.org/jira/browse/FLINK-33152 Project: Flink Issue Type: Sub-task Reporter: Lorenzo Nicora Integration tests against containerised Prometheus -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example on AWS
[ https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-33140: --- Summary: Prometheus Sink Connector - E2E example on AWS (was: Prometheus Sink Connector - E2E example) > Prometheus Sink Connector - E2E example on AWS > -- > > Key: FLINK-33140 > URL: https://issues.apache.org/jira/browse/FLINK-33140 > Project: Flink > Issue Type: Sub-task >Reporter: Lorenzo Nicora >Priority: Major > > End-to-end example application, to be deployed on Amazon Managed Service for > Apache Flink, and writing to Amazon Managed Prometheus -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example on AWS
[ https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-33140: --- Description: End-to-end example application, deployable on Amazon Managed Service for Apache Flink, and writing to Amazon Managed Prometheus (was: End-to-end example application, to be deployed on Amazon Managed Service for Apache Flink, and writing to Amazon Managed Prometheus) > Prometheus Sink Connector - E2E example on AWS > -- > > Key: FLINK-33140 > URL: https://issues.apache.org/jira/browse/FLINK-33140 > Project: Flink > Issue Type: Sub-task >Reporter: Lorenzo Nicora >Priority: Major > > End-to-end example application, deployable on Amazon Managed Service for > Apache Flink, and writing to Amazon Managed Prometheus -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example
[ https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-33140: --- Summary: Prometheus Sink Connector - E2E example (was: Prometheus Sink Connector - E2E test) > Prometheus Sink Connector - E2E example > --- > > Key: FLINK-33140 > URL: https://issues.apache.org/jira/browse/FLINK-33140 > Project: Flink > Issue Type: Sub-task >Reporter: Lorenzo Nicora >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example
[ https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-33140: --- Description: End-to-end example application, to be deployed on Amazon Managed Service for Apache Flink, and writing to Amazon Managed Prometheus > Prometheus Sink Connector - E2E example > --- > > Key: FLINK-33140 > URL: https://issues.apache.org/jira/browse/FLINK-33140 > Project: Flink > Issue Type: Sub-task >Reporter: Lorenzo Nicora >Priority: Major > > End-to-end example application, to be deployed on Amazon Managed Service for > Apache Flink, and writing to Amazon Managed Prometheus -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33137) FLIP-312: Prometheus Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-33137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-33137: --- Labels: Connector (was: ) > FLIP-312: Prometheus Sink Connector > --- > > Key: FLINK-33137 > URL: https://issues.apache.org/jira/browse/FLINK-33137 > Project: Flink > Issue Type: New Feature >Reporter: Lorenzo Nicora >Priority: Major > Labels: Connector > > Umbrella Jira for implementation of Prometheus Sink Connector > https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33141) Prometheus Sink Connector - Amazon Managed Prometheus Request Signer
Lorenzo Nicora created FLINK-33141: -- Summary: Prometheus Sink Connector - Amazon Managed Prometheus Request Signer Key: FLINK-33141 URL: https://issues.apache.org/jira/browse/FLINK-33141 Project: Flink Issue Type: Sub-task Components: Connectors / AWS Reporter: Lorenzo Nicora -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33142) Prometheus Sink Connector - Update Documentation
Lorenzo Nicora created FLINK-33142: -- Summary: Prometheus Sink Connector - Update Documentation Key: FLINK-33142 URL: https://issues.apache.org/jira/browse/FLINK-33142 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Lorenzo Nicora -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33140) Prometheus Sink Connector - E2E test
Lorenzo Nicora created FLINK-33140: -- Summary: Prometheus Sink Connector - E2E test Key: FLINK-33140 URL: https://issues.apache.org/jira/browse/FLINK-33140 Project: Flink Issue Type: Sub-task Reporter: Lorenzo Nicora -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33139) Prometheus Sink Connector - Table API support
Lorenzo Nicora created FLINK-33139: -- Summary: Prometheus Sink Connector - Table API support Key: FLINK-33139 URL: https://issues.apache.org/jira/browse/FLINK-33139 Project: Flink Issue Type: Sub-task Reporter: Lorenzo Nicora -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33138) Prometheus Connector Sink - DataStream API implementation
Lorenzo Nicora created FLINK-33138: -- Summary: Prometheus Connector Sink - DataStream API implementation Key: FLINK-33138 URL: https://issues.apache.org/jira/browse/FLINK-33138 Project: Flink Issue Type: Sub-task Reporter: Lorenzo Nicora -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33137) FLIP-312: Prometheus Sink Connector
Lorenzo Nicora created FLINK-33137: -- Summary: FLIP-312: Prometheus Sink Connector Key: FLINK-33137 URL: https://issues.apache.org/jira/browse/FLINK-33137 Project: Flink Issue Type: New Feature Reporter: Lorenzo Nicora Umbrella Jira for implementation of Prometheus Sink Connector https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-18223) AvroSerializer does not correctly instantiate GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-18223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17130894#comment-17130894 ] Lorenzo Nicora commented on FLINK-18223: I submitted a PR but the bot complains the ticket has not been assigned. Not sure what the process is for a simple bugfix like this. > AvroSerializer does not correctly instantiate GenericRecord > --- > > Key: FLINK-18223 > URL: https://issues.apache.org/jira/browse/FLINK-18223 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.10.1 >Reporter: Lorenzo Nicora >Priority: Major > Labels: AVRO, pull-request-available > > {{AvroSerializer.createInstance()}} simply calls > {{InstantiationUtil.instantiate(type)}} to create a new instance, also when > type is GenericRecord. > This fails with an exception, because a GenericRecord must be instantiated > through {{GenericRecordBuilder}} but {{InstantiationUtil}} is not aware of it. > {code:java} > The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The > class is not a proper class. It is either abstract, an interface, or a > primitive type.{code} > This can be proven with this test > {code:java} > @Test > public void shouldInstantiateGenericRecord() { > org.apache.avro.Schema SCHEMA = new > org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\":[{\"name\":\"something\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); > AvroSerializer serializer = new > AvroSerializer<>(GenericRecord.class, SCHEMA); > serializer.createInstance(); > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18223) AvroSerializer does not correctly instantiate GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-18223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17130570#comment-17130570 ] Lorenzo Nicora commented on FLINK-18223: This is what I'm trying to do, [~aljoscha]. I have the fix ready and tested, but I cannot make the full Flink test suite (mvn clean verify) pass. It randomly fails on completely unrelated modules, while the same tests do not fail when I run them in IntelliJ. > AvroSerializer does not correctly instantiate GenericRecord > --- > > Key: FLINK-18223 > URL: https://issues.apache.org/jira/browse/FLINK-18223 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.10.1 >Reporter: Lorenzo Nicora >Priority: Major > Labels: AVRO > > {{AvroSerializer.createInstance()}} simply calls > {{InstantiationUtil.instantiate(type)}} to create a new instance, also when > type is GenericRecord. > This fails with an exception, because a GenericRecord must be instantiated > through {{GenericRecordBuilder}} but {{InstantiationUtil}} is not aware of it. > {code:java} > The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The > class is not a proper class. It is either abstract, an interface, or a > primitive type.{code} > This can be proven with this test > {code:java} > @Test > public void shouldInstantiateGenericRecord() { > org.apache.avro.Schema SCHEMA = new > org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\":[{\"name\":\"something\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); > AvroSerializer serializer = new > AvroSerializer<>(GenericRecord.class, SCHEMA); > serializer.createInstance(); > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18223) AvroSerializer does not correctly instantiate GenericRecord
Lorenzo Nicora created FLINK-18223: -- Summary: AvroSerializer does not correctly instantiate GenericRecord Key: FLINK-18223 URL: https://issues.apache.org/jira/browse/FLINK-18223 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.10.1 Reporter: Lorenzo Nicora {{AvroSerializer.createInstance()}} simply calls {{InstantiationUtil.instantiate(type)}} to create a new instance, also when type is GenericRecord. This fails with an exception, because a GenericRecord must be instantiated through {{GenericRecordBuilder}} but {{InstantiationUtil}} is not aware of it. {code:java} The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is not a proper class. It is either abstract, an interface, or a primitive type.{code} This can be proven with this test {code:java} @Test public void shouldInstantiateGenericRecord() { org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\":[{\"name\":\"something\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); AvroSerializer serializer = new AvroSerializer<>(GenericRecord.class, SCHEMA); serializer.createInstance(); } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
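To illustrate the instantiation constraint described in the ticket: GenericRecord is an interface, so generic reflective instantiation cannot work; a GenericRecordBuilder (or GenericData.Record) is needed instead. A minimal sketch, assuming Avro on the classpath (the simplified single-field schema below is for illustration only):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class GenericRecordInstantiation {
    public static void main(String[] args) {
        // A record schema with a single string field, similar to the
        // "Dummy" schema used in the ticket's test.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\","
            + "\"fields\":[{\"name\":\"something\",\"type\":\"string\"}]}");

        // GenericRecord cannot be created reflectively (it is an interface);
        // the builder validates fields against the schema and builds a
        // GenericData.Record instance.
        GenericRecord record = new GenericRecordBuilder(schema)
            .set("something", "hello")
            .build();

        System.out.println(record.get("something")); // prints "hello"
    }
}
```

This is why a fix in AvroSerializer.createInstance() needs a schema-aware path for GenericRecord rather than delegating to InstantiationUtil.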
[jira] [Updated] (FLINK-17486) ClassCastException when checkpointing AVRO SpecificRecord with decimal fields
[ https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-17486: --- Summary: ClassCastException when checkpointing AVRO SpecificRecord with decimal fields (was: ClassCastException when copying AVRO SpecificRecord containing a decimal field) > ClassCastException when checkpointing AVRO SpecificRecord with decimal fields > - > > Key: FLINK-17486 > URL: https://issues.apache.org/jira/browse/FLINK-17486 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.10.0 > Environment: Flink 1.10.0 > AVRO 1.9.2 > Java 1.8.0 (but also Java 14) > Scala binary 2.11 >Reporter: Lorenzo Nicora >Priority: Critical > Labels: AVRO, confluent-kafka, kafka > > When consuming from a Kafka source AVRO SpecificRecord containing a > {{decimal}} (logical type) field, copying the record fails with: > {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to > class java.nio.ByteBuffer}} > I understand the problem arises when Flink tries to make a deep-copy of the > record for checkpointing. > This code reproduces the problem > ([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]): > > {code:java} > AvroSerializer serializer = new AvroSerializer<>(Sample.class); > Sample s1 = Sample.newBuilder() > .setPrice(BigDecimal.valueOf(42.32)) > .setId("A12345") > .build(); > Sample s2 = serializer.copy(s1); > {code} > > > The AVRO SpecificRecord is generated from this IDL (using the > maven-avro-plugin): > {code:java} > @namespace("example.avro") > protocol SampleProtocol { > record Sample{ > string id; > decimal(9,2) price; > timestamp_ms eventTime; > } > }{code} > In particular, I had the problem after attaching an > AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord > and using Confluent Schema Registry. 
The assigner extracts the event time > from the record, enabling watermarking (not sure whether this is related). > A simplified version of the application is here: > [https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java] > > The problem looks similar to AVRO-1895 but that issue has been fixed since > AVRO 1.8.2. > In fact, the following code does deep-copy only relying on AVRO and does > work: > {code:java} > Sample s1 = Sample.newBuilder() > .setPrice(BigDecimal.valueOf(42.32)) > .setId("A12345") > .build(); > Sample s2 = Sample.newBuilder(s1).build();{code} > > Code of the two tests and simplified application: > [https://github.com/nicusX/flink-avro-bug|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field
[ https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-17486: --- Labels: AVRO confluent-kafka kafka (was: ) > ClassCastException when copying AVRO SpecificRecord containing a decimal field > -- > > Key: FLINK-17486 > URL: https://issues.apache.org/jira/browse/FLINK-17486 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.10.0 > Environment: Flink 1.10.0 > AVRO 1.9.2 > Java 1.8.0 (but also Java 14) > Scala binary 2.11 >Reporter: Lorenzo Nicora >Priority: Critical > Labels: AVRO, confluent-kafka, kafka > > When consuming from a Kafka source AVRO SpecificRecord containing a > {{decimal}} (logical type) field, copying the record fails with: > {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to > class java.nio.ByteBuffer}} > I understand the problem arises when Flink tries to make a deep-copy of the > record for checkpointing. > This code reproduces the problem > ([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]): > > {code:java} > AvroSerializer serializer = new AvroSerializer<>(Sample.class); > Sample s1 = Sample.newBuilder() > .setPrice(BigDecimal.valueOf(42.32)) > .setId("A12345") > .build(); > Sample s2 = serializer.copy(s1); > {code} > > > The AVRO SpecificRecord is generated from this IDL (using the > maven-avro-plugin): > {code:java} > @namespace("example.avro") > protocol SampleProtocol { > record Sample{ > string id; > decimal(9,2) price; > timestamp_ms eventTime; > } > }{code} > In particular, I had the problem after attaching an > AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord > and using Confluent Schema Registry. The assigner extracts the event time > from the record, enabling watermarking (not sure whether this is related). 
> A simplified version of the application is here: > [https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java] > > The problem looks similar to AVRO-1895 but that issue has been fixed since > AVRO 1.8.2. > In fact, the following code does deep-copy only relying on AVRO and does > work: > {code:java} > Sample s1 = Sample.newBuilder() > .setPrice(BigDecimal.valueOf(42.32)) > .setId("A12345") > .build(); > Sample s2 = Sample.newBuilder(s1).build();{code} > > Code of the two tests and simplified application: > [https://github.com/nicusX/flink-avro-bug|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field
[ https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-17486: --- Description: When consuming from a Kafka source AVRO SpecificRecord containing a {{decimal}} (logical type) field, copying the record fails with: {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}} I understand the problem arises when Flink tries to make a deep-copy of the record for checkpointing. This code reproduces the problem ([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]): {code:java} AvroSerializer serializer = new AvroSerializer<>(Sample.class); Sample s1 = Sample.newBuilder() .setPrice(BigDecimal.valueOf(42.32)) .setId("A12345") .build(); Sample s2 = serializer.copy(s1); {code} The AVRO SpecificRecord is generated from this IDL (using the maven-avro-plugin): {code:java} @namespace("example.avro") protocol SampleProtocol { record Sample{ string id; decimal(9,2) price; timestamp_ms eventTime; } }{code} In particular, I had the problem after attaching an AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord and using Confluent Schema Registry. The assigner extracts the event time from the record, enabling watermarking (not sure whether this is related). A simplified version of the application is here: [https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java] The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 1.8.2. 
In fact, the following code does deep-copy only relying on AVRO and does work: {code:java} Sample s1 = Sample.newBuilder() .setPrice(BigDecimal.valueOf(42.32)) .setId("A12345") .build(); Sample s2 = Sample.newBuilder(s1).build();{code} Code of the two tests and simplified application: [https://github.com/nicusX/flink-avro-bug|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java] was: When consuming from a Kafka source AVRO SpecificRecord containing a {{decimal}} (logical type) field, copying the record fails with: {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}} I understand the problem arises when Flink tries to make a deep-copy of the record for checkpointing. This code reproduces the problem: {{AvroSerializer serializer = new AvroSerializer<>(Sample.class);}} {{Sample s1 = Sample.newBuilder()}} {{ .setPrice(BigDecimal.valueOf(42.32))}} {{ .setId("A12345")}} {{ .build();}} {{Sample s2 = serializer.copy(s1);}} The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL: {{@namespace("example.avro")}} {{protocol SampleProtocol {}} {{ record Sample{}} {{ string id;}} {{ decimal(9,2) price;}} {{ timestamp_ms eventTime;}} {{ }}} {{}}} In particular, I had the problem after attaching an AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord and using Confluent Schema Registry. The assigner extracts the event time from the record, enabling watermarking (not sure whether this is related). A simplified version of the application is [here|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]. The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 1.8.2. 
In fact, the following code does deep-copy only relying on AVRO and does work: {{Sample s1 = Sample.newBuilder()}} {{ .setPrice(BigDecimal.valueOf(42.32))}} {{ .setId("A12345")}} {{ .build();}} {{Sample s2 = Sample.newBuilder(s1).build();}} The code of the two tests and simplified application are [here|https://github.com/nicusX/flink-avro-bug] > ClassCastException when copying AVRO SpecificRecord containing a decimal field > -- > > Key: FLINK-17486 > URL: https://issues.apache.org/jira/browse/FLINK-17486 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.10.0 > Environment: Flink 1.10.0 > AVRO 1.9.2 > Java 1.8.0 (but also Java 14) > Scala binary 2.11 >Reporter: Lorenzo Nicora >Priority: Critical > > When consuming from a Kafka source AVRO SpecificRecord containing a > {{decimal}} (logical type) field, copying the record fails with: > {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to > class java.nio.ByteBuffer}} > I understand the problem arises when Flink tries to make a deep-copy of the > record for checkpointing. > This code reproduces the problem >
[jira] [Updated] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field
[ https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenzo Nicora updated FLINK-17486: --- Description: When consuming from a Kafka source AVRO SpecificRecord containing a {{decimal}} (logical type) field, copying the record fails with: {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}} I understand the problem arises when Flink tries to make a deep-copy of the record for checkpointing. This code reproduces the problem: {{AvroSerializer serializer = new AvroSerializer<>(Sample.class);}} {{Sample s1 = Sample.newBuilder()}} {{ .setPrice(BigDecimal.valueOf(42.32))}} {{ .setId("A12345")}} {{ .build();}} {{Sample s2 = serializer.copy(s1);}} The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL: {{@namespace("example.avro")}} {{protocol SampleProtocol {}} {{ record Sample{}} {{ string id;}} {{ decimal(9,2) price;}} {{ timestamp_ms eventTime;}} {{ }}} {{}}} In particular, I had the problem after attaching an AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord and using Confluent Schema Registry. The assigner extracts the event time from the record, enabling watermarking (not sure whether this is related). A simplified version of the application is [here|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]. The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 1.8.2. 
In fact, the following code does deep-copy only relying on AVRO and does work: {{Sample s1 = Sample.newBuilder()}} {{ .setPrice(BigDecimal.valueOf(42.32))}} {{ .setId("A12345")}} {{ .build();}} {{Sample s2 = Sample.newBuilder(s1).build();}} The code of the two tests and simplified application are [here|https://github.com/nicusX/flink-avro-bug] was: When consuming from a Kafka source AVRO SpecificRecord containing a {{decimal}} (logical type) field, copying the record fails with: {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}} This code reproduces the problem: {{AvroSerializer serializer = new AvroSerializer<>(Sample.class);}} {{Sample s1 = Sample.newBuilder()}} {{ .setPrice(BigDecimal.valueOf(42.32))}} {{ .setId("A12345")}} {{ .build();}} {{Sample s2 = serializer.copy(s1);}} The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL: {{@namespace("example.avro")}} {{protocol SampleProtocol {}} {{ record Sample{}} {{ string id;}} {{ decimal(9,2) price;}} {{ timestamp_ms eventTime;}} {{ }}} {{}}} The deepCopy of the record happens behind the scenes when attaching an AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord and using Confluent Schema Registry. The assigner extracts the event time from the record, enabling watermarking (not sure whether this is related) A simplified version of the application is [here|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]. 
The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 1.8.2 (I'm using AVRO 1.9.2) In fact, the following code doing deepCopy and only relying on AVRO does work: {{Sample s1 = Sample.newBuilder()}} {{ .setPrice(BigDecimal.valueOf(42.32))}} {{ .setId("A12345")}} {{ .build();}} {{Sample s2 = Sample.newBuilder(s1).build();}} A simplified version of the Flink application causing the problem is [here|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]. > ClassCastException when copying AVRO SpecificRecord containing a decimal field > -- > > Key: FLINK-17486 > URL: https://issues.apache.org/jira/browse/FLINK-17486 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.10.0 > Environment: Flink 1.10.0 > AVRO 1.9.2 > Java 1.8.0 (but also Java 14) > Scala binary 2.11 >Reporter: Lorenzo Nicora >Priority: Critical > > When consuming from a Kafka source AVRO SpecificRecord containing a > {{decimal}} (logical type) field, copying the record fails with: > {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to > class java.nio.ByteBuffer}} > I understand the problem arises when Flink tries to make a deep-copy of the > record for checkpointing. > This code reproduces the problem: > {{AvroSerializer serializer = new AvroSerializer<>(Sample.class);}} > {{Sample s1 = Sample.newBuilder()}} > {{
[jira] [Created] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field
Lorenzo Nicora created FLINK-17486: -- Summary: ClassCastException when copying AVRO SpecificRecord containing a decimal field Key: FLINK-17486 URL: https://issues.apache.org/jira/browse/FLINK-17486 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.10.0 Environment: Flink 1.10.0 AVRO 1.9.2 Java 1.8.0 (but also Java 14) Scala binary 2.11 Reporter: Lorenzo Nicora When consuming from a Kafka source AVRO SpecificRecord containing a {{decimal}} (logical type) field, copying the record fails with: {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}} This code reproduces the problem: {{AvroSerializer serializer = new AvroSerializer<>(Sample.class);}} {{Sample s1 = Sample.newBuilder()}} {{ .setPrice(BigDecimal.valueOf(42.32))}} {{ .setId("A12345")}} {{ .build();}} {{Sample s2 = serializer.copy(s1);}} The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL: {{@namespace("example.avro")}} {{protocol SampleProtocol {}} {{ record Sample{}} {{ string id;}} {{ decimal(9,2) price;}} {{ timestamp_ms eventTime;}} {{ }}} {{}}} The deepCopy of the record happens behind the scenes when attaching an AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord and using Confluent Schema Registry. The assigner extracts the event time from the record, enabling watermarking (not sure whether this is related) A simplified version of the application is [here|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]. 
The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 1.8.2 (I'm using AVRO 1.9.2) In fact, the following code doing deepCopy and only relying on AVRO does work: {{Sample s1 = Sample.newBuilder()}} {{ .setPrice(BigDecimal.valueOf(42.32))}} {{ .setId("A12345")}} {{ .build();}} {{Sample s2 = Sample.newBuilder(s1).build();}} A simplified version of the Flink application causing the problem is [here|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
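One possible reading of the symptom above, offered as an assumption rather than a confirmed diagnosis: Avro can only deep-copy a {{decimal}} logical-type field when a DecimalConversion is registered on the data model doing the copy. The generated builder path works because the generated class carries its conversions, while a model without them treats the field as raw bytes and attempts the BigDecimal-to-ByteBuffer cast. A non-runnable sketch of that workaround (it assumes the generated Sample class from the ticket's IDL, and the hypothetical variable `original`):

```java
import org.apache.avro.Conversions;
import org.apache.avro.specific.SpecificData;

// Hypothetical workaround sketch, not a confirmed fix: register the decimal
// conversion on the model used for the deep copy, so BigDecimal values are
// converted to bytes instead of being cast directly to ByteBuffer.
SpecificData model = new SpecificData();
model.addLogicalTypeConversion(new Conversions.DecimalConversion());
Sample copied = model.deepCopy(Sample.getClassSchema(), original);
```

Whether Flink's AvroSerializer can be configured with such a model is a separate question; the sketch only illustrates the conversion-registration mechanism in Avro itself.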