Hi,

We load our JSON from a MongoDB database, and we also found no readily 
available way to stream JSON data into a Flink pipeline. I guess this is hard 
to implement generically, since you have to know details about the JSON 
structure to do it. So your best bet is probably to implement your own input 
source, which streams in your file and creates records based on the JSON 
structure. We are not using PyFlink, so I cannot give any details about that, 
but it should not matter which language you use.

Just implement a source that reads your input and employ any JSON parser you 
like, creating, for example, domain objects with the same attributes as your 
JSON structure, and forward those into your Flink pipeline for further 
processing.
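
As a rough sketch of that idea in plain Python (nothing Flink-specific): the 
names Tweet, parse_tweet and read_tweets are hypothetical, and the code assumes 
the input holds one complete JSON object per line with the same layout as the 
tweet quoted below. Only a handful of fields are mapped; extend as needed.

```python
import json
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Tweet:
    """Hypothetical domain object holding a few fields of one tweet."""
    id: str
    author_id: str
    lang: str
    text: str
    retweet_count: int


def parse_tweet(line: str) -> Tweet:
    """Parse one JSON document into a Tweet; missing metrics default to 0."""
    data = json.loads(line)["data"]
    metrics = data.get("public_metrics", {})
    return Tweet(
        id=data["id"],
        author_id=data["author_id"],
        lang=data["lang"],
        text=data["text"],
        retweet_count=metrics.get("retweet_count", 0),
    )


def read_tweets(lines: Iterable[str]) -> Iterator[Tweet]:
    """Lazily yield one Tweet per non-empty line, skipping malformed records."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            yield parse_tweet(line)
        except (json.JSONDecodeError, KeyError, TypeError):
            # Assumption: bad records are dropped silently; log them if needed.
            continue


if __name__ == "__main__":
    sample = (
        '{"data":{"text":"hello","public_metrics":{"retweet_count":0},'
        '"author_id":"1367839185764151302","id":"1378275866279469059",'
        '"lang":"in"}}'
    )
    for tweet in read_tweets([sample]):
        print(tweet)
```

Since read_tweets accepts any iterable of strings, an open file object can be 
passed in directly. For local experiments, the resulting objects could 
probably be handed to a collection-based source (for example PyFlink's 
StreamExecutionEnvironment.from_collection, possibly after converting them to 
tuples or rows); that part is untested here.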

Regards
        Klemens

> On 14.04.2021 at 04:40, Yik San Chan <evan.chanyik...@gmail.com> wrote:
> 
> Hi Giacomo,
> 
> I think you can try using the Flink SQL connector. For JSON input such as 
> {"a": 1, "b": {"c": 2, "e": {"d": 3}}} (where "e" is just a placeholder key 
> name), you can do:
> 
> CREATE TABLE data (
>   a INT,
>   b ROW<c INT, e ROW<d INT>>
> ) WITH (...)
> 
> Let me know if that helps.
> 
> Best,
> Yik San
> 
> On Wed, Apr 14, 2021 at 2:00 AM <g.g.m.5...@web.de> wrote:
> Hi,
> I'm new to Flink and I am trying to create a stream from locally downloaded 
> tweets. The tweets are in json format, like in this example:
>  
> {"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC",
> "public_metrics":{"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
> "author_id":"1367839185764151302","id":"1378275866279469059",
> "created_at":"2021-04-03T09:19:08.000Z","source":"Twitter for Android","lang":"in"},
> "includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan Pareda",
> "created_at":"2021-03-05T14:07:56.000Z",
> "public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
> "username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
>  
> I would like to do it in Python using Pyflink, but could also use Java if 
> there is no reasonable way to do it in Python. I've been looking at different 
> options for loading these objects into a stream, but am not sure what to do. 
> Here's my situation so far:
>  
> 1. There doesn't seem to be a fitting connector. The filesystem-connector 
> doesn't seem to support json format.
> 2. I've seen in the archive of this mailing list that some recommend using 
> the Table API. But I am not sure if this is a viable option given how nested 
> the json objects are.
> 3. I could of course try to implement a custom DataSource, but that seems to 
> be quite difficult so I'd only consider this if there's no other way.
> 
> I'll be very grateful for any kind of input.
> Cheers,
> Giacomo
>  
