[jira] [Commented] (FLINK-11030) Cannot use Avro logical types with ConfluentRegistryAvroDeserializationSchema

2019-10-08 Thread Jayant Ameta (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947398#comment-16947398
 ] 

Jayant Ameta commented on FLINK-11030:
--

Is there any update/workaround for this?

> Cannot use Avro logical types with ConfluentRegistryAvroDeserializationSchema
> -
>
> Key: FLINK-11030
> URL: https://issues.apache.org/jira/browse/FLINK-11030
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.6.2
>Reporter: Maciej Bryński
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I created a specific record class for a Kafka topic. 
>  The Avro schema includes logical types.
>  I then want to read the data using the following code:
> {code:scala}
> val deserializationSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[mySpecificClass], schemaRegistryUrl)
> val kafkaStream = env.addSource(
>   new FlinkKafkaConsumer011(topic, deserializationSchema, kafkaProperties)
> )
> kafkaStream.print()
> {code}
>  Result:
>  {code}
>  Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>   at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>   at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645)
>   at TransactionEnrichment$.main(TransactionEnrichment.scala:50)
>   at TransactionEnrichment.main(TransactionEnrichment.scala)
>  Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>   at platform_tbl_game_transactions_v1.Value.put(Value.java:222)
>   at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
>   at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
>  {code}
>  When using the Kafka consumer directly, there was a workaround for this: registering a logical-type conversion.
>  Unfortunately, it does not work in Flink.
>  {code}
>  SpecificData.get.addLogicalTypeConversion(new TimeConversions.TimestampConversion)
>  {code}
> The problem is probably caused by the fact that we create our own instance of SpecificData:
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L145
> and no logical-type conversions are registered on it.
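One possible direction for a workaround, shown here only as an untested sketch (class and method names in the factory are illustrative; it assumes Avro 1.8.x, where the Joda-time conversions live in org.apache.avro.data.TimeConversions), is to build the datum reader yourself with a SpecificData instance that has the conversions registered, instead of relying on the global SpecificData.get() instance that Flink never consults:

```java
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;

// Sketch: construct a SpecificDatumReader whose SpecificData knows about
// the Joda-time logical-type conversions. Without the conversion, Avro
// hands the generated setter a raw Long and Value.put throws the
// ClassCastException seen in the stack trace above.
public class ConvertingReaderFactory {

    public static <T> SpecificDatumReader<T> createReader(Schema writerSchema, Schema readerSchema) {
        SpecificData specificData = new SpecificData();
        specificData.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
        return new SpecificDatumReader<>(writerSchema, readerSchema, specificData);
    }
}
```

Wiring such a reader into ConfluentRegistryAvroDeserializationSchema would still require a custom schema subclass, since the built-in one constructs its reader internally.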



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-17 Thread Jayant Ameta (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618447#comment-16618447
 ] 

Jayant Ameta commented on FLINK-10310:
--

Hey [~till.rohrmann], do you want to discuss this further?

> Cassandra Sink - Handling failing requests
> --
>
> Key: FLINK-10310
> URL: https://issues.apache.org/jira/browse/FLINK-10310
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jayant Ameta
>Priority: Major
>
> The Cassandra sink fails for any kind of error. For some exceptions (e.g. WriteTimeoutException), ignoring the exception may be acceptable.
> Can we discuss having a FailureHandler along the lines of ActionRequestFailureHandler?
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-13 Thread Jayant Ameta (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614371#comment-16614371
 ] 

Jayant Ameta commented on FLINK-10310:
--

Hi [~till.rohrmann]

{{CassandraSinkBase}} would have a field similar to {{ActionRequestFailureHandler}}.

In the {{checkAsyncErrors}} method, the {{failureHandler}} would be called instead of throwing the {{IOException}}.

Current code snippet:
{code:java}
private void checkAsyncErrors() throws Exception {
    Throwable error = exception;
    if (error != null) {
        // prevent throwing duplicated error
        exception = null;
        throw new IOException("Error while sending value.", error);
    }
}
{code}

would change to:
{code:java}
private void checkAsyncErrors() throws Exception {
    Throwable error = exception;
    if (error != null) {
        failureHandler.onFailure(error);
    }
}
{code}

Here the {{failureHandler}} can decide what steps to take based on the 
{{Throwable}}.
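The idea can be illustrated with a self-contained sketch. The interface name {{CassandraFailureHandler}} and the ignore-timeouts policy below are hypothetical, modeled on the Elasticsearch connector's {{ActionRequestFailureHandler}}; this is not the actual connector code.

```java
import java.io.IOException;

// Hypothetical failure-handler sketch for the Cassandra sink.
// All names here are illustrative, not Flink APIs.
public class FailureHandlerSketch {

    /** Decides whether a failed write should fail the job. */
    public interface CassandraFailureHandler {
        void onFailure(Throwable failure) throws IOException;
    }

    /** Default policy: rethrow, matching the current sink behavior. */
    public static final CassandraFailureHandler RETHROW =
        failure -> { throw new IOException("Error while sending value.", failure); };

    /** Example policy: ignore timeout-like failures, fail on everything else. */
    public static final CassandraFailureHandler IGNORE_TIMEOUTS = failure -> {
        if (!failure.getClass().getSimpleName().equals("WriteTimeoutException")) {
            throw new IOException("Error while sending value.", failure);
        }
    };

    private volatile Throwable exception;
    private final CassandraFailureHandler failureHandler;

    public FailureHandlerSketch(CassandraFailureHandler handler) {
        this.failureHandler = handler;
    }

    /** Simulates the async callback recording an error. */
    public void setAsyncError(Throwable t) {
        this.exception = t;
    }

    /** checkAsyncErrors delegating to the handler instead of always throwing. */
    public void checkAsyncErrors() throws IOException {
        Throwable error = exception;
        if (error != null) {
            // still prevent reporting the same error twice
            exception = null;
            failureHandler.onFailure(error);
        }
    }
}
```

With {{RETHROW}} as the default, existing jobs would keep today's fail-fast behavior, while users who can tolerate dropped writes could plug in a lenient handler.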


> Cassandra Sink - Handling failing requests
> --
>
> Key: FLINK-10310
> URL: https://issues.apache.org/jira/browse/FLINK-10310
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jayant Ameta
>Priority: Major
>
> The Cassandra sink fails for any kind of error. For some exceptions (e.g. WriteTimeoutException), ignoring the exception may be acceptable.
> Can we discuss having a FailureHandler along the lines of ActionRequestFailureHandler?
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html





[jira] [Updated] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-10 Thread Jayant Ameta (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jayant Ameta updated FLINK-10310:
-
Description: 
The Cassandra sink fails for any kind of error. For some exceptions (e.g. WriteTimeoutException), ignoring the exception may be acceptable.

Can we discuss having a FailureHandler along the lines of ActionRequestFailureHandler?

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html

  was:
The Cassandra sink fails for any kind of error. For some exceptions (e.g. WriteTimeoutException), ignoring the exception may be acceptable.

Can we discuss having a FailureHandler along the lines of ActionRequestFailureHandler?


> Cassandra Sink - Handling failing requests
> --
>
> Key: FLINK-10310
> URL: https://issues.apache.org/jira/browse/FLINK-10310
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jayant Ameta
>Priority: Major
>
> The Cassandra sink fails for any kind of error. For some exceptions (e.g. WriteTimeoutException), ignoring the exception may be acceptable.
> Can we discuss having a FailureHandler along the lines of ActionRequestFailureHandler?
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html





[jira] [Created] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-09 Thread Jayant Ameta (JIRA)
Jayant Ameta created FLINK-10310:


 Summary: Cassandra Sink - Handling failing requests
 Key: FLINK-10310
 URL: https://issues.apache.org/jira/browse/FLINK-10310
 Project: Flink
  Issue Type: Improvement
  Components: Cassandra Connector
Reporter: Jayant Ameta


The Cassandra sink fails for any kind of error. For some exceptions (e.g. WriteTimeoutException), ignoring the exception may be acceptable.

Can we discuss having a FailureHandler along the lines of ActionRequestFailureHandler?


