Hi,

I was referring to

StreamExecutionEnvironment.readFile(
  FileInputFormat<OUT> inputFormat,
  String filePath,
  FileProcessingMode watchType,
  long interval)

Where you can specify whether the source should shut down once all files have 
been read (PROCESS_ONCE) or whether the source should continue to monitor the 
input directory for new files (PROCESS_CONTINUOUSLY).
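
A minimal sketch of such a call, assuming plain text input read with 
TextInputFormat. The input path, the 10 second interval, the job name and the 
class name are placeholders I made up for illustration; your job would use 
its own input format and path:

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MonitorDirectoryJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical input directory, replace with your own.
        String inputDir = "hdfs:///path/to/input";
        TextInputFormat format = new TextInputFormat(new Path(inputDir));

        // PROCESS_CONTINUOUSLY keeps the source alive and re-scans the
        // directory every 10 seconds (the interval is in milliseconds).
        // With PROCESS_ONCE the source would finish after one pass.
        DataStream<String> lines = env.readFile(
                format,
                inputDir,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                10_000L);

        lines.print();
        env.execute("monitor-directory");
    }
}
```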

I think there is currently no built-in way of removing files from the input 
directory once they have been processed because it’s not possible to know when 
the contents of a given file have passed through the complete pipeline.
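
One possible workaround, purely as a sketch and not something Flink provides: 
if you run with PROCESS_ONCE, the job only finishes after all input has gone 
through the pipeline, so you could move the input files yourself after 
execute() returns. The example below uses java.nio on a local directory as a 
stand-in; the class and method names are hypothetical, and for HDFS you would 
do the equivalent with the Hadoop FileSystem API (e.g. rename()).

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: moves every regular file from inputDir to backupDir.
final class ProcessedFileArchiver {

    // Returns the number of files that were moved.
    static int archive(Path inputDir, Path backupDir) throws IOException {
        Files.createDirectories(backupDir);

        // Collect the file list first so we do not move files while the
        // directory stream is still being iterated.
        List<Path> toMove = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(inputDir)) {
            for (Path entry : entries) {
                if (Files.isRegularFile(entry)) {
                    toMove.add(entry);
                }
            }
        }

        for (Path file : toMove) {
            Files.move(file, backupDir.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
        }
        return toMove.size();
    }
}
```

You would call archive(...) right after env.execute(...) returns. This only 
works for PROCESS_ONCE; with PROCESS_CONTINUOUSLY the job never finishes, so 
there is no such point at which the files are known to be done.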

Best,
Aljoscha

> On 15. Jun 2017, at 20:00, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> 
> Hi Aljoscha,
> thanks for the great suggestions, I wasn't aware of 
> AsyncDataStream.unorderedWait and BucketingSink setBucketer().
> Most probably that's exactly what I was looking for... (I just need to find the 
> time to test it).
> Just one last question: what are you referring to with "you could use a 
> different readFile() method where you can specify  that you want to continue 
> monitoring the directory for new files"?  Is there a way to delete or move to 
> a backup dir the new input files once enriched?
> 
> Best Flavio
> 
> 
> 
> On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Ok, just trying to make sure I understand everything: You have this:
> 
> 1. A bunch of data in HDFS that you want to enrich
> 2. An external service (Solr/ES) that you query for enriching the data rows 
> stored in 1.
> 3. You need to store the enriched rows in HDFS again
> 
> I think you could just do this (roughly):
> 
> StreamExecutionEnvironment env = …;
> 
> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), "<hdfs path>");
> 
> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
> // or
> // yes, the interface for this is a bit strange
> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …);
> 
> BucketingSink sink = new BucketingSink("<hdfs sink path>");
> // this is responsible for putting files into buckets, so that you don't have 
> // too many small HDFS files
> sink.setBucketer(new MyBucketer());
> enriched.addSink(sink);
> 
> In this case, the file source will close once all files are read and the job 
> will finish. If you don’t want this you can also use a different readFile() 
> method where you can specify  that you want to continue monitoring the 
> directory for new files.
> 
> Best,
> Aljoscha
> 
>> On 6. Jun 2017, at 17:38, Flavio Pompermaier <pomperma...@okkam.it 
>> <mailto:pomperma...@okkam.it>> wrote:
>> 
>> Hi Aljoscha,
>> thanks for getting back to me on this! I'll try to simplify the thread 
>> starting from what we want to achieve.
>> 
>> At the moment we run some queries against a db and store the data in 
>> Parquet directories (one for each query). 
>> Let's say we then create a DataStream<Row> from each dir. What we would like 
>> to achieve is some sort of throttling of the queries we send 
>> to this external service (in order not to overload it with too many 
>> queries... but we also need to run as many queries as possible in order to 
>> finish this process in a reasonable time). 
>> 
>> The current batch process has the downside that you must know a priori the 
>> right parallelism of the job, while the streaming process should be able to 
>> rescale when needed [1], so it should be easier to tune the job parallelism 
>> without losing all the already performed queries [2]. Moreover, if the job 
>> crashes you lose all the work done up to that moment because there's no 
>> checkpointing...
>> My initial idea was to read from HDFS and put the data into Kafka to be able 
>> to change the number of consumers at runtime (according to the maximum 
>> parallelism we can achieve with the external service), but maybe this could 
>> be done in an easier way (we started using streaming only recently, so we 
>> may see things as more complicated than they are).
>> 
>> Moreover, as the last step, we need to know when all the data has been 
>> enriched so we can stop this first streaming job and we can start with the 
>> next one (that cannot run if the acquisition job is still in progress 
>> because it can break referential integrity). Is there any example of such a 
>> use case?
>> 
>> [1] at the moment manually... maybe automatically in the future, right?
>> [2] with the batch job, if we want to change the parallelism we need to stop 
>> it and relaunch it, losing all the already enriched data because there's no 
>> checkpointing there
>> 
>> On Tue, Jun 6, 2017 at 4:46 PM, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> Hi Flavio,
>> 
>> I’ll try and answer your questions:
>> 
>> Regarding 1. Why do you need to first read the data from HDFS into Kafka (or 
>> another queue)? Using StreamExecutionEnvironment.readFile(FileInputFormat, 
>> String, FileProcessingMode, long) you can monitor a directory in HDFS and 
>> process the files that are there and any newly arriving files. For batching 
>> your output, you could look into the BucketingSink which will write to files 
>> in HDFS (or some other DFS) and start new files (buckets) based on some 
>> criteria, for example number of processed elements or time.
>> 
>> Regarding 2. I didn’t completely understand this part. Could you maybe 
>> elaborate a bit, please?
>> 
>> Regarding 3. Yes, I think you can. You would use this to fire off your 
>> queries to solr/ES.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 11. May 2017, at 15:06, Flavio Pompermaier <pomperma...@okkam.it 
>>> <mailto:pomperma...@okkam.it>> wrote:
>>> 
>>> Hi to all,
>>> we have a particular use case where we have a tabular dataset on HDFS (e.g. 
>>> a CSV) that we want to enrich filling some cells with the content returned 
>>> by a query to a reverse index (e.g. solr/elasticsearch). 
>>> Since we want to be able to make this process resilient and scalable we 
>>> thought that Flink streaming could be a good fit since we can control the 
>>> "pressure" on the index by adding/removing consumers dynamically and there 
>>> is automatic error recovery. 
>>> 
>>> Right now we have developed 2 different solutions to the problem:
>>> 
>>> 1. Move the dataset from HDFS to a queue/topic (like Kafka or RabbitMQ) and 
>>>    then let the queue consumers do the real job (pull Rows from the queue, 
>>>    enrich and then persist the enriched Rows). The questions here are:
>>>    - How to properly manage writing to HDFS? If we read a set of rows, 
>>>      enrich them and need to write the result back to HDFS, is it possible 
>>>      to automatically compact files in order to avoid the "too many small 
>>>      files" problem on HDFS? How to avoid file name collisions (put each 
>>>      batch of rows into a different file)?
>>>    - How to control the number of consumers dynamically? Is it possible to 
>>>      change the parallelism once the job has started?
>>> 2. In order to avoid useless data transfer from HDFS to a queue/topic (since 
>>>    we don't need all the Row fields to create the query... usually only 2/5 
>>>    fields are needed), we can create a Flink job that puts the queries into 
>>>    a queue/topic and waits for the result. The problem with this approach is:
>>>    - How to correlate queries with their responses? Creating a single 
>>>      response queue/topic implies that all consumers read all messages (and 
>>>      discard those that are not directed to them), while creating a 
>>>      queue/topic for each sub-task could be expensive (in terms of resources 
>>>      and management... but we don't have any evidence/experience of this, 
>>>      it's just a possible problem).
>>> 
>>> Maybe we can exploit Flink async I/O somehow...? But how? 
>>> 
>>> Any suggestion/drawbacks on the 2 approaches?
>>> 
>>> Thanks in advance,
>>> Flavio
>> 
> 
