Re: JSON source for pyflink stream

2021-04-14 Thread G . G . M . 5611
Thanks to everyone for the tips; they help a lot. I'll try the Table API first, and if that doesn't work out I'll do as Klemens suggests.

Cheers,

Giacomo

Sent: Wednesday, 14 April 2021 at 16:18
From: "Dian Fu"
To: "Klemens Muthmann"
Cc: "Yik San Chan", g.g.m.5...@web.de, "user"
Subject: Re: JSON source for pyflink stream


Hi Giacomo,

All connectors supported by the Table & SQL API can also be used from the PyFlink Table API, so you could use the filesystem connector there. AFAIK, the filesystem connector supports newline-delimited JSON in Flink 1.12. You can refer to [1] for more details.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-formats

Regards,
Dian
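
As a rough sketch of what this looks like, a filesystem source over newline-delimited JSON in Flink 1.12 might be declared as below; the path and the column list are illustrative assumptions, not taken from the thread:

```sql
-- Hypothetical table over newline-delimited JSON files; adjust the schema
-- to the fields you actually need from each tweet.
CREATE TABLE tweets (
  `data` ROW<`text` STRING, author_id STRING, created_at STRING, lang STRING>
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/tweets',  -- illustrative location
  'format' = 'json'
);
```

The same DDL can be submitted from PyFlink via `TableEnvironment.execute_sql`.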
 


On Wed, Apr 14, 2021 at 3:09 PM Klemens Muthmann <klemens.muthm...@cyface.de> wrote:


Hi,
 

We load our JSON from a MongoDB database, and we also found no readily available way to stream JSON data into a Flink pipeline. I suspect this is hard to provide generically, since the reader has to know details of the JSON structure. So your best bet is probably to implement your own input source, which can stream in your file and produce records based on the JSON structure. We are not using PyFlink, so I cannot give any details there, but the approach should not depend on which language you use.

 

Just implement a Source that reads your input and uses any JSON parser you like, creating, for example, domain objects with the same attributes as your JSON structure, and forward those into your Flink pipeline for further processing.
Regards

Klemens
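
A minimal sketch of the parsing step Klemens describes, using only the standard library; the `Tweet` class and the fields it keeps are illustrative assumptions, not part of any Flink API:

```python
import json
from dataclasses import dataclass

@dataclass
class Tweet:
    text: str
    author_id: str
    lang: str

def parse_line(line: str) -> Tweet:
    """Map one newline-delimited JSON record to a domain object."""
    obj = json.loads(line)
    data = obj["data"]
    return Tweet(text=data["text"], author_id=data["author_id"], lang=data["lang"])

# A custom source would read the input file line by line, call parse_line,
# and emit each Tweet into the pipeline for further processing.
```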
On 14 Apr 2021, at 04:40, Yik San Chan <evan.chanyik...@gmail.com> wrote:
 


Hi Giacomo,
 

I think you can try the Flink SQL connector. For JSON input such as {"a": 1, "b": {"c": 2, "d": 3}}, you can do:

 

CREATE TABLE data (
  a INT,
  b ROW<c INT, d INT>
) WITH (...)

 

Let me know if that helps.

 

Best,

Yik San
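
To see how the nested JSON lines up with the ROW column in that DDL, here is a small standard-library check, using a corrected, well-formed version of the example object (no Flink involved):

```python
import json

# Column `a` maps to the top-level integer; column `b` maps to a
# ROW<c INT, d INT> built from the nested object.
raw = '{"a": 1, "b": {"c": 2, "d": 3}}'
obj = json.loads(raw)
print(obj["a"])       # top-level field -> column a
print(obj["b"]["c"])  # nested field    -> b.c
print(obj["b"]["d"])  # nested field    -> b.d
```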

 


On Wed, Apr 14, 2021 at 2:00 AM <g.g.m.5...@web.de> wrote:




Hi,
I'm new to Flink and I am trying to create a stream from locally downloaded tweets. The tweets are in JSON format, like in this example:
 
{"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC","public_metrics":{"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
"author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter for Android","lang":"in"},
"includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan Pareda","created_at":"2021-03-05T14:07:56.000Z",
"public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
"username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
 
I would like to do it in Python using PyFlink, but could also use Java if there is no reasonable way to do it in Python. I've been looking at different options for loading these objects into a stream, but am not sure what to do. Here's my situation so far:
 
1. There doesn't seem to be a fitting connector; the filesystem connector doesn't seem to support JSON format.
2. I've seen in the archive of this mailing list that some recommend the Table API, but I am not sure whether that is viable given how nested the JSON objects are.
3. I could of course try to implement a custom DataSource, but that seems quite difficult, so I'd only consider it if there is no other way.

I'll be very grateful for any kind of input.
Cheers,
Giacomo
 