Hi John, I found an example on SO [1] in Scala.
[1] https://stackoverflow.com/a/52093079/10299342

On Tue, Jul 28, 2020 at 4:29 PM John Smith <java.dev....@gmail.com> wrote:

> Hi, is there an example of how RowCsvInputFormat is initialized?
>
> On Tue, 28 Jul 2020 at 04:00, Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> - `env.readCsvFile` belongs to the DataSet API; it reads the full data
>> set once, in batch mode.
>> - `streamEnv.readFile(RowCsvInputFormat, filePath,
>> FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor a
>> directory and keep reading new files in streaming mode.
>>
>> On Tue, Jul 28, 2020 at 3:54 PM John Smith <java.dev....@gmail.com>
>> wrote:
>>
>>> This is also where I find the docs confusing: in the "connectors"
>>> section, the file system is not listed under data streaming, but
>>> env.readCsvFile seems like it can do the trick?
>>>
>>> On Tue., Jul. 28, 2020, 3:46 a.m. John Smith, <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Basically I want to "monitor" a bucket on S3, and every file that gets
>>>> created in that bucket, read it and stream it.
>>>>
>>>> If I understand correctly, I can just use env.readCsvFile() and
>>>> configure it to continuously read a folder path?
>>>>
>>>> On Tue., Jul. 28, 2020, 1:38 a.m. Jingsong Li, <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Do you mean you want to read S3 CSV files using partition/bucket
>>>>> pruning?
>>>>>
>>>>> If you are just using the DataSet API, you can use CsvInputFormat to
>>>>> read CSV files.
>>>>>
>>>>> If you want to use the Table/SQL API: in 1.10, the CSV format in the
>>>>> Table API does not support partitioned tables, so the only way is to
>>>>> specify the partition/bucket path and read that single directory.
>>>>>
>>>>> In 1.11, the Table/SQL filesystem connector with the CSV format
>>>>> supports partitioned tables, with complete partition semantics. [1]
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Mon, Jul 27, 2020 at 10:54 PM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, using Flink 1.10
>>>>>>
>>>>>> 1- How do we go about reading CSV files that are copied to S3 buckets?
>>>>>> 2- Is there a source that can tail S3 and start reading a CSV when it
>>>>>> is copied to S3?
>>>>>> 3- Is that part of the table APIs?
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>
>> --
>> Best, Jingsong Lee

--
Arvid Heise | Senior Java Developer
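For reference, the pattern discussed above (initialize a RowCsvInputFormat, then pass it to readFile with PROCESS_CONTINUOUSLY so the directory is monitored) can be sketched in Java roughly as follows. This is an untested sketch against the 1.10/1.11 API: the bucket path, the two-column schema, and the 10-second monitor interval are placeholder assumptions, not values from the thread.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.Row;

public class CsvBucketMonitorJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder schema: the types of the CSV columns, in order.
        TypeInformation<?>[] fieldTypes =
                new TypeInformation<?>[] {Types.STRING, Types.INT};

        // Placeholder path; an S3 filesystem plugin must be on the classpath.
        Path bucketPath = new Path("s3://my-bucket/csv-input/");
        RowCsvInputFormat format = new RowCsvInputFormat(bucketPath, fieldTypes);

        // PROCESS_CONTINUOUSLY re-scans the directory at the given interval
        // (here every 10 seconds) and streams files as they appear.
        DataStream<Row> rows = env.readFile(
                format,
                bucketPath.toString(),
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                10_000L);

        rows.print();
        env.execute("Monitor S3 bucket for CSV files");
    }
}
```

Note the caveat that applies to PROCESS_CONTINUOUSLY in general: when a monitored file is modified, its entire contents are re-read, so the source fits append-by-new-file workflows like the S3 bucket case above.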