Understood. Thanks anyway, Aljoscha!

On Fri, Jun 16, 2017 at 11:55 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> The problem with that is that the file is being read by (possibly, very
> likely) multiple operators in parallel. The file source works like this:
> there is a ContinuousFileMonitoringFunction (this is an actual Flink
> source) that monitors a directory and when a new file appears sends several
> (non overlapping) input splits downstream. After the source, there is an
> operator (ContinuousFileReaderOperator) that receives splits, reads the
> contents of the file at the split and sends it downstream. There is thus no
> central point where we would know that a file was completely processed.
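A minimal sketch of the situation described above, in plain Java with no Flink dependency. A file is cut into non-overlapping splits and each split may be finished by a different parallel reader, so only some shared bookkeeping can tell when the last split of a file is done. The names SplitTracker, register and splitDone are hypothetical and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not a Flink class: counts outstanding splits per file.
class SplitTracker {
    // One non-overlapping byte range of a file: [start, start + length).
    static final class Split {
        final String file;
        final long start;
        final long length;

        Split(String file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    private final Map<String, AtomicInteger> pending = new ConcurrentHashMap<>();

    // Cut a file of fileSize bytes into splits of at most splitSize bytes
    // and remember how many of them are still unread.
    List<Split> register(String file, long fileSize, long splitSize) {
        List<Split> splits = new ArrayList<>();
        for (long start = 0; start < fileSize; start += splitSize) {
            splits.add(new Split(file, start, Math.min(splitSize, fileSize - start)));
        }
        pending.put(file, new AtomicInteger(splits.size()));
        return splits;
    }

    // Called by whichever parallel reader finishes a split. Returns true only
    // for the call that completes the last outstanding split of that file.
    boolean splitDone(Split split) {
        return pending.get(split.file).decrementAndGet() == 0;
    }
}
```

In a distributed job the readers run in different processes, so a counter like this would have to live in some shared place. That shared place is the "central point" that the file source does not have.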
>
> Best,
> Aljoscha
>
> On 16. Jun 2017, at 11:26, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
> Is it really necessary to wait for the file to reach the end of the
> pipeline? Isn't it sufficient to know that it has been read and that the
> source operator has been checkpointed? (I don't know if I'm using this word
> correctly... I mean that all the file splits have been processed and Flink
> won't reprocess them anymore.)
>
> Best,
> Flavio
>
> On Fri, Jun 16, 2017 at 11:22 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> I was referring to
>>
>> StreamExecutionEnvironment.readFile(
>>   FileInputFormat<OUT> inputFormat,
>>   String filePath,
>>   FileProcessingMode watchType,
>>   long interval)
>>
>> Where you can specify whether the source should shut down once all files
>> have been read (PROCESS_ONCE) or whether the source should continue to
>> monitor the input directory for new files (PROCESS_CONTINUOUSLY).
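To illustrate the difference between the two modes, here is a rough stand-in written in plain Java. It is not the Flink implementation. DirectoryMonitor, scan, run and the maxScans parameter are assumptions made for this sketch, and maxScans exists only so that the continuous mode terminates in a demo.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical stand-in for the monitoring logic; not the Flink implementation.
class DirectoryMonitor {
    enum Mode { PROCESS_ONCE, PROCESS_CONTINUOUSLY }

    private final Path dir;
    private final Set<Path> seen = new HashSet<>();

    DirectoryMonitor(Path dir) {
        this.dir = dir;
    }

    // Returns the regular files that no earlier scan has reported.
    List<Path> scan() throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (Stream<Path> files = Files.list(dir)) {
            files.filter(Files::isRegularFile).sorted().forEach(p -> {
                if (seen.add(p)) {
                    fresh.add(p);
                }
            });
        }
        return fresh;
    }

    // PROCESS_ONCE: scan a single time, then stop.
    // PROCESS_CONTINUOUSLY: rescan every intervalMillis, up to maxScans scans.
    List<Path> run(Mode mode, long intervalMillis, int maxScans)
            throws IOException, InterruptedException {
        List<Path> all = new ArrayList<>(scan());
        for (int i = 1; mode == Mode.PROCESS_CONTINUOUSLY && i < maxScans; i++) {
            Thread.sleep(intervalMillis);
            all.addAll(scan());
        }
        return all;
    }
}
```

With PROCESS_ONCE the caller gets the files present at the time of the single scan and the monitor is done. With PROCESS_CONTINUOUSLY, files that arrive between scans are picked up on a later pass.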
>>
>> I think there is currently no built-in way of removing files from the
>> input directory once they have been processed because it’s not possible to
>> know when the contents of a given file have passed through the complete
>> pipeline.
>>
>> Best,
>> Aljoscha
>>
>> On 15. Jun 2017, at 20:00, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>> Hi Aljoscha,
>> thanks for the great suggestions, I wasn't aware of
>> AsyncDataStream.unorderedWait and BucketingSink setBucketer().
>> Most probably that's exactly what I was looking for (I just need to find
>> the time to test it).
>> Just one last question: what are you referring to with "you could use a
>> different readFile() method where you can specify that you want to
>> continue monitoring the directory for new files"? Is there a way to delete
>> the new input files, or move them to a backup dir, once they are enriched?
>>
>> Best Flavio
>>
>>
>>
>> On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Ok, just trying to make sure I understand everything: You have this:
>>>
>>> 1. A bunch of data in HDFS that you want to enrich
>>> 2. An external service (Solr/ES) that you query for enriching the data
>>> rows stored in 1.
>>> 3. You need to store the enriched rows in HDFS again
>>>
>>> I think you could just do this (roughly):
>>>
>>> StreamExecutionEnvironment env = …;
>>>
>>> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), "<hdfs path>");
>>>
>>> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
>>> // or
>>> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …);
>>> // yes, the interface for this is a bit strange
>>>
>>> BucketingSink<Row> sink = new BucketingSink<>("<hdfs sink path>");
>>> // this is responsible for putting files into buckets, so that you don’t
>>> // have too many small HDFS files
>>> sink.setBucketer(new MyBucketer());
>>> enriched.addSink(sink);
>>>
>>> In this case, the file source will close once all files are read and the
>>> job will finish. If you don’t want this you can also use a different
>>> readFile() method where you can specify  that you want to continue
>>> monitoring the directory for new files.
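The asynchronous enrichment step in the outline above comes down to issuing lookups concurrently while capping how many are in flight, so the external index is not overloaded. The following is a plain-Java sketch of that idea only. It does not use Flink's AsyncDataStream, and ThrottledEnricher, enrichAll and maxInFlight are hypothetical names. Unlike unorderedWait, this sketch returns results in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper, not a Flink class: enriches rows asynchronously while
// never allowing more than maxInFlight lookups to run at the same time.
class ThrottledEnricher {
    static <I, O> List<O> enrichAll(List<I> rows, Function<I, O> lookup, int maxInFlight)
            throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        ExecutorService pool = Executors.newCachedThreadPool();
        List<CompletableFuture<O>> futures = new ArrayList<>();
        try {
            for (I row : rows) {
                // Blocks the producer while maxInFlight lookups are running.
                permits.acquire();
                futures.add(CompletableFuture
                        .supplyAsync(() -> lookup.apply(row), pool)
                        // Give the permit back whether the lookup succeeded or failed.
                        .whenComplete((result, error) -> permits.release()));
            }
            List<O> out = new ArrayList<>();
            for (CompletableFuture<O> f : futures) {
                // join() rethrows a failed lookup as a CompletionException.
                out.add(f.join());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }
}
```

Raising or lowering maxInFlight is the tuning knob here: it trades throughput against load on the external service without changing the rest of the code.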
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 6. Jun 2017, at 17:38, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>> Hi Aljoscha,
>>> thanks for getting back to me on this! I'll try to simplify the thread
>>> starting from what we want to achieve.
>>>
>>> At the moment we execute some queries against a db and we store the data
>>> in Parquet directories (one for each query).
>>> Let's say we then create a DataStream<Row> from each dir; what we would
>>> like to achieve is some sort of throttling of the queries we send to this
>>> external service (in order not to overload it with too many queries, but
>>> we also need to run as many queries as possible in order to execute this
>>> process in a reasonable time).
>>>
>>> The current batch process has the downside that you must know a priori
>>> the right parallelism of the job, while the streaming process should be
>>> able to rescale when needed [1], so it should be easier to tune the job
>>> parallelism without losing all the already performed queries [2].
>>> Moreover, if the job crashes you lose all the work done up to that moment
>>> because there's no checkpointing...
>>> My initial idea was to read from HDFS and put the data into Kafka to be
>>> able to change the number of consumers at runtime (according to the
>>> maximum parallelism we can achieve with the external service) but maybe
>>> this could be done in an easier way (we started using streaming only
>>> recently, so we may see things as more complicated than they are).
>>>
>>> Moreover, as the last step, we need to know when all the data has been
>>> enriched so we can stop this first streaming job and we can start with the
>>> next one (that cannot run if the acquisition job is still in progress
>>> because it can break referential integrity). Is there any example of such a
>>> use case?
>>>
>>> [1] at the moment manually... maybe automatically in the future, right?
>>> [2] with the batch job, if we want to change the parallelism we need to
>>> stop it and relaunch it, losing all the already enriched data because
>>> there's no checkpointing there
>>>
>>> On Tue, Jun 6, 2017 at 4:46 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> I’ll try and answer your questions:
>>>>
>>>> Regarding 1. Why do you need to first read the data from HDFS into
>>>> Kafka (or another queue)? Using 
>>>> StreamExecutionEnvironment.readFile(FileInputFormat,
>>>> String, FileProcessingMode, long) you can monitor a directory in HDFS and
>>>> process the files that are there and any newly arriving files. For batching
>>>> your output, you could look into the BucketingSink which will write to
>>>> files in HDFS (or some other DFS) and start new files (buckets) based on
>>>> some criteria, for example number of processed elements or time.
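As a toy illustration of "start new files based on some criteria", the sketch below rolls over to a new part after a fixed number of elements. It keeps everything in memory and is not the Flink BucketingSink. RollingWriter, maxPerFile and partName are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not the Flink BucketingSink: groups elements into
// numbered parts and starts a new part after maxPerFile elements.
class RollingWriter {
    private final int maxPerFile;
    private final List<List<String>> parts = new ArrayList<>();

    RollingWriter(int maxPerFile) {
        this.maxPerFile = maxPerFile;
    }

    void write(String element) {
        // Roll over when there is no part yet or the current part is full.
        if (parts.isEmpty() || parts.get(parts.size() - 1).size() >= maxPerFile) {
            parts.add(new ArrayList<>());
        }
        parts.get(parts.size() - 1).add(element);
    }

    // A distinct name per part index, so two parts never share a file name.
    static String partName(int index) {
        return "part-" + index;
    }

    List<List<String>> parts() {
        return parts;
    }
}
```

A larger maxPerFile gives fewer, bigger files, which is the lever against the "too many small files" problem. A time-based criterion would replace the size check with a comparison against the time the current part was opened.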
>>>>
>>>> Regarding 2. I didn’t completely understand this part. Could you maybe
>>>> elaborate a bit, please?
>>>>
>>>> Regarding 3. Yes, I think you can. You would use this to fire off your
>>>> queries to Solr/ES.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 11. May 2017, at 15:06, Flavio Pompermaier <pomperma...@okkam.it>
>>>> wrote:
>>>>
>>>> Hi to all,
>>>> we have a particular use case where we have a tabular dataset on HDFS
>>>> (e.g. a CSV) that we want to enrich filling some cells with the content
>>>> returned by a query to a reverse index (e.g. solr/elasticsearch).
>>>> Since we want to be able to make this process resilient and scalable we
>>>> thought that Flink streaming could be a good fit since we can control the
>>>> "pressure" on the index by adding/removing consumers dynamically and there
>>>> is automatic error recovery.
>>>>
>>>> Right now we developed 2 different solutions to the problem:
>>>>
>>>>    1. *move the dataset from HDFS to a queue/topic* (like Kafka or
>>>>    RabbitMQ) and then let the queue consumers do the real job (pull Rows
>>>>    from the queue, enrich and then persist the enriched Rows). The
>>>>    questions here are:
>>>>       1. how to properly manage writing to HDFS? if we read a set of
>>>>       rows, we enrich them and we need to write the result back to HDFS,
>>>>       is it possible to automatically compact files in order to avoid the
>>>>       "too many small files" problem on HDFS? How to avoid file name
>>>>       collisions (put each batch of rows into a different file)?
>>>>       2. how to control the number of consumers dynamically? is it
>>>>       possible to change the parallelism once the job has started?
>>>>    2. in order to avoid useless data transfer from HDFS to a
>>>>    queue/topic (since we don't need all the Row fields to create the
>>>>    query; usually only 2/5 fields are needed) we can create a Flink job
>>>>    that *puts the queries into a queue/topic* and waits for the result.
>>>>    The problem with this approach is:
>>>>       1. how to correlate queries with their responses? creating a
>>>>       unique response queue/topic implies that all consumers read all
>>>>       messages (and discard those that are not directed to them) while
>>>>       creating a queue/topic for each sub-task could be expensive (in
>>>>       terms of resources and management, but we don't have any
>>>>       evidence/experience of this; it's just a possible problem).
>>>>    3. Maybe we can exploit *Flink async/IO* somehow...? But how?
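On the question in the list above of how to correlate queries with their responses on a shared reply queue/topic, one common pattern is to carry a correlation id in both messages. The following is only a sketch of that pattern in plain Java, with no Kafka or RabbitMQ client involved. Correlator, newRequest and onResponse are hypothetical names.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: matches responses from a shared reply channel to the
// query that caused them, using a correlation id carried in both messages.
class Correlator<R> {
    private final Map<String, CompletableFuture<R>> waiting = new ConcurrentHashMap<>();

    // Registers an outstanding query. The returned id must travel with the
    // query and be copied into its response by the responder.
    String newRequest(CompletableFuture<R> reply) {
        String id = UUID.randomUUID().toString();
        waiting.put(id, reply);
        return id;
    }

    // Called for every message seen on the reply channel. Returns false when
    // the id is unknown here, meaning the response is for another consumer.
    boolean onResponse(String id, R payload) {
        CompletableFuture<R> reply = waiting.remove(id);
        if (reply == null) {
            return false;
        }
        reply.complete(payload);
        return true;
    }
}
```

This still has every consumer read every response and drop the ones it does not own, which is the cost described in the list. The id only makes the dropping cheap and the matching unambiguous.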
>>>>
>>>>
>>>> Any suggestion/drawbacks on the 2 approaches?
>>>>
>>>> Thanks in advance,
>>>> Flavio
>>>>
>>>>
>>>>
>>
>>
>
>
