[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2762 @mushketyk if you don't mind, I would continue working on this issue. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2762 Hi @mushketyk, sorry for the concise description before. The problem is the following: a `TableSource` provides schema information for the Table it produces. However, the methods `TableSource.getFieldNames()` and `TableSource.getFieldTypes()` return flat arrays, which are interpreted as a flat schema (the second field in the array represents the second table attribute) without any nesting. Avro and many other storage formats (JSON, Parquet, ...) support nested data structures. With the current limitation of the `TableSource` interface, we would need to convert the nested data into a flat schema. However, the Table API and SQL support processing of nested data, and it would be a much better integration to pass Avro objects in their original structure into the Table API / SQL query (see my updated proposal for [FLINK-3871](https://issues.apache.org/jira/browse/FLINK-3871)). In order to be able to create a Table of nested Avro data, we need to improve the `TableSource` interface first. Once this is done, we can continue with this PR. I'm very sorry that I did not think about this earlier, given the effort you have already put into this issue. Please let me know if you have any questions. Best, Fabian
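The flat-schema limitation Fabian describes can be seen from the shape of the interface. A minimal sketch (hypothetical names, not the actual Flink source): two parallel arrays describe the table's top-level attributes, so a nested Avro record can only be exposed by flattening it.

```java
// Hypothetical sketch of the flat TableSource contract described above:
// two parallel arrays, where position i in both arrays describes the i-th
// top-level table attribute -- there is no way to express nesting.
public class FlatSchemaSketch {

    interface FlatTableSource {
        String[] getFieldNames();
        Class<?>[] getFieldTypes();
    }

    // A nested Avro record such as {user: {id: long, name: string}} can only
    // be exposed through this contract by flattening it into top-level fields:
    static class FlattenedUserSource implements FlatTableSource {
        public String[] getFieldNames() {
            return new String[] {"user_id", "user_name"};
        }
        public Class<?>[] getFieldTypes() {
            return new Class<?>[] {Long.class, String.class};
        }
    }

    public static void main(String[] args) {
        FlatTableSource src = new FlattenedUserSource();
        for (int i = 0; i < src.getFieldNames().length; i++) {
            System.out.println(src.getFieldNames()[i] + " : "
                    + src.getFieldTypes()[i].getSimpleName());
        }
    }
}
```

Supporting nested data would mean replacing these flat arrays with a recursive type description, which is what FLINK-5280 proposes for the real interface.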
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2762 Hi @fhueske, sorry, I am still relatively new to Flink, so I don't have the full context. Could you elaborate on why this is an issue and how this shortcoming affects this PR? Do you have an idea of how difficult it will be to implement this change? If it's not very involved, I could help with it to move this PR forward. Best regards, Ivan.
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2762 Hi @mushketyk, while reviewing your PR I noticed a significant shortcoming of the `TableSource` interface: `TableSource` does not support nested schemas. IMO, this needs to be fixed before we can have a proper integration of Avro with the Table API / SQL. I created [FLINK-5280](https://issues.apache.org/jira/browse/FLINK-5280) and also updated the description of this issue ([FLINK-3871](https://issues.apache.org/jira/browse/FLINK-3871)). The `KafkaJsonTableSource` needs to be reworked to support nested data as well ([FLINK-5281](https://issues.apache.org/jira/browse/FLINK-5281)). I think we have to pause this PR until FLINK-5280 is resolved. What do you think? Best, Fabian
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2762 Hi @fhueske, I've updated the PR according to your review. Best regards, Ivan.
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2762 Hi @fhueske, Ok, I'll give it a try. I've pushed an updated PR where the Schema is now external. I'll work on the transformation part tomorrow. I had a few issues while trying to accommodate your comments. 1. When I try to use DateTime as the value of one of the fields of a GenericRecord, I receive the following exception: ```
0 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime is not a valid POJO type
java.lang.RuntimeException: java.lang.InstantiationException
	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
	at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
	at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.readRecord(AvroRowDeserializationSchema.java:77)
	at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:70)
	at org.apache.flink.streaming.connectors.kafka.AvroDeserializationSchemaTest.deserializeRowWithComplexTypes(AvroDeserializationSchemaTest.java:117)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.InstantiationException
	at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:314)
	... 43 more
```
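For context on the root cause at the bottom of the trace: `Constructor.newInstance` throws `InstantiationException` when the class it targets cannot be instantiated reflectively, for example because it is abstract. A minimal, self-contained sketch of that JDK behavior (the class names here are illustrative, not from Avro or Joda-Time):

```java
import java.lang.reflect.Constructor;

// Minimal reproduction of the JDK behavior at the bottom of the stack trace:
// Constructor.newInstance throws InstantiationException when the declaring
// class cannot be instantiated reflectively (here: an abstract class).
public class InstantiationDemo {

    // Abstract on purpose -- it has a no-arg constructor,
    // but it still cannot be instantiated.
    static abstract class NotInstantiable {
        NotInstantiable() {}
    }

    static String tryInstantiate() throws Exception {
        Constructor<NotInstantiable> ctor =
                NotInstantiable.class.getDeclaredConstructor();
        try {
            ctor.newInstance();
            return "instantiated";
        } catch (InstantiationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(tryInstantiate()); // InstantiationException
    }
}
```

In the trace above, Avro's `SpecificData.newInstance` hits the same JDK path while trying to reflectively construct a record field's class.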
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2762 It would be good if the `TableSource` could be used by providing parameters, without having to extend it.
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2762 Are there any other cases where a user needs a custom conversion from a `GenericRecord` to a `Row` instance? `String -> String` will definitely work, but I can also think of a generic converter `GenericRecord => Row`, though this may be overkill. What do you think?
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2762 The `String -> String` mapping would be a name mapping of fields from the `GenericRecord` to `Row` attributes. This would allow users to freely choose the attribute names of a table which is generated from an Avro schema.
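The name mapping Fabian describes can be sketched independently of the Avro API; in this sketch the `GenericRecord`'s fields are stood in for by a plain `Map`, and the `rename` helper is hypothetical, not Flink or Avro code:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the String -> String mapping discussed above:
// it renames record fields to the attribute names the user wants in the
// resulting table row, falling back to the original name when unmapped.
public class FieldRenameSketch {

    static Map<String, Object> rename(Map<String, Object> recordFields,
                                      Map<String, String> nameMapping) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : recordFields.entrySet()) {
            // Keep the original field name if no mapping is provided for it.
            String target = nameMapping.getOrDefault(e.getKey(), e.getKey());
            row.put(target, e.getValue());
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("user_name", "Ivan");
        record.put("user_id", 42L);

        Map<String, String> mapping = new HashMap<>();
        mapping.put("user_name", "name");
        mapping.put("user_id", "id");

        System.out.println(rename(record, mapping)); // {name=Ivan, id=42}
    }
}
```

The point of the mapping is exactly this decoupling: the Avro schema keeps its field names, while the table exposes user-chosen attribute names.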
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2762 @fhueske Just a small question about your comment. You wrote: "users should be able to provide a `Schema` and a `String -> String` to convert a `GenericRecord` into a `Row`". What is the `String -> String` for?
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2762 No worries and no need to hurry! :-) Thanks, Fabian
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2762 Hi @fhueske, sorry, I was a bit busy lately. I'll update the PR tonight. Best regards, Ivan
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2762 Hi @mushketyk, do you plan to continue with this PR? Thanks, Fabian
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2762 @fhueske No worries. Take your time. Best regards, Ivan.
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2762 Thanks a lot for the update @mushketyk. I'm quite busy at the moment but will try to have a look at your PR soon. Thanks, Fabian
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2762 @fhueske I've split the PR according to your suggestion. Now this PR only contains the serializer, deserializer, and table source. Since the sink part depends on this PR, I cannot publish it yet; I'll publish it once this PR is merged.