Hi Giacomo,

All the connectors supported in the Table & SQL connectors could be used in
PyFlink Table API and so you could use file system connector in PyFlink
Table API. AFAIK, it supports new line delimited json in the
filesystem connector in Flink 1.12. You could refer to [1] for more details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-formats

Regards,
Dian

On Wed, Apr 14, 2021 at 3:09 PM Klemens Muthmann <klemens.muthm...@cyface.de>
wrote:

> Hi,
>
> We are loading our JSON from a Mongo Database. But we also found no
> readily available way to stream JSON Data into a Flink Pipeline. I guess
> this would be hard to implement since you have to know details about the
> JSON structure to do this. So I guess your best bet would be to implement
> your own input source, which can stream in your file and create results
> based on the JSON structure. We are not using Pyflink so I can not give any
> details about this, but it should not matter, which language you use.
>
> Just implement a Source reading your input and employ any JSON parser you
> like, creating for example domain objects with the same attributes as your
> JSON structure and forward those into your Flink Pipeline for further
> processing.
>
> Regards
> Klemens
>
> Am 14.04.2021 um 04:40 schrieb Yik San Chan <evan.chanyik...@gmail.com>:
>
> Hi Giacomo,
>
> I think you can try using Flink SQL connector. For JSON input such as
> {"a": 1, "b": {"c": 2, {"d": 3}}}, you can do:
>
> CREATE TABLE data (
>   a INT,
>   b ROW<c INT, ROW<d INT>>
> ) WITH (...)
>
> Let me know if that helps.
>
> Best,
> Yik San
>
> On Wed, Apr 14, 2021 at 2:00 AM <g.g.m.5...@web.de> wrote:
>
>> Hi,
>> I'm new to Flink and I am trying to create a stream from locally
>> downloaded tweets. The tweets are in json format, like in this example:
>>
>> {"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC
>> <https://deref-web.de/mail/client/2UPTbOw73vE/dereferrer/?redirectUrl=https%3A%2F%2Ft.co%2FADjEgpt7bC>
>> ","public_metrics":"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
>> "author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter
>> for Android","lang":"in"},
>> "includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan
>> Pareda","created_at":"2021-03-05T14:07:56.000Z",
>>
>> "public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
>>
>> "username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
>>
>> I would like to do it in Python using Pyflink, but could also use Java if
>> there is no reasonable way to do it in Python. I've been looking at
>> different options for loading these objects into a stream, but am not sure
>> what to do. Here's my situation so far:
>>
>> 1. There doesn't seem to be a fitting connector. The filesystem-connector
>> doesn't seem to support json format.
>> 2. I've seen in the archive of this mailing list that some reccomend to
>> use the Table API. But I am not sure if this is a viable option given how
>> nested the json objects are.
>> 3. I could of course try to implement a custom DataSource, but that seems
>> to be quite difficult so I'd only consider this if there's no other way.
>>
>> I'll be very grateful for any kind of input.
>> Cheers,
>> Giacomo
>>
>>
>
>

Reply via email to