Hi Gyula,

I understand better now how this might work, and it's fascinating.
I still have one question about the CoFlatMap function, though.

First, let me explain what I understand, so you can tell me whether it is
correct or not:
1. The connected iterative stream ensures that the CoFlatMap function
receives both the points and the centroids, which are broadcast back on each
iteration via closeWith.

2. So in the CoFlatMap function, one flatMap method gets the points and the
other flatMap method gets the broadcast centroids.

Now comes the part where I am assuming a bit, because I don't fully
understand it from the theory.
3. Assuming I can use the broadcast centroids, I find the centroid nearest to
the streamed point, update that centroid, and use only one of the collectors
to emit the updated centroid list back (I try to sketch this below).
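
To make point 3 concrete, here is roughly what I imagine the CoFlatMapFunction
to look like (just my own sketch, with placeholder Point/Centroids classes and
made-up helper methods, so please correct me if this is not what you meant):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Placeholder types standing in for the Point/Centroids classes in your snippet.
class Point implements Serializable {
    double x;
    double y;
}

class Centroids implements Serializable {
    List<Point> centers = new ArrayList<>();
}

// flatMap1 sees the streamed points, flatMap2 sees the centroids that
// closeWith(updatedCentroids.broadcast()) feeds back from all parallel instances.
class MyCoFlatmap implements CoFlatMapFunction<Point, Centroids, Centroids> {

    private final Centroids local = new Centroids();
    private long pointsSeen = 0;

    @Override
    public void flatMap1(Point p, Collector<Centroids> out) {
        // move the nearest local centroid towards the incoming point
        updateNearestCentroid(local, p);
        // periodically emit the local centroids into the feedback edge
        if (++pointsSeen % 1000 == 0) {
            out.collect(local);
        }
    }

    @Override
    public void flatMap2(Centroids fromOthers, Collector<Centroids> out) {
        // merge the centroids broadcast by the other parallel instances
        mergeInto(local, fromOthers);
    }

    private void updateNearestCentroid(Centroids centroids, Point p) {
        // k-means style update, details left out
    }

    private void mergeInto(Centroids mine, Centroids theirs) {
        // e.g. average the matching centroids, details left out
    }
}

If that is roughly right, it leads straight to my question below about
parallel updates.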


My question is this: I am assuming this operation is not done in parallel,
because if the stream is processed in parallel, how would I ensure the
centroids are updated correctly when multiple points may try to update the
same centroid at the same time?
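
For example, if the flatmap from your snippet runs with a parallelism greater
than one (my own guess at the wiring, the setParallelism call is only for
illustration):

DataStream<Centroids> updatedCentroids =
    inputsAndCentroids.flatMap(new MyCoFlatmap()).setParallelism(4);

then each of the four subtasks would hold its own local copy of the centroids,
and two points that land on different subtasks could move "the same" centroid
independently until the broadcast feedback merges the copies again. Is that the
intended behaviour, or am I missing something?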

I hope I made myself clear with this.

Thanks and Regards
Biplob
Biplob Biswas wrote
> Hi Gyula,
> 
> I read your workaround and started reading about Flink iterations,
> CoFlatMap operators and other things. Now I do understand a few things,
> but the solution you provided is not completely clear to me.
> 
> I understand the following things from your post.
> 1. You initially have a DataStream of points, on which you iterate; the
> 'withFeedbackType' call defines the feedback type of the connected stream,
> so the feedback elements are of type "Centroids" rather than "Points".
> 
> 2. On this connected stream (which, as I understand it, only carries the
> streamed points right now), you run a CoFlatMap operator. And you mention:
> "MyCoFlatmap would be a CoFlatMapFunction which on one input receives the
> events and updates its local centroids (and periodically outputs the
> centroids), and on the other input receives the centroids of the other
> flatmaps and merges them into the local ones."
> I don't understand this part completely. If I am not wrong, you are saying
> that the CoFlatMap function would have two flatMap methods, but I don't
> understand what specifically I am supposed to do in each of them.
> 
> 3. Lastly, the updated centroids coming back from the CoFlatMap function
> are fed back into the stream, and this is where I get lost again: how are
> the centroids fed back, what happens to the point stream when they are, and
> how do I catch them in the CoFlatMap function?
> 
> 
> If I understand this at all, then in your code the first set of centroids
> is created inside the CoFlatMap function, and you don't start with a list
> of centroids at all? Am I assuming that correctly?
> 
> I went through the iteration in the KMeans example at the following link:
> https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java
> 
> and I understand how that is working, but I am still not clear on how your
> example works.
> 
> Could you please explain it a bit more, maybe with some examples?
> 
> Thanks a lot.
> Gyula Fóra wrote
>> Hi Biplob,
>> 
>> I have implemented a similar algorithm as Aljoscha mentioned.
>> 
>> First things to clarify are the following:
>> There is currently no abstraction for keeping objects (in your case,
>> centroids) in a centralized way that can be updated and read by all
>> operators. This would probably be very costly and is actually not
>> necessary in your case.
>> 
>> Broadcasting a stream, in contrast with other partitioning methods, means
>> that the events will be replicated to all downstream operators. It is not
>> a magical operator that makes state available across parallel instances.
>> 
>> Now let me explain what I think you want from Flink and how to do it :)
>> 
>> You have an input data stream and a set of centroids to be updated based
>> on the incoming records. As you want to do this in parallel, you have an
>> operator (let's say a flatmap) that keeps the centroids locally and
>> updates them on its inputs. Now you have a set of independently updated
>> centroids, so you want to merge them and update the centroids in each
>> flatmap.
>> 
>> Let's see how to do this. Given that you have your centroids locally,
>> updating them is super easy, so I will not talk about that. The
>> problematic part is periodically merging and "broadcasting" the centroids
>> so that all the flatmaps eventually see the same ones (they don't always
>> have to be the same for clustering, probably). There is no operator for
>> sending state (centroids) between subtasks, so you have to be clever here.
>> We can actually use cyclic streams to solve this problem by sending the
>> centroids as simple events to a CoFlatMap:
>> 
>> DataStream<Point> input = ...
>> ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
>>     input.iterate().withFeedbackType(Centroids.class)
>> DataStream<Centroids> updatedCentroids =
>>     inputsAndCentroids.flatMap(MyCoFlatmap)
>> inputsAndCentroids.closeWith(updatedCentroids.broadcast())
>> 
>> MyCoFlatmap would be a CoFlatMapFunction which on one input receives the
>> events and updates its local centroids (and periodically outputs the
>> centroids), and on the other input receives the centroids of the other
>> flatmaps and merges them into the local ones.
>> 
>> This might be a lot to take in at first, so you might want to read up on
>> streaming iterations and connected streams before you start.
>> 
>> Let me know if this makes sense.
>> 
>> Cheers,
>> Gyula
>> 
>> 
>> Biplob Biswas <revolutionisme@> wrote (on Thu, 28 Apr 2016, 14:41):
>> 
>>> That would really be great; any example would help me proceed with my
>>> work. Thanks a lot.
>>>
>>>
>>> Aljoscha Krettek wrote
>>> > Hi Biplob,
>>> > one of our developers had a stream clustering example a while back. It
>>> > was using a broadcast feedback edge with a co-operator to update the
>>> > centroids. I'll directly include him in the email so that he will
>>> > notice and can send you the example.
>>> >
>>> > Cheers,
>>> > Aljoscha
>>> >
>>> > On Thu, 28 Apr 2016 at 13:57 Biplob Biswas <revolutionisme@> wrote:
>>> >
>>> >> I am pretty new to Flink, so can anyone at least give me an example of
>>> >> how the datastream.broadcast() method works? From the documentation I
>>> >> get the following:
>>> >>
>>> >> broadcast()
>>> >> Sets the partitioning of the DataStream so that the output elements are
>>> >> broadcasted to every parallel instance of the next operation.
>>> >>
>>> >> If the output elements are broadcasted, then how are they retrieved? Or
>>> >> maybe I am looking at this method in a completely wrong way?
>>> >>
>>> >> Thanks
>>> >> Biplob Biswas
>>> >>
>>> >>
>>> >>
>>>
>>>
>>>
>>>
>>>
>>>




