[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2017-03-28 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2762
  
@mushketyk if you don't mind, I would continue working on this issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-08 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2762
  
Hi @mushketyk, sorry for the brief description earlier. The problem is the 
following:

A `TableSource` provides schema information for the Table it produces. 
However, the methods `TableSource.getFieldNames()` and 
`TableSource.getFieldTypes()` return flat arrays which are interpreted as a flat 
schema (the second field in the array represents the second table attribute) 
without any nesting.
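A minimal sketch of that flat contract (the interface shape and names here are illustrative stand-ins based on this description, not the exact Flink API of the time):

```java
// Hypothetical stand-in for the flat TableSource contract described above;
// the interface shape is illustrative, not the actual Flink API.
public class FlatSchemaDemo {
    interface FlatTableSource {
        String[] getFieldNames();   // i-th entry names the i-th table attribute
        Class<?>[] getFieldTypes(); // i-th entry is the type of the i-th attribute
    }

    // A source exposing the flat schema (id: Long, name: String).
    static final FlatTableSource SOURCE = new FlatTableSource() {
        public String[] getFieldNames() { return new String[]{"id", "name"}; }
        public Class<?>[] getFieldTypes() { return new Class<?>[]{Long.class, String.class}; }
    };

    // The two arrays are positionally aligned: index i describes table
    // attribute i, with no way to express that an attribute itself has
    // nested fields.
    static String describe(int i) {
        return SOURCE.getFieldNames()[i] + ":" + SOURCE.getFieldTypes()[i].getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(describe(1)); // prints "name:String"
    }
}
```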

Avro and many other storage formats (JSON, Parquet, ...) support nested 
data structures. With the current limitation of the `TableSource` interface, we 
would need to convert the nested data into a flat schema. However, the Table 
API and SQL support processing of nested data, and it would be a much better 
integration to pass Avro objects in their original structure into the Table API 
/ SQL query (see my updated proposal for 
[FLINK-3871](https://issues.apache.org/jira/browse/FLINK-3871)). 

In order to be able to create a Table of nested Avro data, we need to 
improve the `TableSource` interface first. Once this is done, we can continue 
with this PR.

I'm very sorry that I did not think about this earlier, given the effort you 
have already put into this issue.
Please let me know if you have any questions. 
Best, Fabian




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-07 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2762
  
Hi @fhueske ,

Sorry, I am still relatively new to Flink, so I don't have the full context. 
Could you elaborate on why this is an issue and how this shortcoming affects 
this PR?

Do you have an idea how difficult it would be to implement this change? If 
it's not very involved, I could help with it to move this PR forward.

Best regards,
Ivan.




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2762
  
Hi @mushketyk, while reviewing your PR I noticed a significant shortcoming 
of the `TableSource` interface: `TableSource` does not support nested schemas. 
IMO, this needs to be fixed before we can have a proper integration of Avro 
with the Table API / SQL. 

I created [FLINK-5280](https://issues.apache.org/jira/browse/FLINK-5280) 
and also updated the description of this issue 
([FLINK-3871](https://issues.apache.org/jira/browse/FLINK-3871)). The 
`KafkaJsonTableSource` needs to be reworked to support nested data as well 
([FLINK-5281](https://issues.apache.org/jira/browse/FLINK-5281)).

I think we have to pause this PR until FLINK-5280 is resolved.
What do you think?

Best, Fabian




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-02 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2762
  
Hi @fhueske 

I've updated the PR according to your review.

Best regards,
Ivan.




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-02 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2762
  
Hi @fhueske 
Ok, I'll give it a try.

I've pushed an updated PR where the Schema is now external. I'll work on the 
transformation part tomorrow.

I had a few issues while trying to accommodate your comments.
1. When I try to use `DateTime` as the value of one of the fields of a 
`GenericRecord`, I receive the following exception:

```
0 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.joda.time.DateTime is not a valid POJO type

java.lang.RuntimeException: java.lang.InstantiationException
	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
	at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
	at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.readRecord(AvroRowDeserializationSchema.java:77)
	at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:70)
	at org.apache.flink.streaming.connectors.kafka.AvroDeserializationSchemaTest.deserializeRowWithComplexTypes(AvroDeserializationSchemaTest.java:117)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.InstantiationException
	at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:314)
	... 43 more
```
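The bottom frames of the trace suggest the likely root cause: `SpecificData.newInstance` tries to instantiate the field's Java type reflectively, which fails for `org.joda.time.DateTime` since it has no usable no-arg constructor. A minimal reproduction of that reflective failure mode (plain JDK, no Avro; the helper name is my own):

```java
// Illustrates the failure mode suggested by the trace above: reflective
// instantiation requires a usable no-arg constructor. The helper is a
// hypothetical sketch, not Avro's actual SpecificData logic.
public class NoArgCtorDemo {
    static boolean hasUsableNoArgCtor(Class<?> cls) {
        try {
            cls.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException / InstantiationException land here,
            // mirroring the InstantiationException in the Avro trace.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasUsableNoArgCtor(StringBuilder.class)); // true
        // org.joda.time.DateTime would take the false branch: it has no
        // no-arg constructor, so reflective instantiation cannot succeed.
    }
}
```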

[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-01 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2762
  
It would be good if the `TableSource` could be used by providing parameters, 
without having to extend it.




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-01 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2762
  
Are there any other cases when a user needs a custom conversion from a 
`GenericRecord` to a `Row` instance?
A `String -> String` mapping will definitely work, but I can also think of a 
generic converter `GenericRecord => Row`, though this may be overkill. What do 
you think about this?
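Such a generic converter hook could be sketched roughly like this (all types and names are hypothetical stand-ins; a plain `Map` stands in for a `GenericRecord` and an `Object[]` for a `Row`, nothing here is actual Flink or Avro API):

```java
import java.util.Map;

// Sketch of a user-supplied converter hook: instead of a fixed name mapping,
// the user provides one function from the storage record to the table row.
public class ConverterDemo {
    @FunctionalInterface
    interface RecordConverter<R> {
        Object[] toRow(R record); // produce the row's field values
    }

    static Object[] apply(Map<String, Object> record, RecordConverter<Map<String, Object>> conv) {
        return conv.toRow(record);
    }

    public static void main(String[] args) {
        // User-defined conversion: pick and reorder fields freely.
        RecordConverter<Map<String, Object>> conv =
                r -> new Object[]{r.get("user_name"), r.get("user_id")};
        Object[] row = apply(Map.of("user_id", 42, "user_name", "ivan"), conv);
        System.out.println(row[0] + "/" + row[1]); // ivan/42
    }
}
```

The trade-off discussed above is visible here: a full converter gives maximum freedom but pushes type-safety and schema bookkeeping onto the user, whereas a pure name mapping keeps the schema mechanical.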




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-01 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2762
  
The `String -> String` mapping would be a name mapping of fields from the 
`GenericRecord` to `Row` attributes. This would allow users to freely choose 
the attribute names of a table which is generated from an Avro schema.
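A sketch of that name mapping, again using a plain `Map` as a stand-in for an Avro `GenericRecord` (the method and class names are illustrative, not Flink or Avro API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the String -> String field-name mapping idea: record fields are
// renamed to freely chosen table attribute names. A plain Map stands in for
// an Avro GenericRecord; nothing here is actual Flink or Avro API.
public class FieldRenamingDemo {
    static Map<String, Object> rename(Map<String, Object> record, Map<String, String> mapping) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : record.entrySet()) {
            // Use the mapped attribute name if one is given, else keep the original.
            String attr = mapping.getOrDefault(field.getKey(), field.getKey());
            row.put(attr, field.getValue());
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> avroRecord = new LinkedHashMap<>();
        avroRecord.put("user_id", 42L);
        avroRecord.put("user_name", "ivan");

        Map<String, String> mapping = Map.of("user_id", "id", "user_name", "name");
        System.out.println(rename(avroRecord, mapping)); // {id=42, name=ivan}
    }
}
```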




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-01 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2762
  
@fhueske  Just a small question about your comment. You wrote:
"users should be able to provide a Schema and a `String -> String` to convert 
a `GenericRecord` into a `Row`"

What is the `String -> String` for?




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-01 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2762
  
No worries and no need to hurry! :-)
Thanks, Fabian




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-01 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2762
  
Hi @fhueske, sorry, I was a bit busy lately. I'll update the PR tonight.

Best regards, 
Ivan 




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-12-01 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2762
  
Hi @mushketyk, do you plan to continue with this PR?

Thanks, Fabian




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-11-08 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2762
  
@fhueske No worries. Take your time.

Best regards,
Ivan.





[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-11-08 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2762
  
Thanks a lot for the update @mushketyk. I'm quite busy at the moment but 
will try to have a look at your PR soon. Thanks, Fabian




[GitHub] flink issue #2762: [FLINK-3871] Add Kafka TableSource with Avro serializatio...

2016-11-07 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2762
  
@fhueske I've split the PR according to your suggestion. Now this PR only 
contains the serializer, deserializer, and table source.
Since the sink part depends on this PR I cannot publish it yet, so I'll publish 
it once this PR is merged.



