Thanks Fabian for the explanation. Let me do some more reading so what you said 
can sync-in little more.

From: Fabian Hueske <fhue...@gmail.com>
Date: Monday, February 4, 2019 at 10:22 AM
To: "Aggarwal, Ajay" <ajay.aggar...@netapp.com>
Cc: Congxian Qiu <qcx978132...@gmail.com>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: Reverse of KeyBy

Hi,

Subpartitions are just a logical concept. When you keyBy a stream, the next 
operator will be applied in a keyed context. After that, the data might still 
be partitioned, but the keyed context is gone.
Is this what you mean by automatic "joining of partitioned sub-streams"?

With the program that you shared before, the following happens:

(1) The records are partitioned on the LargeMessageId, i.e., all records with 
the same LargeMessageId are sent to the same task.
(2) The task collects all fragements of a large message in keyed state. The 
state is always scoped to the key (LargeMessageId). Once it collected all 
fragments, it emits a complete message.
(3) The completed messages are partitioned on MyKey, i.e., all messages with 
the same MyKey are sent to the same task.
(4) A function can collect and sort the messages to process them in order.

Since you shuffle the records twice you cannot (in general) expect the records 
to be still in order.

Best, Fabian


Am Mo., 4. Feb. 2019 um 16:08 Uhr schrieb Aggarwal, Ajay 
<ajay.aggar...@netapp.com<mailto:ajay.aggar...@netapp.com>>:
Yes, LargeMessageId is globally unique, so I shouldn’t need composite key.

So both of you are suggesting I do the following

InputStream
  (1)   .keyBy (LargeMessageId)
  (2)   .flatMap(new MyReassemblyFunction())
  (3)   .keyBy(MyKey)
  (4)   .???

Let me explain my doubt (perhaps due to lack of understanding). By the way I am 
expecting to run this job with parallelism > 1. My understanding of above is as 
below:

First operator (1): First KeyBy (LargeMessageId) will partition the input 
stream by LargeMessageId. Right here messages with same MyKey value will be 
spread across these partitions. Is it not a problem already?
Second operator (2) : run flatMap(new MyReassemblyFunction()) on these 
partitions. Here each one will produce exactly one LargeMessage.
Third operator (3):  At this point I don’t understand the point of second 
KeyBy(MyKey)? My understanding is that this will further partition the already 
partitioned input stream (from 1 above) and will not help me, as I need to 
process all LargeMessages for a given MyKey in order.

Is there an implicit assumption here that the flatMap operation (2) above will 
automatically join the partitioned sub-streams from first KeyBy into a single 
stream?

Ajay


From: Fabian Hueske <fhue...@gmail.com<mailto:fhue...@gmail.com>>
Date: Monday, February 4, 2019 at 9:17 AM
To: "Aggarwal, Ajay" <ajay.aggar...@netapp.com<mailto:ajay.aggar...@netapp.com>>
Cc: Congxian Qiu <qcx978132...@gmail.com<mailto:qcx978132...@gmail.com>>, 
"user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Reverse of KeyBy

Hi,

Calling keyBy twice will not work, because the second call overrides the first.
You can keyBy on a composite key (MyKey, LargeMessageId).

You can do the following

InputStream
  .keyBy ( (MyKey, LargeMessageId) ) \\ use composite key
  .flatMap(new MyReassemblyFunction())
  .keyBy(MyKey)
  .???

If LargeMessageId is unique across MyKey (there are not two large messages with 
the same LargeMessageId and different MyKey values), you don't need a composite 
key but can use keyBy(LargeMessageId).

Best, Fabian


Am Mo., 4. Feb. 2019 um 15:05 Uhr schrieb Aggarwal, Ajay 
<ajay.aggar...@netapp.com<mailto:ajay.aggar...@netapp.com>>:
Thank you for your suggestion. But per my understanding if I KeyBy 
(LargeMessageId) first then I can’t guarantee order of LargeMessages per MyKey. 
Because MyKey messages will get spread over multiple partitions by 
LargeMessageId. Am I correct?


From: Congxian Qiu <qcx978132...@gmail.com<mailto:qcx978132...@gmail.com>>
Date: Sunday, February 3, 2019 at 6:40 AM
To: "Aggarwal, Ajay" <ajay.aggar...@netapp.com<mailto:ajay.aggar...@netapp.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Reverse of KeyBy
Hi Aggarwal
   How about keyBy(LargeMessageID) first, then assemble these fragments back 
into LargeMessages, then keyBy(MeyKey)?

Best,
Congxian


Aggarwal, Ajay <ajay.aggar...@netapp.com<mailto:ajay.aggar...@netapp.com>> 
于2019年2月2日周六 上午5:42写道:

I am new to Flink. I am trying to figure out if there is an operator that 
provides reverse functionality of KeyBy.  Using KeyBy you can split a stream 
into disjoint partitions. Is there a way to bring those partitions back into a 
single stream?



Let me explain using my use case below.



My Input stream contains messages with following information

{

    MyKey

    LargeMessageId

    LargeMessageFragment

    LargeMessageTimestamp // yes same timestamp repeated with each fragment



    (… there are other fields, but I am leaving them out as they are not 
important for this discussion)

}





My LargeMessage is fragmented at source into fragments. I have 2 main 
requirements

  1.  Reassemble these fragments back into LargeMessages
  2.  For each MyKey value, process the LargeMessages in the order based on 
time associated with them.





So I am thinking



InputStream

  .KeyBy (MyKey)

  .KeyBy (LargeMessageId)

  .flatMap(new MyReassemblyFunction())

  . ???



At this point I need to throw all assembled LargeMessages for a given MyKey 
back into a common partition, so I can try to process them in order.  This is 
where I am stuck. Any help from the experts will be much appreciated.



Ajay

Reply via email to