In my use case I thought about persisting the dataset on Tachyon so that it can
be reused and read faster. Do you think that could help?

On Tue, Feb 16, 2016 at 10:28 PM, Saliya Ekanayake <esal...@gmail.com>
wrote:

> Thank you. I'll check this
>
> On Tue, Feb 16, 2016 at 4:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Broadcasted DataSets are stored on the JVM heap of each task manager (but
>> shared among multiple slots on the same TM), hence the size restriction.
>>
>> There are two ways to retrieve a DataSet (such as the result of a reduce).
>> 1) if you want to fetch the result into your client program use
>> DataSet.collect(). This immediately triggers an execution and fetches the
>> result from the cluster.
>> 2) if you want to use the result for a computation in the cluster use
>> broadcast sets as described above.
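
Option 1 can be sketched as follows. This is a minimal example, assuming the Flink 1.x DataSet API; the class name CollectReducedValue and the sample values are made up for illustration, and a maximum is used instead of an average only to keep the reduce short.

```java
import java.util.List;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical example class, not part of the original thread.
public class CollectReducedValue {

    /** Reduces the values to their maximum and returns it to the calling program. */
    public static double max(Double... values) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Double> ds = env.fromElements(values);

        DataSet<Double> reduced = ds.reduce(new ReduceFunction<Double>() {
            @Override
            public Double reduce(Double a, Double b) {
                return Math.max(a, b);
            }
        });

        // collect() triggers execution of the job and ships the result to the client.
        List<Double> result = reduced.collect();
        // A non-grouped reduce over a non-empty input yields exactly one element.
        return result.get(0);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(max(1.5, 4.0, 2.5));
    }
}
```

Because collect() starts a job of its own, calling it in addition to execute() still results in two jobs, as discussed further down in the thread.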
>>
>> 2016-02-16 21:54 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>
>>> Thank you, yes, this makes sense. The broadcasted data in my case would
>>> be a large array of 3D coordinates.
>>>
>>> On a side note, how can I take the output from a reduce function? I can
>>> see methods to write it to a given output, but is it possible to retrieve
>>> the reduced result back to the program - like a double value representing
>>> the average in the previous example.
>>>
>>>
>>> On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> You can use so-called BroadcastSets to send any sufficiently small
>>>> DataSet (such as a computed average) to any other function and use it 
>>>> there.
>>>> However, in your case you'll end up with a data flow that branches (at
>>>> the source) and merges again (when the average is sent to the second map).
>>>> Such patterns can cause deadlocks and therefore cannot be pipelined,
>>>> which means that the data before the branch is written to disk and read
>>>> again.
>>>> In your case it might be even better to read the data twice instead of
>>>> reading, writing, and reading it.
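
A broadcast set for the average use case might look like the sketch below. It assumes the Flink 1.x DataSet API; the class name CenterByAverage, the broadcast name "average", and the idea of subtracting the average in the second map are illustrative choices, not taken from the thread.

```java
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Hypothetical example class, not part of the original thread.
public class CenterByAverage {

    /** Computes the average of the values and subtracts it from every value. */
    public static List<Double> center(Short... values) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Short> ds = env.fromElements(values);

        // First branch: (sum, count) -> average, a DataSet with a single element.
        DataSet<Double> average = ds
            .map(new MapFunction<Short, Tuple2<Double, Long>>() {
                @Override
                public Tuple2<Double, Long> map(Short v) {
                    return new Tuple2<>(v.doubleValue(), 1L);
                }
            })
            .reduce(new ReduceFunction<Tuple2<Double, Long>>() {
                @Override
                public Tuple2<Double, Long> reduce(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
                    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                }
            })
            .map(new MapFunction<Tuple2<Double, Long>, Double>() {
                @Override
                public Double map(Tuple2<Double, Long> t) {
                    return t.f0 / t.f1;
                }
            });

        // Second branch: the same data set, with the average attached as a broadcast set.
        DataSet<Double> centered = ds
            .map(new RichMapFunction<Short, Double>() {
                private double avg;

                @Override
                public void open(Configuration parameters) {
                    // Read the broadcast set once per task, before any map() call.
                    List<Double> bc = getRuntimeContext().getBroadcastVariable("average");
                    avg = bc.get(0);
                }

                @Override
                public Double map(Short v) {
                    return v - avg;
                }
            })
            .withBroadcastSet(average, "average");

        // collect() runs both branches as one job; the element order is not guaranteed.
        return centered.collect();
    }
}
```

This is exactly the branch-and-merge shape described above, so the source data may be materialized between the two branches.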
>>>>
>>>> Fabian
>>>>
>>>> 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>
>>>>> I looked at the samples and I think what you meant is clear, but I
>>>>> didn't find a solution for my need. In my case, I want to use the result
>>>>> from the first map operation before I can apply the second map on the
>>>>> *same* data set. For simplicity, let's say my data set is a bunch of
>>>>> short values. I need to find their average, so I use a map and a
>>>>> reduce. Then I want to map these short values with another function,
>>>>> but it needs the average computed in the beginning to work
>>>>> correctly.
>>>>>
>>>>> Is this possible without doing multiple reads of the input data to
>>>>> create the same dataset?
>>>>>
>>>>> Thank you,
>>>>> saliya
>>>>>
>>>>> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, if you implement both maps in a single job, data is read once.
>>>>>>
>>>>>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>
>>>>>>> Fabian,
>>>>>>>
>>>>>>> I have a quick follow-up question on what you suggested. When
>>>>>>> streaming the same data through different maps, were you implying that
>>>>>>> everything runs as a single job in Flink, so the data is read only once?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Saliya
>>>>>>>
>>>>>>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> It is not possible to "pin" data sets in memory, yet.
>>>>>>>> However, you can stream the same data set through two different
>>>>>>>> mappers at the same time.
>>>>>>>>
>>>>>>>> For instance you can have a job like:
>>>>>>>>
>>>>>>>>                  /---> Map 1 --> Sink1
>>>>>>>> Source --<
>>>>>>>>                  \---> Map 2 --> Sink2
>>>>>>>>
>>>>>>>> and execute it at once.
>>>>>>>> For that you define your data flow and call execute() once after all
>>>>>>>> sinks have been created.
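
The data flow in the diagram could be written roughly as below. This is a sketch assuming the Flink 1.x DataSet API; the class name TwoMapsOneJob, the two map functions, the sample source, and the output paths are placeholders for illustration.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

// Hypothetical example class, not part of the original thread.
public class TwoMapsOneJob {

    /** Streams one source through two mappers into two sinks as a single job. */
    public static void run(String outputDir) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the expensive input; it is defined once and used by both branches.
        DataSet<Short> source = env.fromElements((short) 1, (short) 2, (short) 3);

        // Branch 1: Map 1 --> Sink1
        source
            .map(new MapFunction<Short, Integer>() {
                @Override
                public Integer map(Short v) {
                    return v * 2;
                }
            })
            .writeAsText(outputDir + "/map1", WriteMode.OVERWRITE);

        // Branch 2: Map 2 --> Sink2
        source
            .map(new MapFunction<Short, Integer>() {
                @Override
                public Integer map(Short v) {
                    return v * v;
                }
            })
            .writeAsText(outputDir + "/map2", WriteMode.OVERWRITE);

        // A single execute() after all sinks are defined submits one job for both branches.
        env.execute("two maps, one job");
    }
}
```

Nothing runs until execute() is called, which is why both sinks have to be declared first.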
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>
>>>>>>>>> Fabian,
>>>>>>>>>
>>>>>>>>> count() was just an example. What I would like to do is, say, run
>>>>>>>>> two map operations on the dataset (ds). Each map will have its own
>>>>>>>>> reduction, so is there a way to avoid creating two jobs for such a
>>>>>>>>> scenario?
>>>>>>>>>
>>>>>>>>> The reason is that reading these binary matrices is expensive. In our
>>>>>>>>> current MPI implementation, I am using memory maps for faster loading
>>>>>>>>> and reuse.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Saliya
>>>>>>>>>
>>>>>>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> it looks like you are executing two distinct Flink jobs.
>>>>>>>>>> DataSet.count() triggers the execution of a new job. If you have
>>>>>>>>>> an execute() call in your program, this will lead to two Flink jobs
>>>>>>>>>> being executed.
>>>>>>>>>> It is not possible to share state among these jobs.
>>>>>>>>>>
>>>>>>>>>> Maybe you should add a custom count implementation (using a
>>>>>>>>>> ReduceFunction) which is executed in the same program as the other
>>>>>>>>>> ReduceFunction.
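
One way to read this suggestion is to carry the count along inside the record that is already being reduced, so that no separate count() job is needed. The sketch below assumes the Flink 1.x DataSet API; the class name SumAndCountInOneJob and the (sum, count) tuple are illustrative and stand in for the DoubleStatistics type from the original question.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical example class, not part of the original thread.
public class SumAndCountInOneJob {

    /** Returns (sum, count) of the values, computed by one reduce in one job. */
    public static Tuple2<Double, Long> sumAndCount(Short... values) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Short> ds = env.fromElements(values);

        DataSet<Tuple2<Double, Long>> stats = ds
            // Attach a count of 1 to every record.
            .map(new MapFunction<Short, Tuple2<Double, Long>>() {
                @Override
                public Tuple2<Double, Long> map(Short v) {
                    return new Tuple2<>(v.doubleValue(), 1L);
                }
            })
            // Sum the values and the counts in the same ReduceFunction.
            .reduce(new ReduceFunction<Tuple2<Double, Long>>() {
                @Override
                public Tuple2<Double, Long> reduce(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
                    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                }
            });

        // One collect() call, hence one job; assumes a non-empty input.
        return stats.collect().get(0);
    }
}
```

With this shape the input format is opened once, because the count no longer triggers a job of its own.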
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>>>>>>> called for each terminal operation on a given dataset using that
>>>>>>>>>>> particular InputFormat. Is it possible to avoid this, possibly by
>>>>>>>>>>> using some caching technique in Flink?
>>>>>>>>>>>
>>>>>>>>>>> For example, I have some code like the one below, and I see that for
>>>>>>>>>>> both of the last two statements (reduce() and count()) the above
>>>>>>>>>>> methods in the input format get called. By the way, this is a custom
>>>>>>>>>>> input format I wrote to represent a binary matrix stored as Short
>>>>>>>>>>> values.
>>>>>>>>>>>
>>>>>>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>>>>>>
>>>>>>>>>>> DataSet<Short[]> ds = env.createInput(smif,
>>>>>>>>>>>     BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>>>>>>
>>>>>>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>>>>>>>>>
>>>>>>>>>>> op.reduce(...);
>>>>>>>>>>>
>>>>>>>>>>> op.count();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Saliya
>>>>>>>>>>> --
>>>>>>>>>>> Saliya Ekanayake
>>>>>>>>>>> Ph.D. Candidate | Research Assistant
>>>>>>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>>>>>>> Indiana University, Bloomington
>>>>>>>>>>> Cell 812-391-4914
>>>>>>>>>>> http://saliya.org
>>>>>>>>>>>
