Maybe I won’t try to broadcast my dataset after all: I finally found again 
what originally made me implement it with my own cloning flatmap + partitioning.

Quoted from 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#broadcast-variables

Note: As the content of broadcast variables is kept in-memory on each node, it 
should not become too large. For simpler things like scalar values you can 
simply make parameters part of the closure of a function, or use the 
withParameters(...) method to pass in a configuration.

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Friday, 3 March 2017 18:10
To: user@flink.apache.org
Subject: RE: Cross operation on two huge datasets

To answer Ankit,

It is a batch application.

Yes, I admit I did the broadcasting by hand. I did it that way because the only 
other way I found to “broadcast” a DataSet was to use “withBroadcastSet”, and I 
was afraid that “withBroadcastSet” would make Flink load the whole dataset in 
memory before broadcasting it rather than sending its elements one by one.

I’ll try to use it; I’ll take anything that makes my code cleaner!

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: Friday, 3 March 2017 17:55
To: user@flink.apache.org
Subject: RE: Cross operation on two huge datasets

I tried putting my structure in a dataset, but when serializing it Kryo went 
into infinite recursion (it crashed with a StackOverflowError). So I’m staying 
with the static reference.

As for the partitioning, there is always the case of shapes overlapping both 
the right and left sections; I think it would take quite a bit of effort to 
implement. And it’s always better if I don’t have to manually set a frontier 
between the two (or n) zones.

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Friday, 3 March 2017 02:40
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwen,

In my view, indexing and searching are two isolated processes and they should 
be separated. Maybe you should take the RTree structure as a new dataset 
(fortunately it's static, right?) and store it in a distributed cache or DFS 
that can be accessed by operators from any node. That will make the mapping 
from index partition to operator consistent (regardless of the locality 
problem).

Besides, you can make a "weak" index first, e.g., partitioning the points and 
shapes to "left" and "right", and in that way you do not need to broadcast the 
points to all index nodes (only left to left and right to right).

Best,
Xingcan

On Fri, Mar 3, 2017 at 1:49 AM, Jain, Ankit <ankit.j...@here.com> wrote:
If I understood correctly, you have just implemented flink broadcasting by hand 
☺.

You are still sending out the whole points dataset to each shape partition – 
right?

I think this could be optimized by using a keyBy or a custom partitioner that 
is common across shapes & points – that should make sure a given point always 
goes to the same shape node.

I didn’t understand why Till Rohrmann said “you don’t know where Flink will 
schedule the new operator instance” – new operators are created when the Flink 
job is started – right? So, there should be no new operators once the job is 
running, and if you use consistent hash partitioning, the same input should 
always end up at the same task manager node.
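
A minimal sketch of the consistent partitioning idea above, in plain Java rather than Flink code: a deterministic function maps a key to one of N partitions, so the same key always yields the same partition index. The class name `KeyPartitioner` and the use of `hashCode()` are assumptions made for illustration. Whether a given partition index stays on the same task manager is a scheduling question that this sketch does not answer.

```java
final class KeyPartitioner {
    private KeyPartitioner() {}

    /** Maps a key to a partition index in [0, numPartitions). */
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

If shapes and points are both keyed by the same value (for example a spatial cell id), both sides of a pair get the same partition index.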

You could store the output as Flink state – that would be more fault tolerant, 
but storing it as a cache in the JVM should work too.

Is this a batch job or streaming?

By the way, I am a newbie to Flink and still learning, so take my suggestions 
with caution ☺

Thanks
Ankit

From: Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>
Date: Thursday, March 2, 2017 at 7:28 AM
To: user@flink.apache.org
Subject: RE: Cross operation on two huge datasets

I made it so that I don’t care where the next operator will be scheduled.

I configured taskslots = 1 and parallelism = yarnnodes so that :

• Each node contains 1/Nth of the shapes (a simple repartition() of the 
  shapes dataset).
• The points are cloned so that each partition of the points dataset contains 
  the whole original dataset:
    o A flatmap creates “#parallelism” clones of each entry.
    o Custom partitioning sends each clone of each entry to a different 
      partition.
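
The two steps in the list above can be sketched in plain Java (this is a simulation, not the actual Flink job): a "flatmap" step emits one tagged clone of each record per partition, and a "custom partitioning" step routes each clone by its tag. The names `CloneToAllPartitions`, `Tagged`, `cloneForAll` and `distribute` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class CloneToAllPartitions {
    private CloneToAllPartitions() {}

    /** A record tagged with the index of the partition it must be sent to. */
    static final class Tagged<T> {
        final int partition;
        final T value;

        Tagged(int partition, T value) {
            this.partition = partition;
            this.value = value;
        }
    }

    /** "Flatmap" step: emit one clone of the record per partition. */
    static <T> List<Tagged<T>> cloneForAll(T value, int parallelism) {
        List<Tagged<T>> out = new ArrayList<>(parallelism);
        for (int p = 0; p < parallelism; p++) {
            out.add(new Tagged<>(p, value));
        }
        return out;
    }

    /** "Custom partitioning" step: route each clone to the partition named by its tag. */
    static <T> List<List<T>> distribute(List<T> records, int parallelism) {
        List<List<T>> partitions = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            partitions.add(new ArrayList<>());
        }
        for (T record : records) {
            for (Tagged<T> clone : cloneForAll(record, parallelism)) {
                partitions.get(clone.partition).add(clone.value);
            }
        }
        return partitions;
    }
}
```

After `distribute`, every partition holds the complete input, which is why the data volume grows by a factor equal to the parallelism.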

That way, whatever Flink chooses to do, each point will be compared to each 
shape. That’s why I think that in my case I can keep it in the JVM without 
issues. I’d prefer to avoid ser/deser-ing that structure.

I tried to use join (all items have the same key) but it looks like Flink tried 
to serialize the RTree anyway and it failed with a StackOverflowError (locally 
with only 1 partition, not even on YARN).


From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Thursday, 2 March 2017 15:40
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets


Yes you’re right about the “split” and broadcasting.

Storing it in the JVM is not a good approach, since you don’t know where Flink 
will schedule the new operator instance. It might be the case that an operator 
responsible for another partition gets scheduled to this JVM and then it has 
the wrong RTree information. Maybe you can model the set of RTrees as a 
DataSet[(PartitionKey, RTree)] and then join with the partitioned point data 
set.

Cheers,
Till

On Thu, Mar 2, 2017 at 3:29 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com> wrote:
The best for me would be to make it “persist” inside of the JVM heap in some 
map since I don’t even know if the structure is Serializable (I could try). But 
I understand.

As for broadcasting, wouldn’t broadcasting the variable cancel the effort I 
made to “split” the dataset parsing over the nodes?


From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Thursday, 2 March 2017 14:42
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets


Hi Gwenhael,

if you want to persist operator state, then you would have to persist it (e.g. 
writing to a shared directory or emitting the model and using one of Flink’s 
sinks) and when creating the new operators you have to reread it from there 
(usually in the open method or from a Flink source as part of a broadcasted 
data set).

If you want to give a data set to all instances of an operator, then you should 
broadcast this data set. You can do something like

DataSet<Integer> input = ...
DataSet<Integer> broadcastSet = ...

input.flatMap(new RichFlatMapFunction<Integer, Integer>() {
    // Filled in open() with the contents of the broadcast data set.
    List<Integer> broadcastSet;

    @Override
    public void open(Configuration configuration) {
        // Look up the broadcast variable by the name passed to withBroadcastSet().
        broadcastSet = getRuntimeContext().getBroadcastVariable("broadcast");
    }

    @Override
    public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
        // The full broadcast set is available here for every input record.
    }
}).withBroadcastSet(broadcastSet, "broadcast");

Cheers,
Till

On Thu, Mar 2, 2017 at 12:12 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com> wrote:
I (almost) made it work the following way:

1st job : Read all the shapes, repartition() them equally on my N nodes, then 
on each node fill a static RTree (thanks for the tip).

2nd job : Read all the points, use a flatmap + custom partitioner to “clone” 
the dataset to all nodes, then apply a simple flatmap that will use the 
previously initialized static RTree, adding the Shape information to the point. 
Then do a groupBy to merge the points that were inside of multiple shapes.

This works very well in a local runtime but fails on yarn because it seems that 
the taskmanager reloads the jar file between two jobs, making me lose my static 
RTree (I guess that newly loaded class overwrites the older one).

I have two questions :

-          Is there a way to avoid that jar reload // can I store my RTree 
somewhere in jdk or flink, locally to the taskmanager, in a way that it 
wouldn’t be affected by the jar reload (since it would not be stored in any 
class from MY jar)?

o   I could also try to do it in a single job, but I don’t know how to ensure 
that some operations are done (parsing of shape) BEFORE starting others 
handling the points.

-          Is there a way to do that in a clean way using flink operators ? I’d 
need to be able to use the result of the iteration of a dataset inside of my 
map.

Something like :

datasetA.flatmap(new MyMapOperator(datasetB))…

And in my implementation I would be able to iterate over the whole datasetB 
BEFORE doing any operation on datasetA. That way I could parse all my shapes 
into an RTree before handling my points, without relying on a static field.

Or any other way that would allow me to do something similar.

Thanks in advance for your insight.

Gwen’

From: Jain, Ankit [mailto:ankit.j...@here.com]
Sent: Thursday, 23 February 2017 19:21
To: user@flink.apache.org
Cc: Fabian Hueske <fhue...@gmail.com>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
I would recommend looking into a data structure called RTree that is designed 
specifically for this use case, i.e. matching points to regions.

Thanks
Ankit

From: Fabian Hueske <fhue...@gmail.com>
Date: Wednesday, February 22, 2017 at 2:41 PM
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. For each 
input it fills a memory buffer and performs the cross on the two buffers. Then 
the buffer of the spilled input is refilled with spilled records and the 
records are crossed again. This is repeated until one full iteration over the 
spilled records is done. Then the buffer of the streamed input is filled with 
the next records.
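
The loop structure described above can be sketched in plain Java as follows. This is an in-memory illustration only, not Flink's implementation: both inputs are ordinary lists, `subList` stands in for filling a memory buffer, and the names `BlockNestedLoopCross`, `cross` and `bufferSize` are assumptions.

```java
import java.util.List;
import java.util.function.BiConsumer;

final class BlockNestedLoopCross {
    private BlockNestedLoopCross() {}

    /**
     * Emits every (a, b) pair by crossing one buffer of the streamed input
     * with each buffer of the spilled input in turn.
     */
    static <A, B> void cross(List<A> streamed, List<B> spilled,
                             int bufferSize, BiConsumer<A, B> emit) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        // Outer loop: fill the buffer of the streamed input with the next records.
        for (int i = 0; i < streamed.size(); i += bufferSize) {
            List<A> streamedBuf = streamed.subList(i, Math.min(i + bufferSize, streamed.size()));
            // Inner loop: one full iteration over the spilled input, buffer by buffer.
            for (int j = 0; j < spilled.size(); j += bufferSize) {
                List<B> spilledBuf = spilled.subList(j, Math.min(j + bufferSize, spilled.size()));
                for (A a : streamedBuf) {
                    for (B b : spilledBuf) {
                        emit.accept(a, b);
                    }
                }
            }
        }
    }
}
```

The spilled input is re-read once per streamed buffer, and the number of emitted pairs is always the product of the two input sizes, whatever the buffer size.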
You should be aware that cross is a very expensive operation, especially if 
you evaluate a complex condition for each pair of records. So a cross can 
easily become too expensive to compute.
For such use cases it is usually better to apply a coarse-grained spatial 
partitioning and do a key-based join on the partitions. Within each partition 
you'd perform a cross.
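
A minimal plain-Java sketch of such a coarse-grained partitioning, under several assumptions: shapes are simplified to axis-aligned rectangles, the partitioning is a uniform grid with a caller-chosen `cellSize`, and a shape that spans several cells is replicated into each of them. All names (`GridJoin`, `Point`, `Rect`, `cellKey`) are hypothetical, and in Flink the per-cell grouping would be a key-based join rather than a `HashMap`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GridJoin {
    private GridJoin() {}

    static final class Point {
        final double x, y;
        Point(double x, double y) { this.x = x; this.y = y; }
    }

    /** Axis-aligned rectangle standing in for an arbitrary shape (assumption). */
    static final class Rect {
        final String id;
        final double minX, minY, maxX, maxY;
        Rect(String id, double minX, double minY, double maxX, double maxY) {
            this.id = id;
            this.minX = minX; this.minY = minY; this.maxX = maxX; this.maxY = maxY;
        }
        boolean contains(Point p) {
            return p.x >= minX && p.x <= maxX && p.y >= minY && p.y <= maxY;
        }
    }

    static int cellIndex(double coord, double cellSize) {
        return (int) Math.floor(coord / cellSize);
    }

    /** Packs the two cell indices into one join key. */
    static long cellKey(int cx, int cy) {
        return ((long) cx << 32) | (cy & 0xffffffffL);
    }

    /** result.get(i) lists the ids of the shapes that contain points.get(i). */
    static List<List<String>> join(List<Point> points, List<Rect> shapes, double cellSize) {
        // Replicate each shape into every grid cell its bounding box touches.
        Map<Long, List<Rect>> shapesByCell = new HashMap<>();
        for (Rect r : shapes) {
            for (int cx = cellIndex(r.minX, cellSize); cx <= cellIndex(r.maxX, cellSize); cx++) {
                for (int cy = cellIndex(r.minY, cellSize); cy <= cellIndex(r.maxY, cellSize); cy++) {
                    shapesByCell.computeIfAbsent(cellKey(cx, cy), k -> new ArrayList<>()).add(r);
                }
            }
        }
        // Each point belongs to exactly one cell; cross it only with that cell's shapes.
        List<List<String>> result = new ArrayList<>();
        for (Point p : points) {
            long key = cellKey(cellIndex(p.x, cellSize), cellIndex(p.y, cellSize));
            List<String> hits = new ArrayList<>();
            for (Rect r : shapesByCell.getOrDefault(key, new ArrayList<>())) {
                if (r.contains(p)) {
                    hits.add(r.id);
                }
            }
            result.add(hits);
        }
        return result;
    }
}
```

The exact containment test still runs for every candidate pair, but only within a cell, so the cost depends on how evenly the grid spreads the shapes and points.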
Best, Fabian


2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com>:
Hi,

I need (or at least I think I do) to do a cross operation between two huge 
datasets. One dataset is a list of points. The other one is a list of shapes 
(areas).

I want to know, for each point, the areas (they might overlap so a point can be 
in multiple areas) it belongs to so I thought I’d “cross” my points and areas 
since I need to test each point against each area.

I tried it and my job seems to work for a few seconds, then at some point it 
gets stuck.

I’m wondering if Flink, for cross operations, tries to load one of the two 
datasets into RAM or if it’s able to split the job in multiple iterations (even 
if it means reading one of the two datasets multiple times).

Or maybe I’m going at it the wrong way, or missing some parameters, feel free 
to correct me ☺

I’m using flink 1.0.1.

Thanks in advance

Gwen’