Hi,
I was referring to

StreamExecutionEnvironment.readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval)

where you can specify whether the source should shut down once all files have been read (PROCESS_ONCE) or whether it should continue to monitor the input directory for new files (PROCESS_CONTINUOUSLY). I think there is currently no built-in way of removing files from the input directory once they have been processed, because it's not possible to know when the contents of a given file have passed through the complete pipeline.
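
For reference, a minimal (untested) sketch of the continuous-monitoring variant. The path and the 60-second scan interval are placeholders, and TextInputFormat just stands in for whatever FileInputFormat you actually use:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder input path and format.
TextInputFormat format = new TextInputFormat(new Path("hdfs:///data/input"));

// PROCESS_CONTINUOUSLY keeps the source running and re-scans the directory
// every 60 seconds for new files; PROCESS_ONCE would read what is there once
// and then shut the source down.
DataStream<String> lines = env.readFile(
    format,
    "hdfs:///data/input",
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    60000L);

With PROCESS_CONTINUOUSLY the already-read files simply stay in the directory; as said above, Flink itself will not delete or move them.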
Best,
Aljoscha

> On 15. Jun 2017, at 20:00, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Hi Aljoscha,
> thanks for the great suggestions, I wasn't aware of AsyncDataStream.unorderedWait and BucketingSink setBucketer(). Most probably that's exactly what I was looking for (I just need to find the time to test it).
> Just one last question: what are you referring to with "you could use a different readFile() method where you can specify that you want to continue monitoring the directory for new files"? Is there a way to delete the input files, or move them to a backup dir, once they have been enriched?
>
> Best,
> Flavio
>
> On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Ok, just trying to make sure I understand everything. You have this:
>
> 1. A bunch of data in HDFS that you want to enrich
> 2. An external service (Solr/ES) that you query for enriching the data rows stored in 1.
> 3. You need to store the enriched rows in HDFS again
>
> I think you could just do this (roughly):
>
> StreamExecutionEnvironment env = …;
>
> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), “<hdfs path>”);
>
> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
> // or
> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …); // yes, the interface for this is a bit strange
>
> BucketingSink sink = new BucketingSink(“<hdfs sink path>”);
> // this is responsible for putting files into buckets, so that you don't have too many small HDFS files
> sink.setBucketer(new MyBucketer());
> enriched.addSink(sink);
>
> In this case, the file source will close once all files are read and the job will finish. If you don't want this you can also use a different readFile() method where you can specify that you want to continue monitoring the directory for new files.
>
> Best,
> Aljoscha
>
>> On 6. Jun 2017, at 17:38, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>> Hi Aljoscha,
>> thanks for getting back to me on this! I'll try to simplify the thread, starting from what we want to achieve.
>>
>> At the moment we execute some queries against a db and we store the data into Parquet directories (one for each query). Let's say we then create a DataStream<Row> from each dir; what we would like to achieve is some sort of throttling of the queries issued to this external service (in order not to overload it with too many queries... but we also need to run as many queries as possible in order to finish this process in a reasonable time).
>>
>> The current batch process has the downside that you must know the right parallelism of the job a priori, while the streaming process should be able to rescale when needed [1], so it should be easier to tune the job parallelism without losing the queries that have already been performed [2]. Moreover, if the job crashes you lose all the work done up to that moment because there's no checkpointing...
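
A note on the throttling concern above: the AsyncDataStream call in the sketch earlier in this thread takes a capacity argument that caps the number of outstanding requests per parallel subtask, which already gives you a simple way to limit the pressure on Solr/ES without needing an intermediate queue to regulate the load. A rough, untested sketch of that call site (the timeout and capacity values are arbitrary, and MyAsyncEnricher is a placeholder for an AsyncFunction<Row, Row> that queries the index):

DataStream<Row> enriched = AsyncDataStream.unorderedWait(
    input,                   // the DataStream<Row> read from HDFS, as in the sketch above
    new MyAsyncEnricher(),   // placeholder AsyncFunction<Row, Row>
    30, TimeUnit.SECONDS,    // timeout for each individual lookup
    100);                    // capacity: at most 100 requests in flight per parallel subtask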
>> My initial idea was to read from HDFS and put the data into Kafka, to be able to change the number of consumers at runtime (according to the maximum parallelism we can achieve with the external service), but maybe this could be done in an easier way (we've only been using streaming for a short time, so we may be seeing things as more complicated than they are).
>>
>> Moreover, as the last step, we need to know when all the data has been enriched so that we can stop this first streaming job and start the next one (which cannot run while the acquisition job is still in progress because it would break referential integrity). Is there any example of such a use case?
>>
>> [1] at the moment manually... maybe automatically in the future, right?
>> [2] with the batch job, if we want to change the parallelism we need to stop it and relaunch it, losing all the already enriched data because there's no checkpointing there
>>
>> On Tue, Jun 6, 2017 at 4:46 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi Flavio,
>>
>> I'll try and answer your questions:
>>
>> Regarding 1. Why do you need to first read the data from HDFS into Kafka (or another queue)? Using StreamExecutionEnvironment.readFile(FileInputFormat, String, FileProcessingMode, long) you can monitor a directory in HDFS and process the files that are already there as well as any newly arriving files. For batching your output, you could look into the BucketingSink, which will write to files in HDFS (or some other DFS) and start new files (buckets) based on some criteria, for example the number of processed elements or time.
>>
>> Regarding 2. I didn't completely understand this part. Could you maybe elaborate a bit, please?
>>
>> Regarding 3. Yes, I think you can. You would use this to fire off your queries to Solr/ES.
>>
>> Best,
>> Aljoscha
>>
>>> On 11. May 2017, at 15:06, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>> Hi to all,
>>> we have a particular use case where we need to enrich a tabular dataset on HDFS (e.g. a CSV), filling some cells with the content returned by a query to a reverse index (e.g. Solr/Elasticsearch). Since we want this process to be resilient and scalable, we thought that Flink streaming could be a good fit, since we can control the "pressure" on the index by adding/removing consumers dynamically and there is automatic error recovery.
>>>
>>> Right now we have come up with 2 different solutions to the problem:
>>>
>>> 1. Move the dataset from HDFS to a queue/topic (like Kafka or RabbitMQ) and then let the queue consumers do the real job (pull Rows from the queue, enrich them and then persist the enriched Rows). The questions here are:
>>> - how to properly manage writing to HDFS? If we read a set of rows, enrich them and need to write the result back to HDFS, is it possible to automatically compact files in order to avoid the "too many small files" problem on HDFS? And how do we avoid file name collisions (i.e. put each batch of rows into a different file)?
>>> - how to control the number of consumers dynamically? Is it possible to change the parallelism once the job has started?
>>> 2. In order to avoid useless data transfer from HDFS to a queue/topic (since we don't need all the Row fields to create the query... usually only 2/5 fields are needed), we can create a Flink job that puts the queries into a queue/topic and waits for the result. The problem with this approach is how to correlate queries with their responses: creating a single response queue/topic implies that all consumers read all messages (and discard those that are not directed to them), while creating a queue/topic for each sub-task could be expensive (in terms of resources and management... but we don't have any evidence/experience of this, it's just a possible problem). Maybe we can exploit Flink async I/O somehow...? But how?
>>>
>>> Any suggestions/drawbacks on the 2 approaches?
>>>
>>> Thanks in advance,
>>> Flavio
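
Regarding the async I/O question at the end: with AsyncDataStream the framework itself keeps track of which response belongs to which input record, so no response queue/topic and no manual correlation are needed; each Row is handed to the function together with its own result callback. Below is a rough, untested sketch of such a function. The class name, thread pool size, field positions and the lookupInIndex() method are made-up placeholders for the real Solr/ES lookup; also note that in Flink 1.2/1.3 the callback parameter is called AsyncCollector (with a collect() method), while newer versions use ResultFuture with complete(), which is what this sketch assumes.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.types.Row;

public class SolrEnricher extends RichAsyncFunction<Row, Row> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // Pool for the (blocking) index lookups; a truly asynchronous
        // Solr/ES client could be used directly instead.
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) {
        // Made-up field positions: field 0 holds the lookup key,
        // field 3 receives the enrichment result.
        Object key = input.getField(0);

        CompletableFuture
            .supplyAsync(() -> lookupInIndex(key), executor)
            .thenAccept(value -> {
                input.setField(3, value);
                // Flink ties this result back to the input record internally,
                // so queries and responses never have to be correlated by hand.
                resultFuture.complete(Collections.singleton(input));
            });
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    // Placeholder for the actual Solr/Elasticsearch query.
    private String lookupInIndex(Object key) {
        return "enriched-value-for-" + key;
    }
}

Such a function would be plugged into the AsyncDataStream.unorderedWait() call sketched earlier in the thread, whose capacity argument then bounds how many of these lookups are in flight at the same time.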