Hi John,

I found an example on SO [1] in Scala.

[1] https://stackoverflow.com/a/52093079/10299342
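
For Java, initialization could look roughly like the sketch below. It assumes Flink 1.10 and the `RowCsvInputFormat` from flink-java (`org.apache.flink.api.java.io`). The bucket path, the three-column schema, the header row, and the 10-second monitor interval are all made up for illustration, and reading from `s3://` also needs one of the S3 filesystem plugins to be set up.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.Row;

public class MonitorCsvDirectory {

    // Builds the input format. The schema (LONG, STRING, DOUBLE) is a
    // hypothetical example; use the column types of your own files.
    static RowCsvInputFormat buildFormat(String dir) {
        TypeInformation<?>[] fieldTypes = {Types.LONG, Types.STRING, Types.DOUBLE};
        RowCsvInputFormat format = new RowCsvInputFormat(new Path(dir), fieldTypes);
        // Assumption: every file starts with a header line.
        format.setSkipFirstLineAsHeader(true);
        return format;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical bucket and prefix.
        String dir = "s3://my-bucket/incoming/";

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Re-scan the directory every 10 seconds and read whatever shows up.
        DataStream<Row> rows = env.readFile(
                buildFormat(dir),
                dir,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                10_000L);

        rows.print();
        env.execute("monitor-csv-directory");
    }
}
```

Keep in mind that with PROCESS_CONTINUOUSLY a file that gets modified is re-read completely, so this works best when files are written once and never appended to.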

On Tue, Jul 28, 2020 at 4:29 PM John Smith <java.dev....@gmail.com> wrote:

> Hi, is there an example of how RowCsvInputFormat is initialized?
>
> On Tue, 28 Jul 2020 at 04:00, Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> - `env.readCsvFile` belongs to the DataSet API; it reads the full data set
>> once, in batch mode.
>> - `streamEnv.readFile(RowCsvInputFormat, filePath,
>> FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor a
>> directory and keep reading in streaming mode.
>>
>> On Tue, Jul 28, 2020 at 3:54 PM John Smith <java.dev....@gmail.com>
>> wrote:
>>
>>> Also, this is where I find the docs confusing in the "connectors" section.
>>> File system isn't under Data streaming, but env.readCsvFile seems like it
>>> can do the trick?
>>>
>>> On Tue., Jul. 28, 2020, 3:46 a.m. John Smith, <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Basically I want to "monitor" a bucket on S3, and read and stream every
>>>> file that gets created in that bucket.
>>>>
>>>> If I understand correctly, I can just use env.readCsvFile() and configure
>>>> it to continuously read a folder path?
>>>>
>>>>
>>>> On Tue., Jul. 28, 2020, 1:38 a.m. Jingsong Li, <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Do you mean you want to read S3 CSV files using
>>>>> partition/bucket pruning?
>>>>>
>>>>> If just using the DataSet API, you can use CsvInputFormat to read csv
>>>>> files.
>>>>>
>>>>> If you want to use the Table/SQL API: in 1.10, the CSV format does not
>>>>> support partitioned tables, so the only way is to specify the
>>>>> partition/bucket path and read a single directory.
>>>>>
>>>>> In 1.11, the Table/SQL filesystem connector with the CSV format supports
>>>>> partitioned tables, with complete support for partition semantics [1].
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Mon, Jul 27, 2020 at 10:54 PM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, using Flink 1.10
>>>>>>
>>>>>> 1- How do we go about reading CSV files that are copied to s3 buckets?
>>>>>> 2- Is there a source that can tail S3 and start reading a CSV when it
>>>>>> is copied to S3?
>>>>>> 3- Is that part of the table APIs?
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 

Arvid Heise | Senior Java Developer

