Thanks to everyone for the tips, they help a lot. I'll try the Table API first, and if that doesn't succeed I'll do as Klemens suggests.
Cheers,
Giacomo
 
 
Sent: Wednesday, April 14, 2021 at 4:18 PM
From: "Dian Fu" <dian0511...@gmail.com>
To: "Klemens Muthmann" <klemens.muthm...@cyface.de>
Cc: "Yik San Chan" <evan.chanyik...@gmail.com>, g.g.m.5...@web.de, "user" <user@flink.apache.org>
Subject: Re: JSON source for pyflink stream
Hi Giacomo,

All connectors supported by the Table & SQL API can also be used in the PyFlink Table API, so you could use the filesystem connector there. AFAIK, the filesystem connector supports newline-delimited JSON in Flink 1.12. You could refer to [1] for more details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-formats
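
To make this concrete for the tweet sample further down the thread, a filesystem table over newline-delimited JSON could be declared roughly like this. This is only a sketch: the table name, path, and type choices are assumptions, and the schema covers just the `data` part of the sample (the remaining top-level fields such as `includes` can be added the same way):

```sql
-- Sketch only: field names follow the sample tweet in this thread;
-- replace the path with your local directory of downloaded tweets.
CREATE TABLE tweets (
  `data` ROW<
    `text` STRING,
    public_metrics ROW<retweet_count INT, reply_count INT, like_count INT, quote_count INT>,
    author_id STRING,
    id STRING,
    created_at STRING,
    `source` STRING,
    lang STRING
  >
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/tweets',
  'format' = 'json'
);
```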

Regards,
Dian
 
On Wed, Apr 14, 2021 at 3:09 PM Klemens Muthmann <klemens.muthm...@cyface.de> wrote:
Hi,
 
We are loading our JSON from a MongoDB, but we also found no readily available way to stream JSON data into a Flink pipeline. I guess this would be hard to implement generically, since you have to know details about the JSON structure to do it. So I guess your best bet would be to implement your own input source, which can stream in your file and create records based on the JSON structure. We are not using PyFlink, so I cannot give any details about that, but it should not matter which language you use.
 
Just implement a source that reads your input and employs any JSON parser you like, creating for example domain objects with the same attributes as your JSON structure, and forward those into your Flink pipeline for further processing.
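
As a rough illustration of that idea in plain Python (independent of Flink; the `Tweet` class and the choice of fields are hypothetical, picked from the sample tweet in this thread):

```python
import json
from dataclasses import dataclass

@dataclass
class Tweet:
    """Hypothetical domain object mirroring part of the JSON structure."""
    text: str
    author_id: str
    lang: str

def parse_tweet(line: str) -> Tweet:
    # One JSON document per line; pick out only the attributes you need.
    data = json.loads(line)["data"]
    return Tweet(text=data["text"], author_id=data["author_id"], lang=data["lang"])

def read_tweets(path: str):
    # A custom source would do essentially this: read the file line by
    # line and emit one domain object per line into the pipeline.
    with open(path) as f:
        for line in f:
            if line.strip():
                yield parse_tweet(line)
```

The actual wiring into a Flink source depends on the language and API you choose; the parsing step stays the same.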
 
Regards
Klemens
 
On 14.04.2021, at 04:40, Yik San Chan <evan.chanyik...@gmail.com> wrote:
 
Hi Giacomo,
 
I think you can try using the Flink SQL connector. For JSON input such as {"a": 1, "b": {"c": 2, "e": {"d": 3}}}, you can do:
 
CREATE TABLE data (
  a INT,
  b ROW<c INT, e ROW<d INT>>
) WITH (...)
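 
Nested fields declared as ROW types can then be read with dot notation (a sketch, assuming the inner row is given a field name, e.g. `b ROW<c INT, e ROW<d INT>>`):

```sql
SELECT a, b.c, b.e.d FROM data;
```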
 
Let me know if that helps.
 
Best,
Yik San
 
On Wed, Apr 14, 2021 at 2:00 AM <g.g.m.5...@web.de> wrote:
Hi,
I'm new to Flink and I am trying to create a stream from locally downloaded tweets. The tweets are in JSON format, like in this example:
 
{"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC","public_metrics":{"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
"author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter for Android","lang":"in"},
"includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan Pareda","created_at":"2021-03-05T14:07:56.000Z",
"public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
"username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
 
I would like to do it in Python using PyFlink, but could also use Java if there is no reasonable way to do it in Python. I've been looking at different options for loading these objects into a stream, but am not sure what to do. Here's my situation so far:
 
1. There doesn't seem to be a fitting connector. The filesystem connector doesn't seem to support the JSON format.
2. I've seen in the archive of this mailing list that some recommend using the Table API. But I am not sure if this is a viable option given how nested the JSON objects are.
3. I could of course try to implement a custom DataSource, but that seems quite difficult, so I'd only consider it if there's no other way.

I'll be very grateful for any kind of input.
Cheers,
Giacomo
 
