Current design:

Window Data Manager - Stores the Kafka partition offsets.
Kafka Key - The key used by the operator is AppID#OperatorId

During recovery, the partially written window is re-created using the
following approach:

Tuples between the largest recovery offsets and the current offsets are
checked. Based on the key, tuples written by other entities are
discarded.

Only tuples which are not in the recovered set are emitted.
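
As a rough illustration of the recovery steps above (not code from the pull
request), here is a minimal Java sketch. It assumes a single Kafka partition
and String payloads. Rec, operatorKey, recover and replay are hypothetical
names, and counting duplicates per payload is an assumption of this sketch
about how repeated tuples within a window would be handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: all names here are hypothetical, not from the PR.
class RecoveryFilterSketch {

    /** Simplified stand-in for a Kafka record: offset, message key, payload. */
    static final class Rec {
        final long offset;
        final String key;
        final String value;

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Key format from the design: AppID#OperatorId. */
    static String operatorKey(String appId, int operatorId) {
        return appId + "#" + operatorId;
    }

    /**
     * Builds the recovered set: payload counts for records in
     * [fromOffset, toOffset) whose key matches this operator.
     * Records written by other entities are discarded.
     */
    static Map<String, Integer> recover(List<Rec> log, long fromOffset,
                                        long toOffset, String myKey) {
        Map<String, Integer> recovered = new HashMap<>();
        for (Rec r : log) {
            if (r.offset < fromOffset || r.offset >= toOffset) {
                continue; // outside the range written since the stored offset
            }
            if (!myKey.equals(r.key)) {
                continue; // written by another operator or application
            }
            recovered.merge(r.value, 1, Integer::sum);
        }
        return recovered;
    }

    /** Replays a window and returns only the tuples not in the recovered set. */
    static List<String> replay(List<String> windowTuples,
                               Map<String, Integer> recovered) {
        List<String> toEmit = new ArrayList<>();
        for (String t : windowTuples) {
            Integer remaining = recovered.get(t);
            if (remaining != null) {
                // Already in Kafka: consume one occurrence and skip the tuple.
                if (remaining == 1) {
                    recovered.remove(t);
                } else {
                    recovered.put(t, remaining - 1);
                }
                continue;
            }
            toEmit.add(t);
        }
        return toEmit;
    }

    public static void main(String[] args) {
        String myKey = operatorKey("app1", 3);
        List<Rec> log = new ArrayList<>();
        log.add(new Rec(0, myKey, "x"));       // before the stored offset
        log.add(new Rec(1, myKey, "y"));       // before the stored offset
        log.add(new Rec(2, myKey, "a"));       // partial window, ours
        log.add(new Rec(3, "other#1", "b"));   // partial window, someone else's
        log.add(new Rec(4, myKey, "c"));       // partial window, ours

        Map<String, Integer> recovered = recover(log, 2, 5, myKey);
        List<String> window = List.of("a", "b", "c", "d");
        System.out.println(replay(window, recovered)); // prints [b, d]
    }
}
```

In this example "a" and "c" were already written before the failure, so only
"b" and "d" are emitted on replay. The "b" at offset 3 carries another key and
therefore does not count as already written by this operator.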

Here is the first cut of the design:
https://github.com/apache/incubator-apex-malhar/pull/298

Please give your feedback on the design.

@Bright,
Recovery data needs to be present in the key to distinguish tuples
coming from different instances of the output operator or from external
applications.

Thanks

On Fri, May 13, 2016 at 2:14 PM Bright Chen <bri...@datatorrent.com> wrote:

> Hi Sandesh,
> I think it may be better to keep this in Jira.
>
> Do you mean keeping the key in another Kafka topic, or is the key in fact
> the key of the Kafka message that represents the user tuple?
> If it is a separate key, how is the relation between key and value kept?
> If the key is the key of the Kafka message, it basically changes the
> expected data. As I understand it, the key here is used only for recovery;
> it is not data the user requires. And the data written to Kafka probably
> needs to be decided by the customer’s logic.
>
> Think about a customer who builds two applications with our operators: the
> first application writes data to Kafka, and the second one reads data from
> Kafka. At the very beginning, the first application was implemented with a
> non-exactly-once output operator, and was later changed to the exactly-once
> operator. I think the customer doesn’t expect to change the second
> application. But the second application has to be changed if its logic
> depends on the key.
>
> thanks
> Bright
>
> > On May 13, 2016, at 12:37 PM, Sandesh Hegde <sand...@datatorrent.com> wrote:
> >
> > Hi All,
> >
> > I am working on the Kafka 0.9 output operator, and one of the requirements
> > is to implement an Exactly Once output operator. Here is one possible idea;
> > please give your feedback or suggest a new design.
> >
> >
> > ---------------------------------------------------------------------------
> >
> > Use *Key* to store meta information which is used during recovery.
> >
> > Operator users will use *Value* to store their key-value pair and
> > implement the Kafka partitioning accordingly.
> >
> > Format of the *Key* is as specified below:
> >
> >
> >
> > Key = 1. OperatorName#ApexPartitionId#WindowId#Message#MessageId
> >          (during message write)
> >
> >       2. OperatorName#ApexPartitionId#WindowId#CheckPoint
> >          (during end window)
> >
> > During end window, a checkpoint marker is written to all the Kafka
> > partitions of the topic.
> >
> > Every message is given a message id from a counter that is reset every
> > window, and is then written to Kafka.
> >
> > During recovery, the Kafka partitions are read until the last checkpoint
> > message from this operator is reached, and the partially written window is
> > reconstructed.
> >
> >
> > ---------------------------------------------------------------------------
> >
> > Note: The existing Kafka exactly-once operator (Kafka 0.8) also needs to
> > be rewritten.
> >
> > Thanks
>
>
