Understood. Thanks anyway, Aljoscha!

On Fri, Jun 16, 2017 at 11:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
>
> The problem with that is that the file is being read by (possibly, very
> likely) multiple operators in parallel. The file source works like this:
> there is a ContinuousFileMonitoringFunction (this is an actual Flink
> source) that monitors a directory and, when a new file appears, sends several
> (non-overlapping) input splits downstream. After the source, there is an
> operator (ContinuousFileReaderOperator) that receives splits, reads the
> contents of the file at the split and sends it downstream. There is thus no
> central point where we would know that a file was completely processed.
>
> Best,
> Aljoscha
>
> On 16. Jun 2017, at 11:26, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Is it really necessary to wait for the file to reach the end of the
> pipeline? Isn't it sufficient to know that it has been read and the source
> operator has been checkpointed? (I don't know if I'm using this word
> correctly... I mean that all the file splits have been processed and Flink
> won't reprocess them anymore.)
>
> Best,
> Flavio
>
> On Fri, Jun 16, 2017 at 11:22 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>>
>> I was referring to
>>
>> StreamExecutionEnvironment.readFile(
>>     FileInputFormat<OUT> inputFormat,
>>     String filePath,
>>     FileProcessingMode watchType,
>>     long interval)
>>
>> where you can specify whether the source should shut down once all files
>> have been read (PROCESS_ONCE) or whether the source should continue to
>> monitor the input directory for new files (PROCESS_CONTINUOUSLY).
>>
>> I think there is currently no built-in way of removing files from the
>> input directory once they have been processed, because it's not possible
>> to know when the contents of a given file have passed through the
>> complete pipeline.
>>
>> Best,
>> Aljoscha
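For reference, a minimal sketch of the readFile() call with the two watch types discussed above. The text input format, the HDFS path, and the 10-second scan interval are placeholders chosen for the example, not taken from the thread:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class ReadFileModesSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            String inputDir = "hdfs:///data/to-enrich"; // placeholder path

            // PROCESS_ONCE: read the files that are currently in the directory,
            // then let the source shut down so the job can finish.
            DataStream<String> bounded = env.readFile(
                    new TextInputFormat(new Path(inputDir)), inputDir,
                    FileProcessingMode.PROCESS_ONCE, 10_000L);

            // PROCESS_CONTINUOUSLY: keep monitoring the directory and pick up
            // newly arriving files; the interval (ms) is how often it re-scans.
            // (Not wired to a sink in this sketch.)
            DataStream<String> unbounded = env.readFile(
                    new TextInputFormat(new Path(inputDir)), inputDir,
                    FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

            bounded.print();
            env.execute("readFile watch-type sketch");
        }
    }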
>> On 15. Jun 2017, at 20:00, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>> Hi Aljoscha,
>> thanks for the great suggestions, I wasn't aware of
>> AsyncDataStream.unorderedWait and BucketingSink setBucketer().
>> Most probably that's exactly what I was looking for (I just need to find
>> the time to test it).
>> Just one last question: what are you referring to with "you could use a
>> different readFile() method where you can specify that you want to
>> continue monitoring the directory for new files"? Is there a way to delete
>> the input files, or move them to a backup dir, once they have been enriched?
>>
>> Best,
>> Flavio
>>
>> On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Ok, just trying to make sure I understand everything. You have this:
>>>
>>> 1. A bunch of data in HDFS that you want to enrich
>>> 2. An external service (Solr/ES) that you query for enriching the data
>>>    rows stored in 1.
>>> 3. You need to store the enriched rows in HDFS again
>>>
>>> I think you could just do this (roughly):
>>>
>>> StreamExecutionEnvironment env = …;
>>>
>>> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), "<hdfs path>");
>>>
>>> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
>>> // or
>>> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …); // yes, the interface for this is a bit strange
>>>
>>> BucketingSink sink = new BucketingSink("<hdfs sink path>");
>>> // this is responsible for putting files into buckets, so that you don't
>>> // have too many small HDFS files
>>> sink.setBucketer(new MyBucketer());
>>> enriched.addSink(sink);
>>>
>>> In this case, the file source will close once all files are read and the
>>> job will finish. If you don't want this you can also use a different
>>> readFile() method where you can specify that you want to continue
>>> monitoring the directory for new files.
>>>
>>> Best,
>>> Aljoscha
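A slightly more concrete version of the rough pipeline above, as a sketch only: it reads plain text lines instead of Rows to stay self-contained, and enrichEs(), the paths, and the bucket format are hypothetical placeholders, not from the thread. It uses the provided DateTimeBucketer instead of a custom MyBucketer:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
    import org.apache.flink.util.Collector;

    public class EnrichAndBucketSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // BucketingSink finalizes pending files on checkpoints

            String inputDir = "hdfs:///data/to-enrich";  // placeholder
            String outputDir = "hdfs:///data/enriched";  // placeholder

            DataStream<String> input = env.readFile(
                    new TextInputFormat(new Path(inputDir)), inputDir,
                    FileProcessingMode.PROCESS_ONCE, 10_000L);

            // Synchronous enrichment: one blocking index lookup per record.
            DataStream<String> enriched = input.flatMap(new RichFlatMapFunction<String, String>() {
                @Override
                public void flatMap(String row, Collector<String> out) {
                    // enrichEs() stands in for the real Solr/Elasticsearch call
                    out.collect(row + "," + enrichEs(row));
                }
            });

            // Bucket output by time so HDFS files stay few and reasonably sized.
            BucketingSink<String> sink = new BucketingSink<>(outputDir);
            sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
            sink.setBatchSize(128 * 1024 * 1024); // roll a file when it reaches ~128 MB

            enriched.addSink(sink);
            env.execute("enrich and bucket sketch");
        }

        // Placeholder for the real index lookup.
        private static String enrichEs(String row) {
            return "enriched";
        }
    }

The time-based bucketer plus a batch size threshold is one way to address the "too many small files" concern raised later in the thread.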
>>> On 6. Jun 2017, at 17:38, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>> Hi Aljoscha,
>>> thanks for getting back to me on this! I'll try to simplify the thread,
>>> starting from what we want to achieve.
>>>
>>> At the moment we execute some queries against a db and we store the data
>>> into Parquet directories (one for each query).
>>> Let's say we then create a DataStream<Row> from each dir. What we would
>>> like to achieve is some sort of throttling of the queries to this
>>> external service (in order not to overload it with too many queries...
>>> but we also need to run as many queries as possible in order to finish
>>> this process in a reasonable time).
>>>
>>> The current batch process has the downside that you must know a priori
>>> the right parallelism of the job, while the streaming process should be
>>> able to rescale when needed [1], so it should be easier to tune the job
>>> parallelism without losing all the already performed queries [2].
>>> Moreover, if the job crashes you lose all the work done up to that moment
>>> because there's no checkpointing...
>>> My initial idea was to read from HDFS and put the data into Kafka to be
>>> able to change the number of consumers at runtime (according to the
>>> maximum parallelism we can achieve with the external service), but maybe
>>> this could be done in an easier way (we've only been using streaming for
>>> a short time, so we may see things as more complicated than they are).
>>>
>>> Moreover, as the last step, we need to know when all the data has been
>>> enriched so we can stop this first streaming job and start with the next
>>> one (which cannot run if the acquisition job is still in progress because
>>> that could break referential integrity). Is there any example of such a
>>> use case?
>>>
>>> [1] at the moment manually... maybe automatically in the future, right?
>>> [2] with the batch job, if we want to change the parallelism we need to
>>> stop it and relaunch it, losing all the already enriched data because
>>> there's no checkpointing there
>>>
>>> On Tue, Jun 6, 2017 at 4:46 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> I'll try and answer your questions:
>>>>
>>>> Regarding 1. Why do you need to first read the data from HDFS into
>>>> Kafka (or another queue)? Using
>>>> StreamExecutionEnvironment.readFile(FileInputFormat, String, FileProcessingMode, long)
>>>> you can monitor a directory in HDFS and process the files that are there
>>>> and any newly arriving files. For batching your output, you could look
>>>> into the BucketingSink, which will write to files in HDFS (or some other
>>>> DFS) and start new files (buckets) based on some criteria, for example
>>>> the number of processed elements or time.
>>>>
>>>> Regarding 2. I didn't completely understand this part. Could you maybe
>>>> elaborate a bit, please?
>>>>
>>>> Regarding 3. Yes, I think you can. You would use this to fire off your
>>>> queries to Solr/ES.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 11. May 2017, at 15:06, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>> Hi to all,
>>>> we have a particular use case where we have a tabular dataset on HDFS
>>>> (e.g. a CSV) that we want to enrich by filling some cells with the
>>>> content returned by a query to a reverse index (e.g. Solr/Elasticsearch).
>>>> Since we want to make this process resilient and scalable, we thought
>>>> that Flink streaming could be a good fit, since we can control the
>>>> "pressure" on the index by adding/removing consumers dynamically and
>>>> there is automatic error recovery.
>>>>
>>>> Right now we have developed 2 different solutions to the problem:
>>>>
>>>> 1. *Move the dataset from HDFS to a queue/topic* (like Kafka or
>>>>    RabbitMQ) and then let the queue consumers do the real job (pull Rows
>>>>    from the queue, enrich and then persist the enriched Rows). The
>>>>    questions here are:
>>>>    1. How to properly manage writing to HDFS? If we read a set of rows,
>>>>       enrich them and need to write the result back to HDFS, is it
>>>>       possible to automatically compact files in order to avoid the
>>>>       "too many small files" problem on HDFS? How to avoid file name
>>>>       collisions (put each batch of rows in a different file)?
>>>>    2. How to control the number of consumers dynamically? Is it possible
>>>>       to change the parallelism once the job has started?
>>>> 2. In order to avoid useless data transfer from HDFS to a queue/topic
>>>>    (since we don't need all the Row fields to create the query...
>>>>    usually only 2/5 fields are needed), we can create a Flink job that
>>>>    puts the *queries into a queue/topic* and waits for the result. The
>>>>    problem with this approach is:
>>>>    1. How to correlate queries with their responses? Creating a single
>>>>       response queue/topic implies that all consumers read all messages
>>>>       (and discard those that are not directed to them), while creating
>>>>       a queue/topic for each sub-task could be expensive (in terms of
>>>>       resources and management... but we don't have any
>>>>       evidence/experience of this, it's just a possible problem).
>>>> 3. Maybe we can exploit *Flink async I/O* somehow...? But how?
>>>>
>>>> Any suggestions/drawbacks on the 2 approaches?
>>>>
>>>> Thanks in advance,
>>>> Flavio
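To make the async I/O option from point 3 concrete, here is a sketch of throttled enrichment with AsyncDataStream, where the capacity argument caps the number of in-flight queries per subtask, which is one way to limit pressure on the index without an external queue, and the framework keeps the query/response correlation internally. It is written against the ResultFuture-based AsyncFunction interface of newer Flink releases (the Flink 1.2/1.3 API discussed in this thread used an AsyncCollector instead, but the structure is the same); queryIndexAsync() is a hypothetical non-blocking client call, not a real API, and the timeout/capacity numbers are made up:

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class AsyncEnrichmentSketch {

        /** Enriches each row with the answer from a reverse index (Solr/ES). */
        public static class IndexLookup extends RichAsyncFunction<String, String> {
            @Override
            public void asyncInvoke(String row, ResultFuture<String> resultFuture) {
                // queryIndexAsync() is a hypothetical non-blocking client call
                // that returns a CompletableFuture with the lookup result.
                CompletableFuture<String> answer = queryIndexAsync(row);
                answer.whenComplete((result, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        // The framework correlates this response with its request.
                        resultFuture.complete(Collections.singleton(row + "," + result));
                    }
                });
            }
        }

        public static DataStream<String> enrich(DataStream<String> input) {
            // At most 50 queries in flight per parallel subtask, 30s timeout each:
            // the capacity acts as the throttling / back-pressure knob.
            return AsyncDataStream.unorderedWait(
                    input, new IndexLookup(), 30, TimeUnit.SECONDS, 50);
        }

        // Stand-in for the real asynchronous index client.
        private static CompletableFuture<String> queryIndexAsync(String row) {
            return CompletableFuture.completedFuture("enriched");
        }
    }

Total pressure on the index is then roughly capacity times the operator parallelism, so it can be tuned without adding or removing external consumers.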