[jira] [Commented] (FLINK-24379) Support AWS Glue Schema Registry Avro for Table API

2023-12-29 Thread Lorenzo Nicora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801142#comment-17801142
 ] 

Lorenzo Nicora commented on FLINK-24379:


I picked up the work and submitted a PR to the flink-connector-aws repo.
[https://github.com/apache/flink-connector-aws/pull/122] 

[~dannycranmer] would you please have a look?

> Support AWS Glue Schema Registry Avro for Table API
> ---
>
> Key: FLINK-24379
> URL: https://issues.apache.org/jira/browse/FLINK-24379
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / API
>Affects Versions: aws-connector-4.2.0
>Reporter: Brad Davis
>Assignee: Lorenzo Nicora
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: aws-connector-4.3.0
>
>
> Unlike most (all?) of the other Avro formats, the AWS Glue Schema Registry 
> version doesn't include a 
> META-INF/services/org.apache.flink.table.factories.Factory resource or a 
> class implementing 
> org.apache.flink.table.factories.DeserializationFormatFactory and 
> org.apache.flink.table.factories.SerializationFormatFactory.
>  
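
A Table API format is discovered through Java SPI: the jar must ship a
META-INF/services/org.apache.flink.table.factories.Factory file listing a factory
class. A minimal sketch of the missing piece follows, assuming a hypothetical
identifier "avro-glue" and class name GlueAvroFormatFactory (the actual PR may
differ); only the SPI shape is the point here.
{code:java}
package org.apache.flink.formats.avro.glue;

import java.util.Collections;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.SerializationFormatFactory;

/** Hypothetical factory making the Glue Avro format usable as 'format' = 'avro-glue'. */
public class GlueAvroFormatFactory
        implements DeserializationFormatFactory, SerializationFormatFactory {

    @Override
    public String factoryIdentifier() {
        return "avro-glue"; // assumed identifier, not confirmed by the PR
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        // Registry options (region, registry name, ...) would go here.
        return Collections.emptySet();
    }

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        // Wrap the Glue Schema Registry Avro deserializer into a RowData format here.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        // Wrap the Glue Schema Registry Avro serializer into a RowData format here.
        throw new UnsupportedOperationException("sketch only");
    }
}
{code}
The services file then contains the fully qualified class name, e.g.
org.apache.flink.formats.avro.glue.GlueAvroFormatFactory.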



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33152) Prometheus Sink Connector - Integration tests

2023-09-25 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-33152:
--

 Summary: Prometheus Sink Connector - Integration tests
 Key: FLINK-33152
 URL: https://issues.apache.org/jira/browse/FLINK-33152
 Project: Flink
  Issue Type: Sub-task
Reporter: Lorenzo Nicora


Integration tests against containerised Prometheus
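
One possible shape for such a test, as a sketch only: it assumes Testcontainers
with JUnit 5 and the official prom/prometheus image, with the remote-write
receiver enabled via a Prometheus flag; class and method names are illustrative,
not the actual test code.
{code:java}
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
class PrometheusSinkIT {

    // Prometheus serves its HTTP API on 9090; the remote-write receiver
    // must be enabled explicitly.
    @Container
    private static final GenericContainer<?> PROMETHEUS =
            new GenericContainer<>("prom/prometheus:v2.47.0")
                    .withExposedPorts(9090)
                    .withCommand(
                            "--config.file=/etc/prometheus/prometheus.yml",
                            "--web.enable-remote-write-receiver");

    @Test
    void sinkWritesSamplesToRemoteWriteEndpoint() {
        String endpoint = String.format(
                "http://%s:%d/api/v1/write",
                PROMETHEUS.getHost(), PROMETHEUS.getMappedPort(9090));
        // Run the sink under test against 'endpoint', then query
        // http://host:port/api/v1/query to assert the samples arrived.
    }
}
{code}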



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example on AWS

2023-09-25 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-33140:
---
Summary: Prometheus Sink Connector - E2E example on AWS  (was: Prometheus 
Sink Connector - E2E example)

> Prometheus Sink Connector - E2E example on AWS
> --
>
> Key: FLINK-33140
> URL: https://issues.apache.org/jira/browse/FLINK-33140
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lorenzo Nicora
>Priority: Major
>
> End-to-end example application, to be deployed on Amazon Managed Service for 
> Apache Flink, and writing to Amazon Managed Prometheus



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example on AWS

2023-09-25 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-33140:
---
Description: End-to-end example application, deployable on Amazon Managed 
Service for Apache Flink, and writing to Amazon Managed Prometheus  (was: 
End-to-end example application, to be deployed on Amazon Managed Service for 
Apache Flink, and writing to Amazon Managed Prometheus)

> Prometheus Sink Connector - E2E example on AWS
> --
>
> Key: FLINK-33140
> URL: https://issues.apache.org/jira/browse/FLINK-33140
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lorenzo Nicora
>Priority: Major
>
> End-to-end example application, deployable on Amazon Managed Service for 
> Apache Flink, and writing to Amazon Managed Prometheus



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example

2023-09-25 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-33140:
---
Summary: Prometheus Sink Connector - E2E example  (was: Prometheus Sink 
Connector - E2E test)

> Prometheus Sink Connector - E2E example
> ---
>
> Key: FLINK-33140
> URL: https://issues.apache.org/jira/browse/FLINK-33140
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lorenzo Nicora
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example

2023-09-25 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-33140:
---
Description: End-to-end example application, to be deployed on Amazon 
Managed Service for Apache Flink, and writing to Amazon Managed Prometheus

> Prometheus Sink Connector - E2E example
> ---
>
> Key: FLINK-33140
> URL: https://issues.apache.org/jira/browse/FLINK-33140
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lorenzo Nicora
>Priority: Major
>
> End-to-end example application, to be deployed on Amazon Managed Service for 
> Apache Flink, and writing to Amazon Managed Prometheus



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33137) FLIP-312: Prometheus Sink Connector

2023-09-24 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-33137:
---
Labels: Connector  (was: )

> FLIP-312: Prometheus Sink Connector
> ---
>
> Key: FLINK-33137
> URL: https://issues.apache.org/jira/browse/FLINK-33137
> Project: Flink
>  Issue Type: New Feature
>Reporter: Lorenzo Nicora
>Priority: Major
>  Labels: Connector
>
> Umbrella Jira for the implementation of the Prometheus Sink Connector
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33141) Prometheus Sink Connector - Amazon Managed Prometheus Request Signer

2023-09-24 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-33141:
--

 Summary: Prometheus Sink Connector - Amazon Managed Prometheus 
Request Signer
 Key: FLINK-33141
 URL: https://issues.apache.org/jira/browse/FLINK-33141
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / AWS
Reporter: Lorenzo Nicora
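
The ticket has no description yet. For context, a request signer for Amazon
Managed Prometheus applies SigV4 signing to remote-write HTTP requests; the
sketch below assumes the AWS SDK for Java v2 and is illustrative only, not the
actual sub-task implementation (the service signing name for Amazon Managed
Prometheus is "aps").
{code:java}
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.regions.Region;

/** Hypothetical helper: SigV4-signs a remote-write request for Amazon Managed Prometheus. */
public final class AmpRequestSigner {

    private final Aws4Signer signer = Aws4Signer.create();

    public SdkHttpFullRequest sign(SdkHttpFullRequest request, Region region) {
        Aws4SignerParams params = Aws4SignerParams.builder()
                .signingName("aps") // Amazon Managed Prometheus signing name
                .signingRegion(region)
                .awsCredentials(DefaultCredentialsProvider.create().resolveCredentials())
                .build();
        return signer.sign(request, params);
    }
}
{code}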






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33142) Prometheus Sink Connector - Update Documentation

2023-09-24 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-33142:
--

 Summary: Prometheus Sink Connector - Update Documentation
 Key: FLINK-33142
 URL: https://issues.apache.org/jira/browse/FLINK-33142
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Lorenzo Nicora






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33140) Prometheus Sink Connector - E2E test

2023-09-24 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-33140:
--

 Summary: Prometheus Sink Connector - E2E test
 Key: FLINK-33140
 URL: https://issues.apache.org/jira/browse/FLINK-33140
 Project: Flink
  Issue Type: Sub-task
Reporter: Lorenzo Nicora






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33139) Prometheus Sink Connector - Table API support

2023-09-24 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-33139:
--

 Summary: Prometheus Sink Connector - Table API support
 Key: FLINK-33139
 URL: https://issues.apache.org/jira/browse/FLINK-33139
 Project: Flink
  Issue Type: Sub-task
Reporter: Lorenzo Nicora






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33138) Prometheus Connector Sink - DataStream API implementation

2023-09-24 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-33138:
--

 Summary: Prometheus Connector Sink - DataStream API implementation
 Key: FLINK-33138
 URL: https://issues.apache.org/jira/browse/FLINK-33138
 Project: Flink
  Issue Type: Sub-task
Reporter: Lorenzo Nicora






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33137) FLIP-312: Prometheus Sink Connector

2023-09-24 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-33137:
--

 Summary: FLIP-312: Prometheus Sink Connector
 Key: FLINK-33137
 URL: https://issues.apache.org/jira/browse/FLINK-33137
 Project: Flink
  Issue Type: New Feature
Reporter: Lorenzo Nicora


Umbrella Jira for the implementation of the Prometheus Sink Connector
https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18223) AvroSerializer does not correctly instantiate GenericRecord

2020-06-10 Thread Lorenzo Nicora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130894#comment-17130894
 ] 

Lorenzo Nicora commented on FLINK-18223:


I submitted a PR, but the bot complains that the ticket has not been assigned.
I'm not sure what the process is for a simple bugfix like this.

> AvroSerializer does not correctly instantiate GenericRecord
> ---
>
> Key: FLINK-18223
> URL: https://issues.apache.org/jira/browse/FLINK-18223
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.1
>Reporter: Lorenzo Nicora
>Priority: Major
>  Labels: AVRO, pull-request-available
>
> {{AvroSerializer.createInstance()}} simply calls 
> {{InstantiationUtil.instantiate(type)}} to create a new instance, even when 
> the type is GenericRecord.
> This fails with an exception, because a GenericRecord must be instantiated 
> through {{GenericRecordBuilder}} but {{InstantiationUtil}} is not aware of it.
> {code:java}
> The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The 
> class is not a proper class. It is either abstract, an interface, or a 
> primitive type.{code}
> This can be demonstrated with the following test:
> {code:java}
> @Test
> public void shouldInstantiateGenericRecord() {
>     org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\":[{\"name\":\"something\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
>     AvroSerializer<GenericRecord> serializer =
>         new AvroSerializer<>(GenericRecord.class, SCHEMA);
> 
>     serializer.createInstance();
> }
> {code}
>  
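
A sketch of the direction a fix could take, assuming createInstance() learns to
special-case generic records (hypothetical helper, not necessarily the merged
change): build an empty record from the schema instead of calling
InstantiationUtil.
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

final class GenericRecordInstantiation {

    /** Creates an empty GenericRecord for the given schema. */
    static GenericRecord createInstance(Class<?> type, Schema schema) {
        if (GenericRecord.class.isAssignableFrom(type)) {
            // GenericData.Record is the bare-bones instantiation;
            // GenericRecordBuilder additionally validates field defaults.
            return new GenericData.Record(schema);
        }
        throw new IllegalArgumentException(
                "Non-generic types are handled by InstantiationUtil");
    }
}
{code}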



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18223) AvroSerializer does not correctly instantiate GenericRecord

2020-06-10 Thread Lorenzo Nicora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130570#comment-17130570
 ] 

Lorenzo Nicora commented on FLINK-18223:


This is what I'm trying to do, [~aljoscha]. I have the fix ready and tested.
But I cannot make the full Flink test suite (mvn clean verify) pass. It looks 
like it's randomly failing on completely unrelated modules, while the same 
tests pass when I run them in IntelliJ :-?

> AvroSerializer does not correctly instantiate GenericRecord
> ---
>
> Key: FLINK-18223
> URL: https://issues.apache.org/jira/browse/FLINK-18223
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.1
>Reporter: Lorenzo Nicora
>Priority: Major
>  Labels: AVRO
>
> {{AvroSerializer.createInstance()}} simply calls 
> {{InstantiationUtil.instantiate(type)}} to create a new instance, even when 
> the type is GenericRecord.
> This fails with an exception, because a GenericRecord must be instantiated 
> through {{GenericRecordBuilder}} but {{InstantiationUtil}} is not aware of it.
> {code:java}
> The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The 
> class is not a proper class. It is either abstract, an interface, or a 
> primitive type.{code}
> This can be demonstrated with the following test:
> {code:java}
> @Test
> public void shouldInstantiateGenericRecord() {
>     org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\":[{\"name\":\"something\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
>     AvroSerializer<GenericRecord> serializer =
>         new AvroSerializer<>(GenericRecord.class, SCHEMA);
> 
>     serializer.createInstance();
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18223) AvroSerializer does not correctly instantiate GenericRecord

2020-06-09 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-18223:
--

 Summary: AvroSerializer does not correctly instantiate 
GenericRecord
 Key: FLINK-18223
 URL: https://issues.apache.org/jira/browse/FLINK-18223
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.10.1
Reporter: Lorenzo Nicora


{{AvroSerializer.createInstance()}} simply calls 
{{InstantiationUtil.instantiate(type)}} to create a new instance, even when the 
type is GenericRecord.

This fails with an exception, because a GenericRecord must be instantiated 
through {{GenericRecordBuilder}} but {{InstantiationUtil}} is not aware of it.
{code:java}
The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The 
class is not a proper class. It is either abstract, an interface, or a 
primitive type.{code}
This can be demonstrated with the following test:
{code:java}
@Test
public void shouldInstantiateGenericRecord() {
    org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\":[{\"name\":\"something\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
    AvroSerializer<GenericRecord> serializer =
        new AvroSerializer<>(GenericRecord.class, SCHEMA);

    serializer.createInstance();
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17486) ClassCastException when checkpointing AVRO SpecificRecord with decimal fields

2020-05-04 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-17486:
---
Summary: ClassCastException when checkpointing AVRO SpecificRecord with 
decimal fields  (was: ClassCastException when copying AVRO SpecificRecord 
containing a decimal field)

> ClassCastException when checkpointing AVRO SpecificRecord with decimal fields
> -
>
> Key: FLINK-17486
> URL: https://issues.apache.org/jira/browse/FLINK-17486
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.0
> Environment: Flink 1.10.0
> AVRO 1.9.2
> Java 1.8.0 (but also Java 14)
> Scala binary 2.11
>Reporter: Lorenzo Nicora
>Priority: Critical
>  Labels: AVRO, confluent-kafka, kafka
>
> When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) 
> field from a Kafka source, copying the record fails with:
> {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
> class java.nio.ByteBuffer}}
> I understand the problem arises when Flink tries to make a deep-copy of the 
> record for checkpointing.
> This code reproduces the problem 
> ([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]):
>  
> {code:java}
> AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
> Sample s1 = Sample.newBuilder()
>    .setPrice(BigDecimal.valueOf(42.32))
>    .setId("A12345")
>    .build();
> Sample s2 = serializer.copy(s1);
> {code}
>  
>  
> The AVRO SpecificRecord is generated from this IDL (using the 
> avro-maven-plugin):
> {code:java}
> @namespace("example.avro")
> protocol SampleProtocol {
>   record Sample {
>     string id;
>     decimal(9,2) price;
>     timestamp_ms eventTime;
>   }
> }
> {code}
> In particular, I had the problem after attaching an 
> AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord 
> and using Confluent Schema Registry. The assigner extracts the event time 
> from the record and enables watermarking (not sure whether this is related).
>  A simplified version of the application is here: 
> [https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]
>  
> The problem looks similar to AVRO-1895, but that issue has been fixed since 
> AVRO 1.8.2.
> In fact, the following code, which performs a deep copy relying only on AVRO, 
> does work:
> {code:java}
> Sample s1 = Sample.newBuilder()
>    .setPrice(BigDecimal.valueOf(42.32))
>    .setId("A12345")
>    .build();
>  Sample s2 = Sample.newBuilder(s1).build();{code}
>  
> Code of the two tests and the simplified application: 
> [https://github.com/nicusX/flink-avro-bug]
>  
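
A plausible workaround sketch, assuming the failure comes from the copy model
missing the decimal logical-type conversion (the same class of problem as
AVRO-1895); this is not necessarily the fix Flink adopted:
{code:java}
import java.math.BigDecimal;

import org.apache.avro.Conversions;
import org.apache.avro.specific.SpecificData;

// A SpecificData model with the decimal conversion registered can deep-copy
// records whose BigDecimal fields back a bytes-typed decimal logical type.
SpecificData model = new SpecificData();
model.addLogicalTypeConversion(new Conversions.DecimalConversion());

Sample s1 = Sample.newBuilder()
        .setPrice(BigDecimal.valueOf(42.32))
        .setId("A12345")
        .build();

Sample s2 = model.deepCopy(s1.getSchema(), s1);
{code}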



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field

2020-05-01 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-17486:
---
Labels: AVRO confluent-kafka kafka  (was: )

> ClassCastException when copying AVRO SpecificRecord containing a decimal field
> --
>
> Key: FLINK-17486
> URL: https://issues.apache.org/jira/browse/FLINK-17486
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.0
> Environment: Flink 1.10.0
> AVRO 1.9.2
> Java 1.8.0 (but also Java 14)
> Scala binary 2.11
>Reporter: Lorenzo Nicora
>Priority: Critical
>  Labels: AVRO, confluent-kafka, kafka
>
> When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) 
> field from a Kafka source, copying the record fails with:
> {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
> class java.nio.ByteBuffer}}
> I understand the problem arises when Flink tries to make a deep-copy of the 
> record for checkpointing.
> This code reproduces the problem 
> ([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]):
>  
> {code:java}
> AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
> Sample s1 = Sample.newBuilder()
>    .setPrice(BigDecimal.valueOf(42.32))
>    .setId("A12345")
>    .build();
> Sample s2 = serializer.copy(s1);
> {code}
>  
>  
> The AVRO SpecificRecord is generated from this IDL (using the 
> avro-maven-plugin):
> {code:java}
> @namespace("example.avro")
> protocol SampleProtocol {
>   record Sample {
>     string id;
>     decimal(9,2) price;
>     timestamp_ms eventTime;
>   }
> }
> {code}
> In particular, I had the problem after attaching an 
> AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord 
> and using Confluent Schema Registry. The assigner extracts the event time 
> from the record and enables watermarking (not sure whether this is related).
>  A simplified version of the application is here: 
> [https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]
>  
> The problem looks similar to AVRO-1895, but that issue has been fixed since 
> AVRO 1.8.2.
> In fact, the following code, which performs a deep copy relying only on AVRO, 
> does work:
> {code:java}
> Sample s1 = Sample.newBuilder()
>    .setPrice(BigDecimal.valueOf(42.32))
>    .setId("A12345")
>    .build();
>  Sample s2 = Sample.newBuilder(s1).build();{code}
>  
> Code of the two tests and the simplified application: 
> [https://github.com/nicusX/flink-avro-bug]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field

2020-05-01 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-17486:
---
Description: 
When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) field 
from a Kafka source, copying the record fails with:

{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
class java.nio.ByteBuffer}}

I understand the problem arises when Flink tries to make a deep-copy of the 
record for checkpointing.

This code reproduces the problem 
([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]):

 
{code:java}
AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
Sample s1 = Sample.newBuilder()
   .setPrice(BigDecimal.valueOf(42.32))
   .setId("A12345")
   .build();
Sample s2 = serializer.copy(s1);
{code}
 

 

The AVRO SpecificRecord is generated from this IDL (using the 
avro-maven-plugin):
{code:java}
@namespace("example.avro")
protocol SampleProtocol {
  record Sample {
    string id;
    decimal(9,2) price;
    timestamp_ms eventTime;
  }
}
{code}
In particular, I had the problem after attaching an 
AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord 
and using Confluent Schema Registry. The assigner extracts the event time from 
the record and enables watermarking (not sure whether this is related).
 A simplified version of the application is here: 
[https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]

 

The problem looks similar to AVRO-1895, but that issue has been fixed since AVRO 
1.8.2.

In fact, the following code, which performs a deep copy relying only on AVRO, 
does work:
{code:java}
Sample s1 = Sample.newBuilder()
   .setPrice(BigDecimal.valueOf(42.32))
   .setId("A12345")
   .build();
 Sample s2 = Sample.newBuilder(s1).build();{code}
 

Code of the two tests and the simplified application: 
[https://github.com/nicusX/flink-avro-bug]

 

  was:
When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) field 
from a Kafka source, copying the record fails with:

{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
class java.nio.ByteBuffer}}

I understand the problem arises when Flink tries to make a deep-copy of the 
record for checkpointing.

This code reproduces the problem:

{{AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);}}

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}

{{Sample s2 = serializer.copy(s1);}}

 

The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL:

{{@namespace("example.avro")}}
 {{protocol SampleProtocol {}}
 {{  record Sample{}}
 {{    string id;}}
 {{    decimal(9,2) price;}}
 {{    timestamp_ms eventTime;}}
 {{   }}}
 {{}}}

In particular, I had the problem after attaching an 
AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord 
and using Confluent Schema Registry. The assigner extracts the event time from 
the record and enables watermarking (not sure whether this is related).
A simplified version of the application is 
[here|[https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]].

 

The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 
1.8.2.

In fact, the following code does deep-copy only relying on AVRO and does work:  

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}
 {{Sample s2 = Sample.newBuilder(s1).build();}}

 

The code of the two tests and simplified application are  
[here|[https://github.com/nicusX/flink-avro-bug|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]]

 


> ClassCastException when copying AVRO SpecificRecord containing a decimal field
> --
>
> Key: FLINK-17486
> URL: https://issues.apache.org/jira/browse/FLINK-17486
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.0
> Environment: Flink 1.10.0
> AVRO 1.9.2
> Java 1.8.0 (but also Java 14)
> Scala binary 2.11
>Reporter: Lorenzo Nicora
>Priority: Critical
>
> When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) 
> field from a Kafka source, copying the record fails with:
> {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
> class java.nio.ByteBuffer}}
> I understand the problem arises when Flink tries to make a deep-copy of the 
> record for checkpointing.
> This code reproduces the problem 
> 

[jira] [Updated] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field

2020-05-01 Thread Lorenzo Nicora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenzo Nicora updated FLINK-17486:
---
Description: 
When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) field 
from a Kafka source, copying the record fails with:

{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
class java.nio.ByteBuffer}}

I understand the problem arises when Flink tries to make a deep-copy of the 
record for checkpointing.

This code reproduces the problem:

{{AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);}}

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}

{{Sample s2 = serializer.copy(s1);}}

 

The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL:

{{@namespace("example.avro")}}
 {{protocol SampleProtocol {}}
 {{  record Sample{}}
 {{    string id;}}
 {{    decimal(9,2) price;}}
 {{    timestamp_ms eventTime;}}
 {{   }}}
 {{}}}

In particular, I had the problem after attaching an 
AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord 
and using Confluent Schema Registry. The assigner extracts the event time from 
the record and enables watermarking (not sure whether this is related).
A simplified version of the application is 
[here|[https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]].

 

The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 
1.8.2.

In fact, the following code does deep-copy only relying on AVRO and does work:  

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}
 {{Sample s2 = Sample.newBuilder(s1).build();}}

 

The code of the two tests and simplified application are  
[here|[https://github.com/nicusX/flink-avro-bug|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]]

 

  was:
When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) field 
from a Kafka source, copying the record fails with:

{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
class java.nio.ByteBuffer}}

 

This code reproduces the problem:

{{AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);}}

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}

{{Sample s2 = serializer.copy(s1);}}

 

The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL:

{{@namespace("example.avro")}}
{{protocol SampleProtocol {}}
{{  record Sample{}}
{{    string id;}}
{{    decimal(9,2) price;}}
{{    timestamp_ms eventTime;}}
{{   }}}
{{}}}

 

The deepCopy of the record happens behind the scenes when attaching an 
AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord 
and using Confluent Schema Registry. The assigner extracts the event time from 
the record and enables watermarking (not sure whether this is related)

A simplified version of the application is 
[here|[https://github.com/nicusX/flink-avro-bug|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]].

 

The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 
1.8.2 (I'm using AVRO 1.9.2)

In fact, the following code doing deepCopy and only relying on AVRO does work: 

 

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}
 {{Sample s2 = Sample.newBuilder(s1).build();}}

 

A simplified version of the Flink application causing the problem is 
[here|[https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]].

 

 


> ClassCastException when copying AVRO SpecificRecord containing a decimal field
> --
>
> Key: FLINK-17486
> URL: https://issues.apache.org/jira/browse/FLINK-17486
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.0
> Environment: Flink 1.10.0
> AVRO 1.9.2
> Java 1.8.0 (but also Java 14)
> Scala binary 2.11
>Reporter: Lorenzo Nicora
>Priority: Critical
>
> When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) 
> field from a Kafka source, copying the record fails with:
> {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
> class java.nio.ByteBuffer}}
> I understand the problem arises when Flink tries to make a deep-copy of the 
> record for checkpointing.
> This code reproduces the problem:
> {{AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);}}
> {{Sample s1 = Sample.newBuilder()}}
>  {{  

[jira] [Created] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field

2020-04-30 Thread Lorenzo Nicora (Jira)
Lorenzo Nicora created FLINK-17486:
--

 Summary: ClassCastException when copying AVRO SpecificRecord 
containing a decimal field
 Key: FLINK-17486
 URL: https://issues.apache.org/jira/browse/FLINK-17486
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.10.0
 Environment: Flink 1.10.0

AVRO 1.9.2

Java 1.8.0 (but also Java 14)

Scala binary 2.11
Reporter: Lorenzo Nicora


When consuming AVRO SpecificRecord containing a {{decimal}} (logical type) field 
from a Kafka source, copying the record fails with:

{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to 
class java.nio.ByteBuffer}}

 

This code reproduces the problem:

{{AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);}}

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}

{{Sample s2 = serializer.copy(s1);}}

 

The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL:

{{@namespace("example.avro")}}
{{protocol SampleProtocol {}}
{{  record Sample{}}
{{    string id;}}
{{    decimal(9,2) price;}}
{{    timestamp_ms eventTime;}}
{{   }}}
{{}}}

 

The deepCopy of the record happens behind the scenes when attaching an 
AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord 
and using Confluent Schema Registry. The assigner extracts the event time from 
the record and enables watermarking (not sure whether this is related)

A simplified version of the application is 
[here|[https://github.com/nicusX/flink-avro-bug|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]].

 

The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 
1.8.2 (I'm using AVRO 1.9.2)

In fact, the following code doing deepCopy and only relying on AVRO does work: 

 

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}
 {{Sample s2 = Sample.newBuilder(s1).build();}}

 

A simplified version of the Flink application causing the problem is 
[here|[https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]].

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)