[GitHub] samza pull request #208: SAMZA-1264: Make Operator Functions Closable

2017-05-30 Thread vjagadish1989
GitHub user vjagadish1989 opened a pull request:

https://github.com/apache/samza/pull/208

SAMZA-1264: Make Operator Functions Closable

- Added `close()` to the lifecycle of `OperatorImpl`s, and all `Function`s.
- Added unit tests to verify calls to `close()`
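
As a rough illustration of the first bullet, the sketch below gives a function interface a no-op `close()` that the owning operator calls when it shuts down. The names `ClosableFn`, `MapFn`, and `MapOperator` are hypothetical and are not taken from the patch.

```java
import java.util.ArrayList;
import java.util.List;

class ClosableFnDemo {
    // Builds an operator whose function records its close() call in `log`.
    static MapOperator<String, Integer> lengthOperator(List<String> log) {
        return new MapOperator<>(new MapFn<String, Integer>() {
            @Override
            public Integer apply(String message) {
                return message.length();
            }

            @Override
            public void close() {
                log.add("closed");
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MapOperator<String, Integer> op = lengthOperator(log);
        System.out.println(op.onMessage("samza"));
        op.close();
        System.out.println(log);
    }
}

// Hypothetical base interface: every user function gets a close() hook.
interface ClosableFn {
    // Default is a no-op, so existing functions need not change.
    default void close() {}
}

// Hypothetical function type that inherits the close() hook.
interface MapFn<M, R> extends ClosableFn {
    R apply(M message);
}

// Hypothetical operator wrapper that owns a user function.
class MapOperator<M, R> {
    private final MapFn<M, R> fn;

    MapOperator(MapFn<M, R> fn) {
        this.fn = fn;
    }

    R onMessage(M message) {
        return fn.apply(message);
    }

    // Closing the operator closes the function it wraps.
    void close() {
        fn.close();
    }
}
```

A default method keeps the change backward compatible: only functions that hold resources need to override `close()`.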

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vjagadish1989/samza operator_functions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #208


commit 1527f19b0c0c02cbbe9f20d663f27ee7255b8217
Author: vjagadish1989 
Date:   2017-05-31T04:09:29Z

SAMZA-1264: Make Operator Functions Closable




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] samza pull request #207: SAMZA-1312: Add Control Messages and Intermediate S...

2017-05-30 Thread xinyuiscool
GitHub user xinyuiscool opened a pull request:

https://github.com/apache/samza/pull/207

SAMZA-1312: Add Control Messages and Intermediate Stream Serde

This patch adds the control message types, which include:
* EndOfStreamMessage
* WatermarkMessage

To support in-band data and control messages, we provide a wrapper serde
(IntermediateMessageSerde) that serializes and deserializes data and control
messages based on the message type byte (the first byte of each intermediate
stream message). The message format is defined in SAMZA-1312. The patch
integrates this serde with SerdeManager.
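
The type-byte framing can be sketched as follows. This is only an illustration under stated assumptions: `TypeTaggedSerde` is a hypothetical stand-in, not the IntermediateMessageSerde from the patch, and the type codes are made up (the real values are defined in SAMZA-1312). The payload is assumed to be already serialized by the user serde or as JSON.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class TypeTaggedSerdeDemo {
    public static void main(String[] args) {
        byte[] json = "{\"timestamp\":1496185200000}".getBytes(StandardCharsets.UTF_8);
        byte[] wire = TypeTaggedSerde.wrap(TypeTaggedSerde.WATERMARK, json);
        System.out.println(TypeTaggedSerde.typeOf(wire));
        System.out.println(new String(TypeTaggedSerde.payloadOf(wire), StandardCharsets.UTF_8));
    }
}

// Hypothetical wrapper: one type byte followed by the serialized message.
final class TypeTaggedSerde {
    // Illustrative type codes only; not the values used by the patch.
    static final byte USER_MESSAGE = 0;
    static final byte WATERMARK = 1;
    static final byte END_OF_STREAM = 2;

    private TypeTaggedSerde() {}

    // Prepends the type byte to an already-serialized payload.
    static byte[] wrap(byte type, byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = type;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    // Reads the type byte so the caller can pick the matching serde.
    static byte typeOf(byte[] wire) {
        requireNonEmpty(wire);
        return wire[0];
    }

    // Returns everything after the type byte.
    static byte[] payloadOf(byte[] wire) {
        requireNonEmpty(wire);
        return Arrays.copyOfRange(wire, 1, wire.length);
    }

    private static void requireNonEmpty(byte[] wire) {
        if (wire.length == 0) {
            throw new IllegalArgumentException("message has no type byte");
        }
    }
}
```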

Tested with example jobs deployed locally; they work as expected.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xinyuiscool/samza SAMZA-1312

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #207


commit f0bb67f26204fc5698f5d9b5fbdac8b90c70fabe
Author: Xinyu Liu 
Date:   2017-05-26T22:01:05Z

Add control message types and serde

commit 2d79ed7a8a6f4698904232899ef485c4f3a1e627
Author: Xinyu Liu 
Date:   2017-05-30T22:41:12Z

Clean up the ControlMessage class and some enhancements

commit f0e32d517942fb119253e2d4f06fc921c34ec0ba
Author: Xinyu Liu 
Date:   2017-05-31T00:52:49Z

Merge branch 'master' into SAMZA-1312






Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-05-30 Thread xinyu liu
@Chris: thanks a lot for providing the definitions. The first equation is
exactly what I want to say about the watermark reconciliation. I haven't
got to the second equation yet. Will probably think it through once I get
there.

@Yi: I updated SEP-6 based on your feedback. Replies to your
questions are below:

>> the proposal is for all types of control messages, not just for
end-of-stream, right? Better to define the scope and layout the comment
requirements of control message delivery.

Right, the proposal is to support general control messages. I added
more content to the problem description about watermarks and also listed
supporting watermark propagation as one of the goals.

>> in step-3, how does the consumer of intermediate streams know how many
EOS messages should be received? And we should make it clear that it should
be EOS / producer and the count of the downstream consumer is counting on
the number of unique EOS from all producers from the upstream.

The control message itself currently contains the count of upstream
producers (tasks). Chris suggested earlier that if each processor has a
global view of each job model, we can remove the count field. Right now
we are using this field to keep track of the total count. I also updated
the description of this part.
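
A minimal sketch of the counting, assuming each EOS message carries the producer's task name and the total task count as described above. `EndOfStreamTracker` and its method are hypothetical names, and tracking at stream level rather than per partition is a simplification.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class EndOfStreamTrackerDemo {
    public static void main(String[] args) {
        EndOfStreamTracker tracker = new EndOfStreamTracker();
        System.out.println(tracker.onEndOfStream("shuffle", "task-0", 2));
        System.out.println(tracker.onEndOfStream("shuffle", "task-1", 2));
    }
}

// Hypothetical consumer-side bookkeeping for end-of-stream control messages.
final class EndOfStreamTracker {
    // streamId -> names of upstream tasks that have reported end-of-stream
    private final Map<String, Set<String>> finishedTasks = new HashMap<>();

    // Records one EOS message and returns true once every upstream task
    // of the stream has reported.
    boolean onEndOfStream(String streamId, String taskName, int taskCount) {
        Set<String> tasks = finishedTasks.computeIfAbsent(streamId, id -> new HashSet<>());
        // A set counts each producer once, even if its EOS arrives again.
        tasks.add(taskName);
        return tasks.size() >= taskCount;
    }
}
```

Counting unique task names instead of raw messages keeps the result correct when the same producer's EOS is seen more than once, for example after a replay.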

>> In comparison table, “checkpoint the control messages received” ==> is it
referring to the partially accumulated upstream EOS messages?

Yes, that's correct. We will checkpoint all the upstream tasks that have
reached end-of-stream for a streamId.

>> Please make a clear definition on “Watermark” and “EndOfStream”. Why are
they different? Are they both control messages that requires the same
delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
control message?
They are different: a watermark contains a timestamp from the producer task,
while an EndOfStream message indicates that the producer task has completely
processed a stream. Both are control messages that require the same
delivery pattern. I updated the SEP to make it clearer that they are
sub-categories of control message.

>> As for the serde for intermediate stream, I assume that we will need an
envelope serde that is avro to wrap the user message and control message
in? So, user-defined serde now only applies to the “UserMessage”? And
what’s the message key in the message format?
The serde wrapper for the message is customized: the first byte indicates
the message type, and the following byte array is the actual message. For
user messages, we will apply the user-provided serde. For control messages,
we will use JSON. The key is the same. We do not need a customized serde
since we can infer the serde from the message.

>> A big question regarding to the watermark propagation: “When Samza
receives watermark messages, it will emit a watermark with the earliest
event time across all the stream partitions. No emission if the earliest
event time doesn’t change.” Does the watermark propagation requires
synchronization/coordination between all producers at the source? Say, if
the task taking one input source emits watermark at 1min interval and the
task taking another input source emits watermark at 5min interval, how does
the downstream consumer reconcile the watermarks?

Watermark propagation does not require synchronization. Chris's equations
describe accurately how the calculation works. Please take a look.
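
To make the 1min/5min case concrete, here is a minimal sketch, assuming the consumer keeps the latest watermark per upstream task and emits the minimum only when it advances. `WatermarkAggregator` is a hypothetical name, not an SEP-6 API, and waiting until every task has reported at least once is also an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

class WatermarkAggregatorDemo {
    public static void main(String[] args) {
        WatermarkAggregator agg = new WatermarkAggregator(2);
        System.out.println(agg.onWatermark("fast", 60_000L));
        System.out.println(agg.onWatermark("slow", 300_000L));
        System.out.println(agg.onWatermark("fast", 120_000L));
    }
}

// Hypothetical consumer-side reconciliation of watermarks from upstream tasks.
final class WatermarkAggregator {
    private final int taskCount;
    // Latest watermark seen from each upstream task.
    private final Map<String, Long> latestByTask = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    WatermarkAggregator(int taskCount) {
        this.taskCount = taskCount;
    }

    // Returns the watermark to emit downstream, or empty when not every
    // task has reported yet or the minimum has not advanced.
    OptionalLong onWatermark(String taskName, long timestamp) {
        // Keep the largest timestamp per task so a late message cannot move it back.
        latestByTask.merge(taskName, timestamp, Long::max);
        if (latestByTask.size() < taskCount) {
            return OptionalLong.empty();
        }
        long min = Collections.min(latestByTask.values());
        if (min <= lastEmitted) {
            return OptionalLong.empty();
        }
        lastEmitted = min;
        return OptionalLong.of(min);
    }
}
```

The producers never coordinate: the slower one simply holds the minimum back until its next watermark arrives.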

>> In the checkpoint message format, it seems that it is only design for
watermark messages? Any streamId info that EoS is carrying over?

Sorry, I forgot to add the EOS checkpoint there. I updated the SEP for it.
Now the EOS checkpoint has the streamId.

Thanks,
Xinyu

On Tue, May 30, 2017 at 11:03 AM, Chris Pettitt <
cpett...@linkedin.com.invalid> wrote:

> FWIW, there is a Beam presentation that has a very crisp set of rules
> around watermarks. From memory it boils down to something like:
>
> InputWatermark(stage) = min { OutputWatermark(stage') for stage' in
> Upstream(stage) }
> OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
>
> OldestWork(stage) is the oldest message that has been received by the stage
> but not yet processed.
>
> - Chris

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-05-30 Thread Chris Pettitt
FWIW, there is a Beam presentation that has a very crisp set of rules
around watermarks. From memory it boils down to something like:

InputWatermark(stage) = min { OutputWatermark(stage') for stage' in
Upstream(stage) }
OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }

OldestWork(stage) is the oldest message that has been received by the stage
but not yet processed.
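
The two rules can be written as a small Java sketch. `StageWatermarks` is a hypothetical name, and treating a stage with no pending work as unconstrained by OldestWork is an assumption, not something the presentation states.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.OptionalLong;

class StageWatermarksDemo {
    public static void main(String[] args) {
        long in = StageWatermarks.inputWatermark(Arrays.asList(10L, 7L, 12L));
        long out = StageWatermarks.outputWatermark(in, OptionalLong.of(5L));
        System.out.println(in + " " + out);
    }
}

final class StageWatermarks {
    private StageWatermarks() {}

    // InputWatermark(stage) = min { OutputWatermark(stage') for stage' in Upstream(stage) }
    static long inputWatermark(Collection<Long> upstreamOutputWatermarks) {
        if (upstreamOutputWatermarks.isEmpty()) {
            throw new IllegalArgumentException("stage has no upstream stages");
        }
        return Collections.min(upstreamOutputWatermarks);
    }

    // OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
    // An empty oldestWork means nothing is pending (assumption: no constraint).
    static long outputWatermark(long inputWatermark, OptionalLong oldestWork) {
        return oldestWork.isPresent()
            ? Math.min(inputWatermark, oldestWork.getAsLong())
            : inputWatermark;
    }
}
```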

- Chris

On Tue, May 30, 2017 at 1:39 PM, Yi Pan  wrote:

> Hi, Xinyu,
>
> Thanks for the proposal. I took a quick pass and had the following
> questions/comments:
>
> - message shuffling ==> data shuffling???
>
> - the proposal is for all types of control messages, not just for
> end-of-stream, right? Better to define the scope and layout the comment
> requirements of control message delivery.
>
> - dropped option should go to “Rejected alternatives”
>
> - “Samza finds out the following intermediate streams that all the inputs
> have been end-of-stream” what does it mean? The task consuming the input
> stream(s) reconcile all EoS from all input stream partitions and then
> propagate EoS messages to all partitions in intermediate streams? This is
> not super clear to me.
>
> - in step-3, how does the consumer of intermediate streams know how many
> EOS messages should be received? And we should make it clear that it should
> be EOS / producer and the count of the downstream consumer is counting on
> the number of unique EOS from all producers from the upstream.
>
> - In comparison table, “checkpoint the control messages received” ==> is it
> referring to the partially accumulated upstream EOS messages?
>
> - Please make a clear definition on “Watermark” and “EndOfStream”. Why are
> they different? Are they both control messages that requires the same
> delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
> If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
> control message?
>
> - As for the serde for intermediate stream, I assume that we will need an
> envelope serde that is avro to wrap the user message and control message
> in? So, user-defined serde now only applies to the “UserMessage”? And
> what’s the message key in the message format?
>
> - A big question regarding to the watermark propagation: “When Samza
> receives watermark messages, it will emit a watermark with the earliest
> event time across all the stream partitions. No emission if the earliest
> event time doesn’t change.” Does the watermark propagation requires
> synchronization/coordination between all producers at the source? Say, if
> the task taking one input source emits watermark at 1min interval and the
> task taking another input source emits watermark at 5min interval, how does
> the downstream consumer reconcile the watermarks?
>
> - In the checkpoint message format, it seems that it is only design for
> watermark messages? Any streamId info that EoS is carrying over?
>
>
> Thanks a lot!
>
>
> -Yi

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-05-30 Thread Yi Pan
Hi, Xinyu,

Thanks for the proposal. I took a quick pass and had the following
questions/comments:

- message shuffling ==> data shuffling???

- the proposal is for all types of control messages, not just for
end-of-stream, right? Better to define the scope and lay out the common
requirements of control message delivery.

- dropped option should go to “Rejected alternatives”

- “Samza finds out the following intermediate streams that all the inputs
have been end-of-stream”: what does this mean? Does the task consuming the
input stream(s) reconcile all EoS from all input stream partitions and then
propagate EoS messages to all partitions in the intermediate streams? This
is not super clear to me.

- in step-3, how does the consumer of intermediate streams know how many
EOS messages should be received? We should also make it clear that there is
one EOS per producer, and that the downstream consumer counts the number of
unique EOS messages from all upstream producers.

- In comparison table, “checkpoint the control messages received” ==> is it
referring to the partially accumulated upstream EOS messages?

- Please clearly define “Watermark” and “EndOfStream”. Why are
they different? Are they both control messages that require the same
delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
If yes, should we make “watermark” vs “EndOfStream” a sub-category of
control message?

- As for the serde for intermediate stream, I assume that we will need an
envelope serde that is avro to wrap the user message and control message
in? So, user-defined serde now only applies to the “UserMessage”? And
what’s the message key in the message format?

- A big question regarding watermark propagation: “When Samza
receives watermark messages, it will emit a watermark with the earliest
event time across all the stream partitions. No emission if the earliest
event time doesn’t change.” Does watermark propagation require
synchronization/coordination between all producers at the source? Say, if
the task taking one input source emits watermarks at a 1min interval and the
task taking another input source emits watermarks at a 5min interval, how
does the downstream consumer reconcile the watermarks?

- In the checkpoint message format, it seems that it is only designed for
watermark messages. Does EoS carry any streamId info?


Thanks a lot!


-Yi

On Tue, May 30, 2017 at 9:46 AM, xinyu liu  wrote:

> Makes sense. I noticed that too and I dropped the ControlMessage type in my
> code. I also moved taskName, taskCount to the parent ControlMessage class.
> Just updated the SEP-6. Please take a look again.
>
> Thanks,
> Xinyu

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-05-30 Thread xinyu liu
Makes sense. I noticed that too and I dropped the ControlMessage type in my
code. I also moved taskName, taskCount to the parent ControlMessage class.
Just updated the SEP-6. Please take a look again.
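
For readers following along, the resulting hierarchy might look roughly like the sketch below. Only `taskName`, `taskCount`, and the class names come from this thread and the pull request; the field types, constructors, and getters are assumptions and may differ from the actual code.

```java
class ControlMessageDemo {
    public static void main(String[] args) {
        ControlMessage eos = new EndOfStreamMessage("task-0", 4);
        WatermarkMessage wm = new WatermarkMessage(1496185200000L, "task-1", 4);
        System.out.println(eos.getTaskName() + " of " + eos.getTaskCount());
        System.out.println(wm.getTaskName() + " @ " + wm.getTimestamp());
    }
}

// Parent class holds the fields shared by all control messages.
abstract class ControlMessage {
    private final String taskName; // producer task that sent the message
    private final int taskCount;   // total number of producer tasks

    ControlMessage(String taskName, int taskCount) {
        this.taskName = taskName;
        this.taskCount = taskCount;
    }

    String getTaskName() {
        return taskName;
    }

    int getTaskCount() {
        return taskCount;
    }
}

// Signals that the producer task has completely processed a stream.
final class EndOfStreamMessage extends ControlMessage {
    EndOfStreamMessage(String taskName, int taskCount) {
        super(taskName, taskCount);
    }
}

// Carries a timestamp from the producer task.
final class WatermarkMessage extends ControlMessage {
    private final long timestamp;

    WatermarkMessage(long timestamp, String taskName, int taskCount) {
        super(taskName, taskCount);
        this.timestamp = timestamp;
    }

    long getTimestamp() {
        return timestamp;
    }
}
```

With the subclass itself identifying the kind of message, a separate ControlMessage.Type field is no longer needed.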

Thanks,
Xinyu

On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <
cpett...@linkedin.com.invalid> wrote:

> MessageType and ControlMessage.Type look redundant. You could either use
> "ControlMessage" as the type in MessageType or drop ControlMessage.Type.


Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-05-30 Thread Chris Pettitt
MessageType and ControlMessage.Type look redundant. You could either use
"ControlMessage" as the type in MessageType or drop ControlMessage.Type.

On Fri, May 26, 2017 at 5:14 PM, xinyu liu  wrote:

> Thanks a lot for the comments. I updated the SEP with more details and
> clarification. Please let me know if you have further questions.
>
> Thanks,
> Xinyu
>
> On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
> pmaheshw...@linkedin.com.invalid> wrote:
>
> > Hi Xinyu,
> >
> > Thanks for the proposal. Some requests for clarifications. Let's update the
> > SEP directly instead of replying here.
> >
> > E.g., in "For any following intermediate stream whose input streams are all
> > end-of-stream, it will be marked as pending EOS" - Should clarify that
> > (IIUC) something is injecting EOS messages in all intermediate stream
> > partitions once it receives EOS from all input stream partitions it's
> > consuming. Should also clarify what is that something.
> > Same for "declare end of stream once all the EOS messages have been
> > received." - What does this declaration involve and who is doing this?
> >
> > In pro for approach 2: Not clear what this means - "The watermark can
> > conclude the input messages before this watermark have been complete."
> >
> > For the cons of approach 2: "Complicated failure scenario of the second
> > job. It needs to checkpoint all the watermark messages received, so when it
> > recovered from failure, it can still count." - How is this related to EOS?
> > How is this related to the checkpoint watermark section?
> > Also, what is the "more messages required to write.. " referring to?
> >
> > "Samza needs to reconcile based on the task counts." - Please explain what
> > reconciliation means, why it needs to happen, and why we need to track the
> > producer task and total task count in the watermark message to do this.
> >
> > Checkpoint watermarks section is also unclear. What problem are we trying
> > to solve here?
> >
> > Should also move the message format and the watermark message interface
> > sections to the bottom, since they depend on details in the event time and
> > checkpoint watermark sections.
> >
> > Thanks,
> > Prateek
> >
> >
> > On Wed, May 24, 2017 at 11:30 AM, xinyu liu wrote:
> >
> > > Hi all,
> > >
> > > I created SEP-6 for SAMZA-1260: Support Watermark
> > > Across Intermediate Streams for Batch Processing. The link to the SEP is
> > > here:
> > >
> > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
> > >
> > > Please review and comments are welcome!
> > >
> > > Thanks,
> > > Xinyu
> > >
> >
>