In my use case I thought of persisting the dataset on Tachyon to reuse it and speed up reading. Do you think that could help?
On Tue, Feb 16, 2016 at 10:28 PM, Saliya Ekanayake <esal...@gmail.com> wrote:

> Thank you. I'll check this.
>
> On Tue, Feb 16, 2016 at 4:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Broadcasted DataSets are stored on the JVM heap of each task manager (but
>> shared among multiple slots on the same TM), hence the size restriction.
>>
>> There are two ways to retrieve a DataSet (such as the result of a reduce):
>> 1) If you want to fetch the result into your client program, use
>> DataSet.collect(). This immediately triggers an execution and fetches the
>> result from the cluster.
>> 2) If you want to use the result for a computation in the cluster, use
>> broadcast sets as described above.
>>
>> 2016-02-16 21:54 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>
>>> Thank you, yes, this makes sense. The broadcasted data in my case would
>>> be a large array of 3D coordinates.
>>>
>>> On a side note, how can I take the output from a reduce function? I can
>>> see methods to write it to a given output, but is it possible to retrieve
>>> the reduced result back to the program - like a double value representing
>>> the average in the previous example?
>>>
>>> On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> You can use so-called BroadcastSets to send any sufficiently small
>>>> DataSet (such as a computed average) to any other function and use it
>>>> there.
>>>> However, in your case you'll end up with a data flow that branches (at
>>>> the source) and merges again (when the average is sent to the second map).
>>>> Such patterns can cause deadlocks and can therefore not be pipelined,
>>>> which means that the data before the branch is written to disk and read
>>>> again.
>>>> In your case it might even be better to read the data twice instead of
>>>> reading, writing, and reading it.
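To make the two retrieval options concrete, here is a minimal sketch against the Flink Java DataSet API. It is not code from the thread: the toy dataset, the hard-coded element count, and the broadcast name "sum" are all made up for illustration.

```java
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastAverageSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Double> values = env.fromElements(1.0, 2.0, 3.0, 4.0);
        DataSet<Double> sum = values.reduce((a, b) -> a + b);

        // Option 1: fetch the reduced result into the client program.
        // collect() triggers an execution of its own.
        double average = sum.collect().get(0) / 4;  // 4 elements in this toy set

        // Option 2: keep the result in the cluster and hand it to another
        // function as a broadcast set.
        DataSet<Double> centered = values
                .map(new RichMapFunction<Double, Double>() {
                    private double mean;

                    @Override
                    public void open(Configuration parameters) {
                        // Broadcast variables are read in open(), once per task.
                        List<Double> bc =
                                getRuntimeContext().getBroadcastVariable("sum");
                        mean = bc.get(0) / 4;
                    }

                    @Override
                    public Double map(Double v) {
                        return v - mean;
                    }
                })
                .withBroadcastSet(sum, "sum");

        centered.print();  // triggers a second execution
    }
}
```

Note this sketch has exactly the branch-and-merge shape Fabian warns about (values feeds both the reduce and the second map), which is fine for a small example but may spill to disk on real data.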
>>>>
>>>> Fabian
>>>>
>>>> 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>
>>>>> I looked at the samples and I think what you meant is clear, but I
>>>>> didn't find a solution for my need. In my case, I want to use the result
>>>>> from the first map operation before I can apply the second map on the
>>>>> *same* data set. For simplicity, let's say I have a bunch of short
>>>>> values represented as my data set. Then I need to find their average, so
>>>>> I use a map and reduce. Then I want to map these short values with
>>>>> another function, but it needs the average computed in the beginning to
>>>>> work correctly.
>>>>>
>>>>> Is this possible without doing multiple reads of the input data to
>>>>> create the same dataset?
>>>>>
>>>>> Thank you,
>>>>> Saliya
>>>>>
>>>>> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Yes, if you implement both maps in a single job, data is read once.
>>>>>>
>>>>>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>
>>>>>>> Fabian,
>>>>>>>
>>>>>>> I have a quick follow-up question on what you suggested. When
>>>>>>> streaming the same data through different maps, were you implying that
>>>>>>> everything goes as a single job in Flink, so the data read happens
>>>>>>> only once?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Saliya
>>>>>>>
>>>>>>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> It is not possible to "pin" data sets in memory, yet.
>>>>>>>> However, you can stream the same data set through two different
>>>>>>>> mappers at the same time.
>>>>>>>>
>>>>>>>> For instance you can have a job like:
>>>>>>>>
>>>>>>>>          /---> Map 1 --> Sink 1
>>>>>>>> Source --<
>>>>>>>>          \---> Map 2 --> Sink 2
>>>>>>>>
>>>>>>>> and execute it at once.
>>>>>>>> For that you define your data flow and call execute() once after all
>>>>>>>> sinks have been created.
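Fabian's branching diagram translates into code roughly like this (a sketch, assuming the Flink Java DataSet API; the file paths and map bodies are placeholders):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class BranchingJobSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // One source: the input is read only once.
        DataSet<String> source = env.readTextFile("file:///tmp/input");

        // Branch 1: Map 1 --> Sink 1
        source.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) { return s.toUpperCase(); }
        }).writeAsText("file:///tmp/sink1", FileSystem.WriteMode.OVERWRITE);

        // Branch 2: Map 2 --> Sink 2
        source.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) { return Integer.toString(s.length()); }
        }).writeAsText("file:///tmp/sink2", FileSystem.WriteMode.OVERWRITE);

        // A single execute() after all sinks are defined submits one job,
        // so both branches share the same scan of the source.
        env.execute("branching sketch");
    }
}
```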
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>
>>>>>>>>> Fabian,
>>>>>>>>>
>>>>>>>>> count() was just an example. What I would like to do is, say, run
>>>>>>>>> two map operations on the dataset (ds). Each map will have its own
>>>>>>>>> reduction, so is there a way to avoid creating two jobs for such a
>>>>>>>>> scenario?
>>>>>>>>>
>>>>>>>>> The reason is, reading these binary matrices is expensive. In our
>>>>>>>>> current MPI implementation, I am using memory maps for faster
>>>>>>>>> loading and reuse.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Saliya
>>>>>>>>>
>>>>>>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> it looks like you are executing two distinct Flink jobs.
>>>>>>>>>> DataSet.count() triggers the execution of a new job. If you also
>>>>>>>>>> have an execute() call in your program, this will lead to two Flink
>>>>>>>>>> jobs being executed.
>>>>>>>>>> It is not possible to share state among these jobs.
>>>>>>>>>>
>>>>>>>>>> Maybe you should add a custom count implementation (using a
>>>>>>>>>> ReduceFunction) which is executed in the same program as the other
>>>>>>>>>> ReduceFunction.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>>>>>>> called for each terminal operation on a given dataset using that
>>>>>>>>>>> particular InputFormat. Is it possible to avoid this - possibly
>>>>>>>>>>> using some caching technique in Flink?
>>>>>>>>>>>
>>>>>>>>>>> For example, I have some code like below, and I see that for both
>>>>>>>>>>> of the last two statements (reduce() and count()) the above
>>>>>>>>>>> methods in the input format get called.
>>>>>>>>>>> Btw. this is a custom input format I wrote to represent
>>>>>>>>>>> a binary matrix stored as Short values.
>>>>>>>>>>>
>>>>>>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>>>>>>
>>>>>>>>>>> DataSet<Short[]> ds = env.createInput(smif,
>>>>>>>>>>>     BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>>>>>>
>>>>>>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>>>>>>>>>
>>>>>>>>>>> *op.reduce(...)*
>>>>>>>>>>>
>>>>>>>>>>> *op.count()*
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Saliya
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Saliya Ekanayake
>>>>>>>>>>> Ph.D. Candidate | Research Assistant
>>>>>>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>>>>>>> Indiana University, Bloomington
>>>>>>>>>>> Cell 812-391-4914
>>>>>>>>>>> http://saliya.org
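Fabian's suggestion of a custom count boils down to mapping every record to 1 and summing with a ReduceFunction, so the count runs in the same job as the other pipeline instead of DataSet.count() triggering a job of its own. A self-contained sketch (plain elements stand in for the expensive ShortMatrixInputFormat source; names are illustrative):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CountAsReduceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for env.createInput(new ShortMatrixInputFormat(), ...).
        DataSet<Short> ds = env.fromElements((short) 1, (short) 2, (short) 3);

        // Custom count: each record becomes 1L, the reduce sums them up.
        // Unlike DataSet.count(), this stays in the same execution plan,
        // so the input format's open()/nextRecord() run only once.
        DataSet<Long> count = ds
                .map(new MapFunction<Short, Long>() {
                    @Override
                    public Long map(Short value) { return 1L; }
                })
                .reduce(new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long a, Long b) { return a + b; }
                });

        count.print();  // sums to 3 for this toy input
    }
}
```

The same plan can also carry the original reduce on ds; with both sinks defined before a single execute()/print(), everything shares one read of the input.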