Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-21 Thread Chris Pettitt
Now that I look at the whole list again, I see we could adequately
address #2 by first addressing #3. If instead of building aggregation
directly into the window operator we had aggregation operators like
sum, then the lambdas would be more intuitive. It would probably even be
nicer if the first lambda were a value instead of a lambda or if there
were such an alternative.
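
As a rough illustration of the aggregation-operator idea, here is a minimal sketch. Aggregator, Aggregators and fold are hypothetical names (the first borrowed from Prateek's suggestion quoted below), not anything in the new-api-v2 branch. The initial value and the fold step become two named methods, and common aggregations such as sum or count become canned implementations:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical interface, for illustration only; not part of the Samza API.
interface Aggregator<M, A> {
    A getInitialValue();            // replaces the initial-value Supplier lambda
    A aggregate(M message, A acc);  // replaces the FoldFunction lambda
}

final class Aggregators {
    private Aggregators() {}

    // Canned aggregation: sum of long values.
    static Aggregator<Long, Long> sum() {
        return new Aggregator<Long, Long>() {
            @Override public Long getInitialValue() { return 0L; }
            @Override public Long aggregate(Long message, Long acc) { return acc + message; }
        };
    }

    // Canned aggregation: number of messages seen.
    static <M> Aggregator<M, Long> count() {
        return new Aggregator<M, Long>() {
            @Override public Long getInitialValue() { return 0L; }
            @Override public Long aggregate(M message, Long acc) { return acc + 1; }
        };
    }

    // Stand-in for what a window operator would do with the messages of one pane.
    static <M, A> A fold(Iterable<M> messages, Aggregator<M, A> aggregator) {
        A acc = aggregator.getInitialValue();
        for (M message : messages) {
            acc = aggregator.aggregate(message, acc);
        }
        return acc;
    }
}

class AggregatorDemo {
    public static void main(String[] args) {
        List<Long> pane = Arrays.asList(1L, 2L, 3L);
        System.out.println(Aggregators.fold(pane, Aggregators.sum()));
        System.out.println(Aggregators.fold(pane, Aggregators.<Long>count()));
    }
}
```

With something like this, the window specification would only say how to window, and the caller would pick the aggregation separately.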

On Tue, Jun 20, 2017 at 12:25 PM, Prateek Maheshwari
<prateek...@gmail.com> wrote:
> 1. +1 for not requiring explicit type information unless it's unavoidable.
> This one seems easy to fix, but there are other places we should address
> too (OutputStream, Window operator).
>
> We should probably discuss the Window API separately from this discussion,
> but to Chris' point:
>
> 2. Re: 2 lambdas, the first is an initial value Java 'Supplier', the second
> is a Samza 'FoldFunction'. It might be clearer to create a new class (e.g.,
> 'Aggregator') with 'getInitialValue' and 'aggregate' methods that users
> should implement instead of these 2 lambdas. They'll lose the ability to
> provide these functions as lambdas, but I think overall it'll help
> readability and understandability. We can then provide default
> implementations for this class for common aggregations: accumulation (add
> to collection), sum, max, min, average, percentiles etc. Is this what you
> meant Chris?
>
> 3. I'm also strongly in favor of making the window APIs composable so that
> we don't have so many variants and parameters. Otherwise, as we add more
> parameters for event time support and type information for serdes, it'll
> get really difficult to both discover the appropriate window variant and
> understand a window specification once written.
>
> - Prateek
>
> On Tue, Jun 20, 2017 at 10:15 AM, Chris Pettitt <
> cpett...@linkedin.com.invalid> wrote:
>
>> Feedback for PageViewCounterStreamSpecExample:
>>
>> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L65:
>>
>> When we set up the input we had the message type, but it looks like we
>> are not propagating it via StreamIO.Input thus require a cast here.
>> The fix seems to be to generify StreamIO.Input.
>>
>> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L67:
>>
>> The two lambdas here are not very intuitive. I'm assuming based on
>> some previous discussion that these are setting up a fold function? I
>> would suggest making this more explicit, probably with a fold function
>> type.
>>
>> https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L94:
>>
>> The generic type parameters for WindowPane suggest that WindowPanes
>> must be keyed. This is a reasonable assumption for
>> "keyedTumblingWindow" but might not make sense for other cases, like a
>> global combine operation. Beam separates the two ideas into: 1) a
>> typed WindowValue, basically the primitive for values propagating
>> through the graph, contains a value and a windowing information and 2)
>> KV for keyed values.
>>
>> FWIW, the larger windowing and grouping concepts are also separated in
>> Beam. It looks like this is the case with Flink as well. In examples
>> from both the user specifies the windowing first and then separately
>> specify the aggregation operation (e.g. group by key, combine, sum,
>> etc.). This saves from the combinatorial explosion of keyed / global,
>> unwindowed (batch) / session / fixed / tumbling / etc., GBK / sum /
>> count / etc.
>>
>> - Chris
>>
>> On Tue, Jun 20, 2017 at 10:46 AM, Chris Pettitt <cpett...@linkedin.com>
>> wrote:
>> > Yi,
>> >
>> > What examples should we be looking at for new-api-v2?
>> >
>> > 1. samza/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java
>> >
>> > others?
>> >
>> > - Chris
>> >
>> > On Mon, Jun 19, 2017 at 5:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >> Hi, all,
>> >>
>> >> Here is the promised code examples for the revised API, and the related
>> >> change to how we specify serdes in the API:
>> >>
>> >> - User example for the new API change:
>> >> https://github.com/nickpan47/samza/tree/new-api-v2
>> >>
>> >> - Prateek’s PR for the proposed schema registry change:
>> >>

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-20 Thread Chris Pettitt
Feedback for PageViewCounterStreamSpecExample:

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L65:

When we set up the input we had the message type, but it looks like we
are not propagating it via StreamIO.Input, which requires a cast here.
The fix seems to be to generify StreamIO.Input.
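
A minimal sketch of what generifying the input could look like. TypedInput and PageViewEvent are hypothetical stand-ins (I have not copied the real StreamIO.Input signature), so treat the shape as an assumption. The message type is captured once when the input is created, and the only cast lives at the boundary where raw objects arrive:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a generified StreamIO.Input; the real class may differ.
final class TypedInput<M> {
    private final String streamId;
    private final Class<M> messageType;
    private final List<M> received = new ArrayList<>();

    private TypedInput(String streamId, Class<M> messageType) {
        this.streamId = streamId;
        this.messageType = messageType;
    }

    static <M> TypedInput<M> of(String streamId, Class<M> messageType) {
        return new TypedInput<>(streamId, messageType);
    }

    String streamId() { return streamId; }

    // The single checked cast, at the point where untyped messages enter.
    void deliver(Object raw) { received.add(messageType.cast(raw)); }

    List<M> messages() { return Collections.unmodifiableList(received); }
}

// Hypothetical message type for the example.
final class PageViewEvent {
    final String pageId;
    PageViewEvent(String pageId) { this.pageId = pageId; }
}

class TypedInputDemo {
    static String firstPageId() {
        TypedInput<PageViewEvent> input = TypedInput.of("page-views", PageViewEvent.class);
        input.deliver(new PageViewEvent("home"));
        PageViewEvent first = input.messages().get(0); // no cast at the call site
        return first.pageId;
    }

    public static void main(String[] args) {
        System.out.println(firstPageId());
    }
}
```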

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L67:

The two lambdas here are not very intuitive. I'm assuming based on
some previous discussion that these are setting up a fold function? I
would suggest making this more explicit, probably with a fold function
type.

https://github.com/nickpan47/samza/blob/new-api-v2/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java#L94:

The generic type parameters for WindowPane suggest that WindowPanes
must be keyed. This is a reasonable assumption for
"keyedTumblingWindow" but might not make sense for other cases, like a
global combine operation. Beam separates the two ideas into: 1) a
typed WindowedValue, basically the primitive for values propagating
through the graph, which contains a value and windowing information, and 2)
KV for keyed values.

FWIW, the larger windowing and grouping concepts are also separated in
Beam. It looks like this is the case with Flink as well. In examples
from both, the user specifies the windowing first and then separately
specifies the aggregation operation (e.g. group by key, combine, sum,
etc.). This avoids the combinatorial explosion of keyed / global,
unwindowed (batch) / session / fixed / tumbling / etc., GBK / sum /
count / etc.
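
To illustrate the separation, here is a small sketch with two independent types. Windowed and KeyValue are made-up names for this example (they are neither Beam's classes nor Samza's WindowPane), and the millisecond window bounds are an assumption. A keyed result is just the composition of the two, while a global combine uses the windowed wrapper alone:

```java
// Hypothetical type: a key paired with a value, with no windowing information.
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical type: any value plus the window it belongs to, with no key.
final class Windowed<T> {
    final T value;
    final long windowStartMs;
    final long windowEndMs;

    Windowed(T value, long windowStartMs, long windowEndMs) {
        this.value = value;
        this.windowStartMs = windowStartMs;
        this.windowEndMs = windowEndMs;
    }
}

class WindowShapesDemo {
    // Keyed tumbling window result: the two concepts composed.
    static Windowed<KeyValue<String, Long>> keyedCount() {
        return new Windowed<>(new KeyValue<>("page-1", 3L), 0L, 60_000L);
    }

    // Global combine result: windowed, but no key is forced on the caller.
    static Windowed<Long> globalCount() {
        return new Windowed<>(10L, 0L, 60_000L);
    }

    public static void main(String[] args) {
        System.out.println(keyedCount().value.key + " -> " + keyedCount().value.value);
        System.out.println(globalCount().value);
    }
}
```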

- Chris

On Tue, Jun 20, 2017 at 10:46 AM, Chris Pettitt <cpett...@linkedin.com> wrote:
> Yi,
>
> What examples should we be looking at for new-api-v2?
>
> 1. samza/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java
>
> others?
>
> - Chris
>
> On Mon, Jun 19, 2017 at 5:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> Hi, all,
>>
>> Here is the promised code examples for the revised API, and the related
>> change to how we specify serdes in the API:
>>
>> - User example for the new API change:
>> https://github.com/nickpan47/samza/tree/new-api-v2
>>
>> - Prateek’s PR for the proposed schema registry change:
>> https://github.com/nickpan47/samza/pull/2/files
>>
>> Please feel free to comment and provide feedbacks!
>>
>>
>> Thanks!
>>
>>
>> -Yi
>>
>> On Tue, Jun 6, 2017 at 11:16 AM, Yi Pan <nickpa...@gmail.com> wrote:
>>
>>> Hi, all,
>>>
>>> Thanks for all the inputs! Finally I got some time to go through the
>>> discussion thread and digest most of the points made above. Here is my
>>> personal summary:
>>>
>>> Consensus on requirements:
>>>
>>>   1. ApplicationRunner needs async APIs.
>>>   2. ApplicationRunner can be hidden from user (except maybe in config)
>>>   3. StreamApplication is the direct wrapper for the programming
>>>      interface (i.e. removing StreamGraph from the user API and allow users to
>>>      call input() and output() from the StreamApplication) in main()
>>>   4. There has to be a serialization format of the StreamApplication
>>>      itself, s.t. the tasks can just deserialize and create the user logic
>>>      included in StreamApplication in multiple TaskContext.
>>>   5. JobRunner seems to be a very thin layer on-top-of StreamProcessor
>>>      or YarnJob, and it is always a LocalJob in a LocalApplicationRunner and a
>>>      RemoteJob in a RemoteApplicationRunner. There is a desire to remove it
>>>   6. StreamApplication needs to have some methods to allow user-injected
>>>      global objects for the whole application, such as JmxServer,
>>>      MetricsReporter, etc.
>>>
>>>
>>> Some additional discussion points:
>>>
>>>   1. In StreamApplication#input()/output(), what should be the input /
>>>      output parameter? The StreamSpec? Or the actual implementation I/O object
>>>      to provide messages (i.e. similar to socket reader/file reader object)? In
>>>      the latter case, we will need to define an abstract layer of StreamReader
>>>      and StreamWriter in the user-facing API that supports read/write of
>>>      partitioned streams on top of the SystemConsumer/SystemProducer/SystemAdmin
>>>      objects. Also, the number of I/O streams via the StreamReader/StreamWriter
>>>      can not be pre-determined (i.e. depending on input str

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-06-20 Thread Chris Pettitt
Yi,

What examples should we be looking at for new-api-v2?

1. samza/samza-core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExample.java

others?

- Chris

On Mon, Jun 19, 2017 at 5:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
> Hi, all,
>
> Here is the promised code examples for the revised API, and the related
> change to how we specify serdes in the API:
>
> - User example for the new API change:
> https://github.com/nickpan47/samza/tree/new-api-v2
>
> - Prateek’s PR for the proposed schema registry change:
> https://github.com/nickpan47/samza/pull/2/files
>
> Please feel free to comment and provide feedbacks!
>
>
> Thanks!
>
>
> -Yi
>
> On Tue, Jun 6, 2017 at 11:16 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
>> Hi, all,
>>
>> Thanks for all the inputs! Finally I got some time to go through the
>> discussion thread and digest most of the points made above. Here is my
>> personal summary:
>>
>> Consensus on requirements:
>>
>>   1. ApplicationRunner needs async APIs.
>>   2. ApplicationRunner can be hidden from user (except maybe in config)
>>   3. StreamApplication is the direct wrapper for the programming
>>      interface (i.e. removing StreamGraph from the user API and allow users to
>>      call input() and output() from the StreamApplication) in main()
>>   4. There has to be a serialization format of the StreamApplication
>>      itself, s.t. the tasks can just deserialize and create the user logic
>>      included in StreamApplication in multiple TaskContext.
>>   5. JobRunner seems to be a very thin layer on-top-of StreamProcessor
>>      or YarnJob, and it is always a LocalJob in a LocalApplicationRunner and a
>>      RemoteJob in a RemoteApplicationRunner. There is a desire to remove it
>>   6. StreamApplication needs to have some methods to allow user-injected
>>      global objects for the whole application, such as JmxServer,
>>      MetricsReporter, etc.
>>
>>
>> Some additional discussion points:
>>
>>   1. In StreamApplication#input()/output(), what should be the input /
>>      output parameter? The StreamSpec? Or the actual implementation I/O object
>>      to provide messages (i.e. similar to socket reader/file reader object)? In
>>      the latter case, we will need to define an abstract layer of StreamReader
>>      and StreamWriter in the user-facing API that supports read/write of
>>      partitioned streams on top of the SystemConsumer/SystemProducer/SystemAdmin
>>      objects. Also, the number of I/O streams via the StreamReader/StreamWriter
>>      can not be pre-determined (i.e. depending on input stream partitions and
>>      the groupers). Hence, I am leaning toward exposing StreamSpec in the API
>>      and letting the user build the StreamSpec via SpecBuilder. The actual I/O
>>      objects will be instantiated when SystemConsumer/SystemProducer are
>>      instantiated, with the number of physical partitions in each container.
>>   2. There is a need to support task-level programs via the same launch
>>      model as well.
>>
>>
>> Some ideas to implement the above requirements:
>>
>>   1. StreamGraph#write() should be used internally to generate and
>>      persist the serialized format of user logic. Then, StreamGraph#read()
>>      should give back a deserialized version of user logic. This would imply
>>      that the user functions defined in APIs are mandated to be serializable.
>>   2. StreamApplication should include a SpecBuilder that provides the
>>      instantiation of MessageStream/Stores, which is passed to
>>      StreamApplication#input() / StreamApplication#output()
>>   3. StreamApplication should also include an internal ApplicationRunner
>>      instance (config driven, class loaded) to be able to switch between local
>>      vs remote execution
>>   4. Implementation of LocalApplicationRunner should directly
>>      instantiate and manage StreamProcessor instances for each job, removing the
>>      LocalJobRunner
>>   5. Implementation of RemoteApplicationRunner should instantiate a
>>      remote JobFactory, create the remote job and submit it for each job,
>>      removing the current JobRunner interface
>>   6. We also need a StreamTaskApplication class that allows user to
>>      create task-level applications, by mandating a constructor that takes a
>>      StreamTaskFactory parameter
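
To illustrate point 1 (user functions must be serializable so the user logic can be written out and read back), here is a minimal sketch using plain Java object serialization. SerializableFunction and UserLogicCodec are hypothetical names, and Java serialization is only one possible format; nothing here is the actual StreamGraph#write()/read() implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical marker type: a user function that is required to be serializable.
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

// Hypothetical codec standing in for the write()/read() pair.
final class UserLogicCodec {
    private UserLogicCodec() {}

    static byte[] write(Serializable userFn) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(userFn);
        } catch (IOException e) {
            throw new IllegalStateException("user function is not serializable", e);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T read(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not restore user function", e);
        }
    }
}

class UserLogicCodecDemo {
    // Serializes a lambda, restores it, and applies the restored copy.
    static int roundTrip(int input) {
        SerializableFunction<Integer, Integer> addOne = v -> v + 1;
        byte[] stored = UserLogicCodec.write(addOne);
        SerializableFunction<Integer, Integer> restored = UserLogicCodec.read(stored);
        return restored.apply(input);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(41));
    }
}
```

A lambda is only serializable when its target type is; that is why the API types themselves would have to extend Serializable, and any state a user function captures would need to be serializable too.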
>>
>>
>> One more opinion around the status and the waitForFinish():  I would think
>> that waitForFinish() is just waiting for the local Runtime to complete, not
>> to wait for the remote job to be compl

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-06-01 Thread Chris Pettitt
e consumer)?
>> If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
>> control message?
>> They are different: watermark contains a timestamp from the producer task,
>> while EndOfStream message indicates the producer task has completely
>> processed a stream. They both are control messages which require same
>> delivery pattern. I updated the SEP to make it clearer they are
>> sub-category of control message.
>>
>> >> As for the serde for intermediate stream, I assume that we will need an
>> envelope serde that is avro to wrap the user message and control message
>> in? So, user-defined serde now only applies to the “UserMessage”? And
>> what’s the message key in the message format?
>> The serde wrapper for the message is customized: the first byte indicates
>> the message type, and the following byte array is the actual message. For
>> user message, we will apply user provided serde. For control message, we
>> will use JSON. The key is the same. We do not need customized serde since
>> we can infer the serde from message.
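
A minimal sketch of the envelope layout described above, where the first byte carries the message type and the rest is the serialized message. EnvelopeCodec and the two type codes are assumptions made for this example; the SEP may assign different values:

```java
import java.util.Arrays;

// Hypothetical envelope: [1 byte message type][payload bytes].
final class EnvelopeCodec {
    // Type codes are made up for illustration.
    static final byte USER_MESSAGE = 0;
    static final byte CONTROL_MESSAGE = 1;

    private EnvelopeCodec() {}

    // Prepends the type byte to an already-serialized payload.
    static byte[] wrap(byte type, byte[] payload) {
        byte[] envelope = new byte[payload.length + 1];
        envelope[0] = type;
        System.arraycopy(payload, 0, envelope, 1, payload.length);
        return envelope;
    }

    // Reads the type so the caller can pick the user serde or the JSON serde.
    static byte typeOf(byte[] envelope) {
        return envelope[0];
    }

    // Returns the payload without the leading type byte.
    static byte[] payloadOf(byte[] envelope) {
        return Arrays.copyOfRange(envelope, 1, envelope.length);
    }
}

class EnvelopeCodecDemo {
    public static void main(String[] args) {
        byte[] envelope = EnvelopeCodec.wrap(EnvelopeCodec.CONTROL_MESSAGE, new byte[] {7, 8});
        System.out.println(EnvelopeCodec.typeOf(envelope));
        System.out.println(Arrays.toString(EnvelopeCodec.payloadOf(envelope)));
    }
}
```

The reader dispatches on the first byte: user messages go to the user-provided serde, control messages to the JSON serde.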
>>
>> >> A big question regarding to the watermark propagation: “When Samza
>> receives watermark messages, it will emit a watermark with the earliest
>> event time across all the stream partitions. No emission if the earliest
>> event time doesn’t change.” Does the watermark propagation requires
>> synchronization/coordination between all producers at the source? Say, if
>> the task taking one input source emits watermark at 1min interval and the
>> task taking another input source emits watermark at 5min interval, how does
>> the downstream consumer reconcile the watermarks?
>>
>> Watermark propagation does not require synchronization. Chris's equations
>> are very accurate about how the calculations work. Please take a look.
>>
>> >> In the checkpoint message format, it seems that it is only design for
>> watermark messages? Any streamId info that EoS is carrying over?
>>
>> Sorry, I forgot to add the Eos checkpoint there. I updated the SEP for it.
>> Now the EOS checkpoint has the streamId.
>>
>> Thanks,
>> Xinyu
>>
>> On Tue, May 30, 2017 at 11:03 AM, Chris Pettitt <
>> cpett...@linkedin.com.invalid> wrote:
>>
>> > FWIW, there is a Beam presentation that has a very crisp set of rules
>> > around watermarks. From memory it boils down to something like:
>> >
>> > InputWatermark(stage) = min { OutputWatermark(stage') for stage' in
>> > Upstream(stage) }
>> > OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
>> >
>> > OldestWork(stage) is the oldest message that has been received by the stage
>> > but not yet processed.
>> >
>> > - Chris
>> >
>> > On Tue, May 30, 2017 at 1:39 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >
>> > > Hi, Xinyu,
>> > >
>> > > Thanks for the proposal. I took a quick pass and had the following
>> > > questions/comments:
>> > >
>> > > - message shuffling ==> data shuffling???
>> > >
>> > > - the proposal is for all types of control messages, not just for
>> > > end-of-stream, right? Better to define the scope and lay out the common
>> > > requirements of control message delivery.
>> > >
>> > > - dropped option should go to “Rejected alternatives”
>> > >
>> > > - “Samza finds out the following intermediate streams that all the inputs
>> > > have been end-of-stream” what does it mean? The task consuming the input
>> > > stream(s) reconcile all EoS from all input stream partitions and then
>> > > propagate EoS messages to all partitions in intermediate streams? This is
>> > > not super clear to me.
>> > >
>> > > - in step-3, how does the consumer of intermediate streams know how many
>> > > EOS messages should be received? And we should make it clear that it should
>> > > be EOS / producer and the count of the downstream consumer is counting on
>> > > the number of unique EOS from all producers from the upstream.
>> > >
>> > > - In comparison table, “checkpoint the control messages received” ==> is it
>> > > referring to the partially accumulated upstream EOS messages?
>> > >
>> > > - Please make a clear definition on “Watermark” and “EndOfStream”. Why are

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-05-30 Thread Chris Pettitt
FWIW, there is a Beam presentation that has a very crisp set of rules
around watermarks. From memory it boils down to something like:

InputWatermark(stage) = min { OutputWatermark(stage') for stage' in
Upstream(stage) }
OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }

OldestWork(stage) is the oldest message that has been received by the stage
but not yet processed.
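
The two rules can be sketched in a few lines of Java. WatermarkRules is a hypothetical helper, timestamps are assumed to be epoch milliseconds, and Long.MAX_VALUE is used (my assumption) to mean "no upstream" or "no pending work":

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical helper; encodes only the two min-rules quoted above.
final class WatermarkRules {
    private WatermarkRules() {}

    // InputWatermark(stage) = min { OutputWatermark(stage') for stage' in Upstream(stage) }
    static long inputWatermark(Collection<Long> upstreamOutputWatermarks) {
        long min = Long.MAX_VALUE; // assumption: a stage with no upstream is unconstrained
        for (long watermark : upstreamOutputWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    // OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
    static long outputWatermark(long inputWatermark, long oldestWork) {
        return Math.min(inputWatermark, oldestWork);
    }
}

class WatermarkRulesDemo {
    public static void main(String[] args) {
        // Two upstream stages that advance at different rates.
        long input = WatermarkRules.inputWatermark(Arrays.asList(100L, 250L));
        // The stage still holds an unprocessed message with timestamp 80.
        long output = WatermarkRules.outputWatermark(input, 80L);
        System.out.println(input + " " + output);
    }
}
```

Because each rule is just a min over the latest known values, upstream stages that emit watermarks at different intervals need no coordination: the downstream watermark simply stays at the slowest upstream value until that one advances.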

- Chris

On Tue, May 30, 2017 at 1:39 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Xinyu,
>
> Thanks for the proposal. I took a quick pass and had the following
> questions/comments:
>
> - message shuffling ==> data shuffling???
>
> - the proposal is for all types of control messages, not just for
> end-of-stream, right? Better to define the scope and lay out the common
> requirements of control message delivery.
>
> - dropped option should go to “Rejected alternatives”
>
> - “Samza finds out the following intermediate streams that all the inputs
> have been end-of-stream” what does it mean? The task consuming the input
> stream(s) reconcile all EoS from all input stream partitions and then
> propagate EoS messages to all partitions in intermediate streams? This is
> not super clear to me.
>
> - in step-3, how does the consumer of intermediate streams know how many
> EOS messages should be received? And we should make it clear that it should
> be EOS / producer and the count of the downstream consumer is counting on
> the number of unique EOS from all producers from the upstream.
>
> - In comparison table, “checkpoint the control messages received” ==> is it
> referring to the partially accumulated upstream EOS messages?
>
> - Please make a clear definition on “Watermark” and “EndOfStream”. Why are
> they different? Are they both control messages that requires the same
> delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
> If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
> control message?
>
> - As for the serde for intermediate stream, I assume that we will need an
> envelope serde that is avro to wrap the user message and control message
> in? So, user-defined serde now only applies to the “UserMessage”? And
> what’s the message key in the message format?
>
> - A big question regarding to the watermark propagation: “When Samza
> receives watermark messages, it will emit a watermark with the earliest
> event time across all the stream partitions. No emission if the earliest
> event time doesn’t change.” Does the watermark propagation requires
> synchronization/coordination between all producers at the source? Say, if
> the task taking one input source emits watermark at 1min interval and the
> task taking another input source emits watermark at 5min interval, how does
> the downstream consumer reconcile the watermarks?
>
> - In the checkpoint message format, it seems that it is only design for
> watermark messages? Any streamId info that EoS is carrying over?
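
On the step-3 question above (one EOS per producer, with the consumer counting unique producers), a minimal sketch could look like this. EndOfStreamTracker is a hypothetical class, and it assumes each EOS message identifies its producer task by name and that the consumer knows the total producer task count:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical consumer-side tracker for one intermediate stream.
final class EndOfStreamTracker {
    private final int producerTaskCount;
    private final Set<String> producersDone = new HashSet<>();

    EndOfStreamTracker(int producerTaskCount) {
        this.producerTaskCount = producerTaskCount;
    }

    // Records an EOS from one producer task. Duplicates (e.g. replays after a
    // restart) are ignored because the set only keeps unique task names.
    // Returns true once every producer task has reported end-of-stream.
    boolean onEndOfStream(String producerTaskName) {
        producersDone.add(producerTaskName);
        return producersDone.size() == producerTaskCount;
    }
}

class EndOfStreamTrackerDemo {
    public static void main(String[] args) {
        EndOfStreamTracker tracker = new EndOfStreamTracker(2);
        System.out.println(tracker.onEndOfStream("task-0"));
        System.out.println(tracker.onEndOfStream("task-0"));
        System.out.println(tracker.onEndOfStream("task-1"));
    }
}
```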
>
>
> Thanks a lot!
>
>
> -Yi
>
> On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
> > Makes sense. I noticed that too and I dropped the ControlMessage type in my
> > code. I also moved taskName, taskCount to the parent ControlMessage class.
> > Just updated the SEP-6. Please take a look again.
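
A rough sketch of the shape this describes: a single MessageType enum, no nested ControlMessage.Type, and taskName/taskCount on the parent class. All class and field names other than ControlMessage, MessageType, taskName and taskCount are guesses for illustration; the SEP is the authority on the real layout:

```java
// Single source of truth for the message kind; values are assumptions.
enum MessageType { USER_MESSAGE, WATERMARK, END_OF_STREAM }

// Fields shared by every control message live on the parent.
abstract class ControlMessage {
    final String taskName; // producer task that emitted the message
    final int taskCount;   // total number of producer tasks

    ControlMessage(String taskName, int taskCount) {
        this.taskName = taskName;
        this.taskCount = taskCount;
    }

    abstract MessageType type();
}

// Hypothetical subclass: carries a timestamp from the producer task.
final class WatermarkMessage extends ControlMessage {
    final long timestamp;

    WatermarkMessage(String taskName, int taskCount, long timestamp) {
        super(taskName, taskCount);
        this.timestamp = timestamp;
    }

    @Override MessageType type() { return MessageType.WATERMARK; }
}

// Hypothetical subclass: the producer task has finished the stream.
final class EndOfStreamMessage extends ControlMessage {
    EndOfStreamMessage(String taskName, int taskCount) {
        super(taskName, taskCount);
    }

    @Override MessageType type() { return MessageType.END_OF_STREAM; }
}

class ControlMessageDemo {
    public static void main(String[] args) {
        ControlMessage message = new WatermarkMessage("task-0", 4, 1000L);
        System.out.println(message.type() + " from " + message.taskName);
    }
}
```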
> >
> > Thanks,
> > Xinyu
> >
> > On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <
> > cpett...@linkedin.com.invalid> wrote:
> >
> > > MessageType and ControlMessage.Type look redundant. You could either use
> > > "ControlMessage" as the type in MessageType or drop ControlMessage.Type.
> > >
> > > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
> > >
> > > > Thanks a lot for the comments. I updated the SEP with more details and
> > > > clarification. Please let me know if you have further questions.
> > > >
> > > > Thanks,
> > > > Xinyu
> > > >
> > > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
> > > > pmaheshw...@linkedin.com.invalid> wrote:
> > > >
> > > > > Hi Xinyu,
> > > > >
> > > > > Thanks for the proposal. Some requests for clarifications. Let's update the
> > > > > SEP directly instead of replying here.
> > > > >
> > > > > E.g., in "For any following intermediate stream whose input streams are all
> > > > > end-of-stream, it will be marked as pending EOS" - Should clarify that
> > > > > (

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

2017-05-30 Thread Chris Pettitt
MessageType and ControlMessage.Type look redundant. You could either use
"ControlMessage" as the type in MessageType or drop ControlMessage.Type.

On Fri, May 26, 2017 at 5:14 PM, xinyu liu  wrote:

> Thanks a lot for the comments. I updated the SEP with more details and
> clarification. Please let me know if you have further questions.
>
> Thanks,
> Xinyu
>
> On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
> pmaheshw...@linkedin.com.invalid> wrote:
>
> > Hi Xinyu,
> >
> > Thanks for the proposal. Some requests for clarifications. Let's update the
> > SEP directly instead of replying here.
> >
> > E.g., in "For any following intermediate stream whose input streams are all
> > end-of-stream, it will be marked as pending EOS" - Should clarify that
> > (IIUC) something is injecting EOS messages in all intermediate stream
> > partitions once it receives EOS from all input stream partitions it's
> > consuming. Should also clarify what is that something.
> > Same for "declare end of stream once all the EOS messages have been
> > received." - What does this declaration involve and who is doing this?
> >
> > In pro for approach 2: Not clear what this means - "The watermark can
> > conclude the input messages before this watermark have been complete."
> >
> > For the cons of approach 2: "Complicated failure scenario of the second
> > job. It needs to checkpoint all the watermark messages received, so when it
> > recovered from failure, it can still count." - How is this related to EOS?
> > How is this related to the checkpoint watermark section?
> > Also, what is the "more messages required to write.. " referring to?
> >
> > "Samza needs to reconcile based on the task counts." - Please explain what
> > reconciliation means, why it needs to happen, and why we need to track the
> > producer task and total task count in the watermark message to do this.
> >
> > Checkpoint watermarks section is also unclear. What problem are we trying
> > to solve here?
> >
> > Should also move the message format and the watermark message interface
> > sections to the bottom, since they depend on details in the event time and
> > checkpoint watermark sections.
> >
> > Thanks,
> > Prateek
> >
> >
> > On Wed, May 24, 2017 at 11:30 AM, xinyu liu wrote:
> >
> > > Hi all,
> > >
> > > I created SEP-6 for SAMZA-1260: Support Watermark
> > > Across Intermediate Streams for Batch Processing. The link to the SEP is
> > > here:
> > >
> > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
> > >
> > > Please review and comments are welcome!
> > >
> > > Thanks,
> > > Xinyu
> > >
> >
>


Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-05-03 Thread Chris Pettitt
Hi Xinyu,

I took a second look at the registerStore API. Would it be possible to call
registerStore directly on the app, similar to what we're doing with
app.input (possibly with the restriction that registerStore must be called
before we add an operator that uses the store)? Otherwise we'll end up
having to do two passes on the graph again - similar to the way we had to
do a pass to init stream config and then hook up the graph.
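
A minimal sketch of the single-pass idea, assuming a hypothetical SketchApp class (registerStore is the name from the thread; addOperator and the string-based operator model are made up for illustration). Because stores are registered on the app before any operator that uses them, the check happens while the graph is being built and no second pass is needed:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical app object; not the actual StreamApplication API.
final class SketchApp {
    private final Set<String> stores = new HashSet<>();
    private final List<String> operators = new ArrayList<>();

    // Registers a store directly on the app, like app.input(...) does for streams.
    SketchApp registerStore(String storeName) {
        stores.add(storeName);
        return this;
    }

    // Adds an operator that uses a store; fails fast if the store is not yet known.
    SketchApp addOperator(String operatorName, String storeName) {
        if (!stores.contains(storeName)) {
            throw new IllegalStateException("store '" + storeName
                + "' must be registered before operator '" + operatorName + "' uses it");
        }
        operators.add(operatorName);
        return this;
    }

    List<String> operators() {
        return operators;
    }
}

class SketchAppDemo {
    public static void main(String[] args) {
        SketchApp app = new SketchApp()
            .registerStore("my-store")
            .addOperator("count-by-key", "my-store");
        System.out.println(app.operators());
    }
}
```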

Thanks,
Chris


On Fri, Apr 28, 2017 at 8:55 PM, xinyu liu <xinyuliu...@gmail.com> wrote:

> Right, option #2 seems redundant for defining streams after further
> discussion here. StreamSpec itself is flexible enough to achieve both
> static and programmatic specification of the stream. Agree it's not
> convenient for now (pretty obvious after looking at your bsr
> beam.runners.samza.wrapper), and we should provide similar predefined
> convenient wrappers for user to create the StreamSpec. In your case
> something like BoundedStreamSpec.file() which will generate the system
> and serialize the data as you did.
>
> We're still thinking the callback proposed in #2 can be useful for
> requirement #6: injecting other user objects in run time, such as stores
> and metrics. To simplify the user understanding further, I think we might
> hide the ApplicationRunner and expose the StreamApplication instead, which
> will make requirement #3 not user facing. So the API becomes like:
>
>   StreamApplication app = StreamApplication.local(config)
>     .init(env -> {
>       env.registerStore("my-store", new MyStoreFactory());
>       env.registerMetricsReporter("my-reporter", new MyMetricsReporterFactory());
>     })
>     .withLifeCycleListener(myListener);
>
>   app.input(BoundedStreamSpec.create("/sample/input.txt"))
>     .map(...)
>     .window(...);
>
>   app.run();
>
> For requirement #5, I add a .withLifeCycleListener() in the API, which can
> trigger the callbacks with life cycle events.
>
> For #4: distribution of the jars will be what we have today using the Yarn
> localization with a remote store like artifactory or http server. We
> discussed where to put the graph serialization. The current thinking is to
> define a general interface which can be backed by a remote store, like Kafka,
> artifactory or http server. For Kafka, it's straightforward but we will
> have the size limit or cut it by ourselves. For the other two, we need to
> investigate whether we can easily upload jars to our artifactory and
> localizing it with Yarn. Any opinions on this?
>
> Thanks,
> Xinyu
>
> On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt <
> cpett...@linkedin.com.invalid> wrote:
>
> > Your proposal for #1 looks good.
> >
> > I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add
> > the stream spec straight onto the runner while in #2 you do it in a
> > callback. If it is either-or, #1 looks a lot better for my purposes.
> >
> > For #4 what mechanism are you using to distribute the JARs? Can you use the
> > same mechanism to distribute the serialized graph?
> >
> > On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
> >
> > > btw, I will get to SAMZA-1246 as soon as possible.
> > >
> > > Thanks,
> > > Xinyu
> > >
> > > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
> > >
> > > > Let me try to capture the updated requirements:
> > > >
> > > > 1. Set up input streams outside StreamGraph, and treat graph building as a
> > > > library (*Fluent, Beam*).
> > > >
> > > > 2. Improve ease of use for ApplicationRunner to avoid complex
> > > > configurations such as zkCoordinator, zkCoordinationService (*Standalone*).
> > > > Provide some programmatic way to tweak them in the API.
> > > >
> > > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
> > > > have one or more implementations but it's hidden from the users.
> > > >
> > > > 4. Separate StreamGraph from runtime environment so it can be serialized
> > > > (*Beam, Yarn*)
> > > >
> > > > 5. Better life cycle management of application, parity with
> > > > StreamProcessor (*Standalone, Beam*). Stats should include exception in
> > > > case of failure (tracked in SAMZA-1246).
> > > >
> > > > 6. Support injecting user-defined objects into ApplicationRunner.
> > > >
> > > > Prateek

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-28 Thread Chris Pettitt
Your proposal for #1 looks good.

I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add
the stream spec straight onto the runner while in #2 you do it in a
callback. If it is either-or, #1 looks a lot better for my purposes.

For #4 what mechanism are you using to distribute the JARs? Can you use the
same mechanism to distribute the serialized graph?

On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu <xinyuliu...@gmail.com> wrote:

> btw, I will get to SAMZA-1246 as soon as possible.
>
> Thanks,
> Xinyu
>
> On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
> > Let me try to capture the updated requirements:
> >
> > 1. Set up input streams outside StreamGraph, and treat graph building as a
> > library (*Fluent, Beam*).
> >
> > 2. Improve ease of use for ApplicationRunner to avoid complex
> > configurations such as zkCoordinator, zkCoordinationService (*Standalone*).
> > Provide some programmatic way to tweak them in the API.
> >
> > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
> > have one or more implementations but it's hidden from the users.
> >
> > 4. Separate StreamGraph from runtime environment so it can be serialized
> > (*Beam, Yarn*)
> >
> > 5. Better life cycle management of application, parity with
> > StreamProcessor (*Standalone, Beam*). Stats should include exception in
> > case of failure (tracked in SAMZA-1246).
> >
> > 6. Support injecting user-defined objects into ApplicationRunner.
> >
> > Prateek and I iterated on the ApplicationRunner API based on these
> > requirements. To support #1, we can set up input streams on the runner
> > level, which returns the MessageStream and allows graph building
> > afterwards. The code looks like below:
> >
> >   ApplicationRunner runner = ApplicationRunner.local();
> >   runner.input(streamSpec)
> >     .map(..)
> >     .window(...);
> >   runner.run();
> >
> > StreamSpec is the building block for setting up streams here. It can be
> > set up in different ways:
> >
> >   - Direct creation of stream spec, like runner.input(new StreamSpec(id,
> >     system, stream))
> >   - Load from streamId from env or config, like
> >     runner.input(runner.env().getStreamSpec(id))
> >   - Canned Spec which generates the StreamSpec with id, system and stream
> >     to minimize the configuration. For example,
> >     CollectionSpec.create(Arrays.asList(1, 2, 3, 4)), which will auto
> >     generate the system and stream in the spec.
> >
> > To support #2, we need to be able to set up StreamSpec-related objects and
> > factories programmatically in env. Suppose we have the following before
> > runner.input(...):
> >
> >   runner.setup(env /* a writable interface of env*/ -> {
> > env.setStreamSpec(streamId, streamSpec);
> > env.setSystem(systemName, systemFactory);
> >   })
> >
> > runner.setup(->) also provides setup for stores and other runtime stuff
> > needed for the execution. The setup should be able to be serialized to
> config.
> > For #6, I haven't figured out a good way to inject user-defined objects
> > here yet.
> >
> > With this API, we should be able to also support #4. For remote
> > runner.run(), the operator user classes/lambdas in the StreamGraph need to
> > be serialized. As today, the existing option is to serialize to a stream,
> > either the coordinator stream or the pipeline control stream, which will
> > have the size limit per message. Do you see RPC as an option?
> >
> > For this version of API, seems we don't need the StreamApplication
> wrapper
> > as well as exposing the StreamGraph. Do you think we are on the right
> path?
> >
> > Thanks,
> > Xinyu
> >
> >
> > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <
> > cpett...@linkedin.com.invalid> wrote:
> >
> >> That should have been:
> >>
> >> For #1, Beam doesn't have a hard requirement...
> >>
> >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <cpett...@linkedin.com>
> >> wrote:
> >>
> >> > For #1, I doesn't have a hard requirement for any change from Samza. A
> >> > very nice to have would be to allow the input systems to be set up at
> >> the
> >> > same time as the rest of the StreamGraph. An even nicer to have would
> >> be to
> >> > do away with the callback based approach and treat graph building as a
> >> > library, a la Beam and Flink.
> >> >

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread Chris Pettitt
That should have been:

For #1, Beam doesn't have a hard requirement...

On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <cpett...@linkedin.com>
wrote:

> For #1, I doesn't have a hard requirement for any change from Samza. A
> very nice to have would be to allow the input systems to be set up at the
> same time as the rest of the StreamGraph. An even nicer to have would be to
> do away with the callback based approach and treat graph building as a
> library, a la Beam and Flink.
>
> For the moment I've worked around the two pass requirement (once for
> config, once for StreamGraph) by introducing an IR layer between Beam and
> the Samza Fluent translation. The IR layer is convenient independent of
> this problem because it makes it easier to switch between the Fluent and
> low-level APIs.
>
>
> For #4, if we had parity with StreamProcessor for lifecycle we'd be in
> great shape. One additional issue with the status call that I may not have
> mentioned is that it provides you no way to get at the cause of failure.
> The StreamProcessor API does allow this via the callback.
>
>
> Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
> indirection you currently have to jump through (this is also related to
> system consumer configuration from #1). It makes it much easier to discover
> what the configurable parameters are too, if we provide some programmatic
> way to tweak them in the API - which can turn into config under the hood.
>
> On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
>> Let me give a shot to summarize the requirements for ApplicationRunner we
>> have discussed so far:
>>
>> - Support environment for passing in user-defined objects (streams
>> potentially) into ApplicationRunner (*Beam*)
>>
>> - Improve ease of use for ApplicationRunner to avoid complex
>> configurations
>> such as zkCoordinator, zkCoordinationService. (*Standalone*)
>>
>> - Clean up ApplicationRunner into a single interface (*Fluent*). We can
>> have one or more implementations but it's hidden from the users.
>>
>> - Separate StreamGraph from environment so it can be serializable (*Beam,
>> Yarn*)
>>
>> - Better life cycle management of application, including
>> start/stop/stats (*Standalone,
>> Beam*)
>>
>>
>> One way to address 2 and 3 is to provide pre-packaged runner using static
>> factory methods, and the return type will be the ApplicationRunner
>> interface. So we can have:
>>
>>   ApplicationRunner runner = ApplicationRunner.zk() /
>> ApplicationRunner.local()
>> / ApplicationRunner.remote() / ApplicationRunner.test().
>>
>> Internally we will package the right configs and run-time environment with
>> the runner. For example, ApplicationRunner.zk() will define all the
>> configs
>> needed for zk coordination.
>>
>> To support 1 and 4, can we pass in a lambda function in the runner, and
>> then we can run the stream graph? Like the following:
>>
>>   ApplicationRunner.zk().env(config -> environment).run(streamGraph);
>>
>> Then we need a way to pass the environment into the StreamGraph. This can
>> be done by either adding an extra parameter to each operator, or have a
>> getEnv() function in the MessageStream, which seems to be pretty hacky.
>>
>> What do you think?
>>
>> Thanks,
>> Xinyu
>>
>>
>>
>>
>>
>> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
>> pmaheshw...@linkedin.com.invalid> wrote:
>>
>> > Thanks for putting this together Yi!
>> >
>> > I agree with Jake, it does seem like there are a few too many moving
>> parts
>> > here. That said, the problem being solved is pretty broad, so let me
>> try to
>> > summarize my current understanding of the requirements. Please correct
>> me
>> > if I'm wrong or missing something.
>> >
>> > ApplicationRunner and JobRunner first, ignoring test environment for the
>> > moment.
>> > ApplicationRunner:
>> > 1. Create execution plan: Same in Standalone and Yarn
>> > 2. Create intermediate streams: Same logic but different leader election
>> > (ZK-based or pre-configured in standalone, AM in Yarn).
>> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
>> >
>> > JobRunner:
>> > 1. Run the StreamProcessors: Same process in Standalone & Test. Remote
>> host
>> > in Yarn.
>> >
>> > To get a single ApplicationRunner implementation, like Jake suggested,
>> we
>> > need 

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-27 Thread Chris Pettitt
For #1, I doesn't have a hard requirement for any change from Samza. A very
nice to have would be to allow the input systems to be set up at the same
time as the rest of the StreamGraph. An even nicer to have would be to do
away with the callback based approach and treat graph building as a
library, a la Beam and Flink.

For the moment I've worked around the two pass requirement (once for
config, once for StreamGraph) by introducing an IR layer between Beam and
the Samza Fluent translation. The IR layer is convenient independent of
this problem because it makes it easier to switch between the Fluent and
low-level APIs.


For #4, if we had parity with StreamProcessor for lifecycle we'd be in
great shape. One additional issue with the status call that I may not have
mentioned is that it provides you no way to get at the cause of failure.
The StreamProcessor API does allow this via the callback.


Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
indirection you currently have to jump through (this is also related to
system consumer configuration from #1). It makes it much easier to discover
what the configurable parameters are too, if we provide some programmatic
way to tweak them in the API - which can turn into config under the hood.
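
As a rough sketch of what "programmatic in the API, config under the hood" could look like: the class name ZkCoordinationSettings and the "example.zk.*" keys below are hypothetical and chosen only for illustration; they are not existing Samza classes or config names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical typed settings object; discoverable via IDE completion.
final class ZkCoordinationSettings {
  private String connect = "localhost:2181";
  private int sessionTimeoutMs = 30000;

  ZkCoordinationSettings connect(String hostPort) {
    this.connect = hostPort;
    return this;
  }

  ZkCoordinationSettings sessionTimeoutMs(int millis) {
    if (millis <= 0) {
      throw new IllegalArgumentException("sessionTimeoutMs must be positive");
    }
    this.sessionTimeoutMs = millis;
    return this;
  }

  // Flattens the typed settings into the string map a config layer would consume.
  // The key names are made up for this sketch.
  Map<String, String> toConfig() {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("example.zk.connect", connect);
    config.put("example.zk.session.timeout.ms", Integer.toString(sessionTimeoutMs));
    return config;
  }

  public static void main(String[] args) {
    System.out.println(new ZkCoordinationSettings().connect("zk1:2181").toConfig());
  }
}
```

The user only sees the typed setters; the string keys stay an implementation detail that can still be serialized as ordinary config.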

On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu wrote:

> Let me give a shot to summarize the requirements for ApplicationRunner we
> have discussed so far:
>
> - Support environment for passing in user-defined objects (streams
> potentially) into ApplicationRunner (*Beam*)
>
> - Improve ease of use for ApplicationRunner to avoid complex configurations
> such as zkCoordinator, zkCoordinationService. (*Standalone*)
>
> - Clean up ApplicationRunner into a single interface (*Fluent*). We can
> have one or more implementations but it's hidden from the users.
>
> - Separate StreamGraph from environment so it can be serializable (*Beam,
> Yarn*)
>
> - Better life cycle management of application, including
> start/stop/stats (*Standalone,
> Beam*)
>
>
> One way to address 2 and 3 is to provide pre-packaged runner using static
> factory methods, and the return type will be the ApplicationRunner
> interface. So we can have:
>
>   ApplicationRunner runner = ApplicationRunner.zk() /
> ApplicationRunner.local()
> / ApplicationRunner.remote() / ApplicationRunner.test().
>
> Internally we will package the right configs and run-time environment with
> the runner. For example, ApplicationRunner.zk() will define all the configs
> needed for zk coordination.
>
> To support 1 and 4, can we pass in a lambda function in the runner, and
> then we can run the stream graph? Like the following:
>
>   ApplicationRunner.zk().env(config -> environment).run(streamGraph);
>
> Then we need a way to pass the environment into the StreamGraph. This can
> be done by either adding an extra parameter to each operator, or have a
> getEnv() function in the MessageStream, which seems to be pretty hacky.
>
> What do you think?
>
> Thanks,
> Xinyu
>
>
>
>
>
> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
> pmaheshw...@linkedin.com.invalid> wrote:
>
> > Thanks for putting this together Yi!
> >
> > I agree with Jake, it does seem like there are a few too many moving
> parts
> > here. That said, the problem being solved is pretty broad, so let me try
> to
> > summarize my current understanding of the requirements. Please correct me
> > if I'm wrong or missing something.
> >
> > ApplicationRunner and JobRunner first, ignoring test environment for the
> > moment.
> > ApplicationRunner:
> > 1. Create execution plan: Same in Standalone and Yarn
> > 2. Create intermediate streams: Same logic but different leader election
> > (ZK-based or pre-configured in standalone, AM in Yarn).
> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
> >
> > JobRunner:
> > 1. Run the StreamProcessors: Same process in Standalone & Test. Remote
> host
> > in Yarn.
> >
> > To get a single ApplicationRunner implementation, like Jake suggested, we
> > need to make leader election and JobRunner implementation pluggable.
> > There's still the question of whether ApplicationRunner#run API should be
> > blocking or non-blocking. It has to be non-blocking in YARN. We want it
> to
> > be blocking in standalone, but seems like the main reason is ease of use
> > when launched from main(). I'd prefer making it consistently non-blocking
> > instead, esp. since in embedded standalone mode (where the processor is
> > running in another container) a blocking API would not be user-friendly
> > either. If not, we can add both run and runBlocking.
> >
> > Coming to RuntimeEnvironment, which is the least clear to me so far:
> > 1. I don't think RuntimeEnvironment should be responsible for providing
> > StreamSpecs for streamIds - they can be obtained with a config/util
> class.
> > The StreamProcessor should only know about logical streamIds and the
> > streamId <-> actual stream mapping should happen 

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-21 Thread Chris Pettitt
I'm playing with ApplicationRunner, so I'll probably have more feedback.
For now, in addition to async run we also need async notification of
completion or failure. Also, ApplicationStatus should be able to give me
the cause of failure (e.g. via an Exception), not just a failure state.
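
A minimal sketch of the shape I have in mind, using a plain CompletableFuture. RunStatus and AsyncRunnerSketch are hypothetical names for illustration only; they are not the ApplicationStatus or ApplicationRunner from the SEP.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical status type: carries the failure cause, not just a state.
final class RunStatus {
  enum State { SUCCEEDED, FAILED }

  private final State state;
  private final Throwable cause; // null unless state == FAILED

  private RunStatus(State state, Throwable cause) {
    this.state = state;
    this.cause = cause;
  }

  static RunStatus succeeded() { return new RunStatus(State.SUCCEEDED, null); }
  static RunStatus failed(Throwable cause) { return new RunStatus(State.FAILED, cause); }

  State state() { return state; }
  Optional<Throwable> cause() { return Optional.ofNullable(cause); }
}

// Hypothetical runner: runAsync() returns immediately and the returned
// future completes with a RunStatus when the job finishes or fails.
final class AsyncRunnerSketch {
  static CompletableFuture<RunStatus> runAsync(Runnable job) {
    return CompletableFuture.runAsync(job)
        .handle((ignored, error) -> error == null
            ? RunStatus.succeeded()
            : RunStatus.failed(unwrap(error)));
  }

  // CompletableFuture wraps task exceptions in CompletionException; surface the original.
  private static Throwable unwrap(Throwable error) {
    return (error instanceof CompletionException && error.getCause() != null)
        ? error.getCause()
        : error;
  }

  public static void main(String[] args) {
    RunStatus status = runAsync(() -> { throw new IllegalStateException("boom"); }).join();
    System.out.println(status.state() + " cause=" + status.cause().orElse(null));
  }
}
```

A caller that wants blocking behavior can call join() on the future; an embedded caller can attach a callback with thenAccept() instead.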

On Thu, Apr 20, 2017 at 3:52 PM, Chris Pettitt <cpett...@linkedin.com>
wrote:

> It might be worth taking a look at how Beam does test streams. The API is
> more powerful than just passing in a queue, e.g.:
>
> TestStream source = TestStream.create(StringUtf8Coder.of())
>     .addElements(TimestampedValue.of("this", start))
>     .addElements(TimestampedValue.of("that", start))
>     .addElements(TimestampedValue.of("future",
>         start.plus(Duration.standardMinutes(1))))
>     .advanceProcessingTime(Duration.standardMinutes(3))
>     .advanceWatermarkTo(start.plus(Duration.standardSeconds(30)))
>     .advanceWatermarkTo(start.plus(Duration.standardMinutes(1)))
>     .advanceWatermarkToInfinity();
>
> ---
>
> BTW, have we given up on the idea of a simpler input system, e.g. one that
> assumes all input messages are keyed? It seems it would be possible to
> support legacy "system streams" via an adapter that mapped K, V -> V' and
> could open the possibility of inputs in whatever form users want, e.g.
> (again from Beam):
>
> final Create.Values values = Create.of("test", "one", "two", "three");
>
> final TextIO.Read.Bound from =
>     TextIO.Read.from("src/main/resources/words.txt");
>
> final KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
>     .withBootstrapServers("myServer1:9092,myServer2:9092")
>     .withTopics(topics)
>     .withConsumerFactoryFn(new ConsumerFactoryFn(
>         topics, 10, numElements, OffsetResetStrategy.EARLIEST))
>     .withKeyCoder(BigEndianIntegerCoder.of())
>     .withValueCoder(BigEndianLongCoder.of())
>     .withMaxNumRecords(numElements);
>
> Ideally, such a simple input system specification would be useable in 
> production as well as test. At that point I don't know if we need a separate 
> TestApplicationRunner except perhaps as a hint to what we've been calling an 
> Environment?
>
> ---
>
> Aren't we supposed to be able to run applications without blocking (e.g.
> for embedded cases)? The API suggests that run is going to be a blocking
> call?
>
> - Chris
>
>
> On Thu, Apr 20, 2017 at 1:06 PM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
>> Thanks for the SEP!
>>
>> +1 on introducing these new components
>> -1 on the current definition of their roles (see Design feedback below)
>>
>> *Design*
>>
>> - If LocalJobRunner and RemoteJobRunner handle the different methods of
>>   launching a Job, what additional value do the different types of
>>   ApplicationRunner and RuntimeEnvironment provide? It seems like a red
>>   flag that all 3 would need to change from environment to environment.
>>   It indicates that they don't have proper modularity. The
>>   call-sequence-figures support this; LocalApplicationRunner and
>>   RemoteApplicationRunner make the same calls and the diagram only varies
>>   after jobRunner.start()
>> - As far as I can tell, the only difference between Local and Remote
>>   ApplicationRunner is that one is blocking and the other is
>>   non-blocking. If that's all they're for then either the names should be
>>   changed to reflect this, or they should be combined into one
>>   ApplicationRunner and just expose separate methods for run() and
>>   runBlocking()
>> - There isn't much detail on why the main() methods for Local/Remote
>>   have such different implementations, how they receive the Application
>>   (direct vs config), and concretely how the deployment scripts, if any,
>>   should interact with them.
>>
>>
>> *Style*
>>
>> - nit: None of the 11 uses of the word "actual" in the doc are
>>   *actually* needed. :-)
>> - nit: Colors of the runtime blocks in the diagrams are unconventional
>>   and a little distracting. Reminds me of nai won bao. Now I'm hungry.
>>   :-)
>> - Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The
>>   term "execution environment" is used
>> - The code comparisons for the ApplicationRunners are not
>>   apples-apples. The local runner example is an application that USES
>>   the local runner. The remote

Re: [DISCUSS] SEP-2: ApplicationRunner Design

2017-04-20 Thread Chris Pettitt
It might be worth taking a look at how Beam does test streams. The API is
more powerful than just passing in a queue, e.g.:

TestStream source = TestStream.create(StringUtf8Coder.of())
    .addElements(TimestampedValue.of("this", start))
    .addElements(TimestampedValue.of("that", start))
    .addElements(TimestampedValue.of("future",
        start.plus(Duration.standardMinutes(1))))
    .advanceProcessingTime(Duration.standardMinutes(3))
    .advanceWatermarkTo(start.plus(Duration.standardSeconds(30)))
    .advanceWatermarkTo(start.plus(Duration.standardMinutes(1)))
    .advanceWatermarkToInfinity();

---

BTW, have we given up on the idea of a simpler input system, e.g. one that
assumes all input messages are keyed? It seems it would be possible to
support legacy "system streams" via an adapter that mapped K, V -> V' and
could open the possibility of inputs in whatever form users want, e.g.
(again from Beam):

final Create.Values values = Create.of("test", "one", "two", "three");

final TextIO.Read.Bound from = TextIO.Read.from("src/main/resources/words.txt");

final KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
    .withBootstrapServers("myServer1:9092,myServer2:9092")
    .withTopics(topics)
    .withConsumerFactoryFn(new ConsumerFactoryFn(
        topics, 10, numElements, OffsetResetStrategy.EARLIEST))
    .withKeyCoder(BigEndianIntegerCoder.of())
    .withValueCoder(BigEndianLongCoder.of())
    .withMaxNumRecords(numElements);

Ideally, such a simple input system specification would be useable in
production as well as test. At that point I don't know if we need a
separate TestApplicationRunner except perhaps as a hint to what we've
been calling an Environment?

---

Aren't we supposed to be able to run applications without blocking (e.g.
for embedded cases)? The API suggests that run is going to be a blocking
call?

- Chris


On Thu, Apr 20, 2017 at 1:06 PM, Jacob Maes wrote:

> Thanks for the SEP!
>
> +1 on introducing these new components
> -1 on the current definition of their roles (see Design feedback below)
>
> *Design*
>
> - If LocalJobRunner and RemoteJobRunner handle the different methods of
>   launching a Job, what additional value do the different types of
>   ApplicationRunner and RuntimeEnvironment provide? It seems like a red
>   flag that all 3 would need to change from environment to environment.
>   It indicates that they don't have proper modularity. The
>   call-sequence-figures support this; LocalApplicationRunner and
>   RemoteApplicationRunner make the same calls and the diagram only varies
>   after jobRunner.start()
> - As far as I can tell, the only difference between Local and Remote
>   ApplicationRunner is that one is blocking and the other is
>   non-blocking. If that's all they're for then either the names should be
>   changed to reflect this, or they should be combined into one
>   ApplicationRunner and just expose separate methods for run() and
>   runBlocking()
> - There isn't much detail on why the main() methods for Local/Remote
>   have such different implementations, how they receive the Application
>   (direct vs config), and concretely how the deployment scripts, if any,
>   should interact with them.
>
>
> *Style*
>
> - nit: None of the 11 uses of the word "actual" in the doc are
>   *actually* needed. :-)
> - nit: Colors of the runtime blocks in the diagrams are unconventional
>   and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-)
> - Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The
>   term "execution environment" is used
> - The code comparisons for the ApplicationRunners are not apples-apples.
>   The local runner example is an application that USES the local runner.
>   The remote runner example is just the runner code itself. So, it's not
>   readily apparent that we're comparing the main() methods and not the
>   application itself.
>
>
> On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan wrote:
>
> > Made some updates to clarify the role and functions of RuntimeEnvironment
> > in SEP-2.
> >
> > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan wrote:
> >
> > > Hi, everyone,
> > >
> > > In light of new features such as fluent API and standalone that
> introduce
> > > new deployment / application launch models in Samza, I created a new
> > SEP-2
> > > to address the new use cases. SEP-2 link:
> > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
> > >
> > > Please take a look and give feedback!
> > >
> > > Thanks!
> > >
> > > -Yi
> > >
> >
>


Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-02 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154611
---


Fix it, then Ship it!




Can you get the numbers with all fixes but with instrumentation turned on? I 
don't think we'd ever run without the instrumentation on at LI. If 
instrumentation is a significant problem we should look at alternatives to 
tracking it or consider changing the granularity.


samza-core/src/main/java/org/apache/samza/util/TimerClock.java (line 25)
<https://reviews.apache.org/r/53282/#comment224228>

We have a HighResolutionClock that does this. I think you can use it here.
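
For illustration, a minimal sketch of timing through an injected nanosecond clock so that the clock read can be skipped when timer metrics are off. BlockTimer and the LongSupplier-based clock are stand-ins assumed for this example; they are not the actual HighResolutionClock or TimerClock code.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: times a block using an injected nanosecond clock.
final class BlockTimer {
  private final LongSupplier nanoClock;
  private final boolean enabled;

  BlockTimer(LongSupplier nanoClock, boolean enabled) {
    this.nanoClock = nanoClock;
    this.enabled = enabled;
  }

  // Returns elapsed nanoseconds, or 0 without reading the clock when timing is disabled.
  long time(Runnable block) {
    if (!enabled) {
      block.run();
      return 0L;
    }
    long start = nanoClock.getAsLong();
    block.run();
    return nanoClock.getAsLong() - start;
  }

  public static void main(String[] args) {
    BlockTimer timer = new BlockTimer(System::nanoTime, true);
    System.out.println("elapsed ns: " + timer.time(() -> Math.sqrt(42.0)));
  }
}
```

Injecting the clock also makes the timing logic testable with a fake clock instead of System.nanoTime().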


- Chris Pettitt


On Nov. 2, 2016, 5:56 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 2, 2016, 5:56 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> In recent experiments with Samza batch jobs (consuming HDFS data on Hadoop), 
> the results were subpar compared to map/reduce and Spark. Looking closely at 
> the metrics, we found two basic problems:
> 
> 1) Not enough data to process. This showed up as the unprocessed message 
> queue length being zero much of the time.
> 
> 2) Not processing fast enough. Samza's throughput was about the same for 
> medium-size records (100B) and small records (10B), while Spark scales very 
> well on small records (over 1M/s).
> 
> The first problem is solved by increasing the buffer size. This ticket is to 
> address the second problem, which contains three major improvements:
> 
> - Option to turn off timer metrics calculation: one of the main time sinks in 
> Samza processing turns out to be just keeping the timer metrics. While they 
> are useful for debugging, they become a bottleneck when running a stable job 
> at high throughput. In my test job, which consumes 8M mock messages, it took 
> 30 secs with timer metrics on. After turning them off, it took only 14 secs.
> 
> - Java coding improvements: The AsyncRunLoop code can be further optimized 
> for efficiency. Some of the thread-safe data structures I was using are not 
> optimal for performance (Collections.synchronizedSet). I switched to 
> CopyOnWriteArraySet, which performs far better here because reads dominate 
> and the set is small.
> 
> - Specific handling for in-order processing: AsyncRunLoop handles the 
> callbacks the same way whether processing is in-order or out-of-order 
> (max concurrency > 1), which incurs considerable cost. Simplifying the logic 
> for in-order handling improves performance.
> 
> After all three improvements, my test job with mock input (8M messages) can 
> be processed within 8 secs (down from the original 30 secs), so it's 1M/s 
> for one CPU core.
> 
> For the performance benchmark jobs running in Hadoop, we also see a 4 times 
> improvement with all the fixes above. Please take a look at the attached 
> spreadsheet (see the numbers with fix (timing metrics turned off) and 
> fix2 (all three together)).
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> 609a956a1f2fa97419c2f66fe2fb6876aaaeecd0 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> 8fac8155c7f64e67d4a39ec6943f98da1e1d63d9 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> 052b3b91ec609ca6288662cfa2d3e71b0273d020 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> 9b700998d2af040c6734289f7f28bbd78c36bd2c 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> 132cf59eb593524a4cac134aeceeeb37a4c74b1f 
>   samza-core/src/main/java/org/apache/samza/util/TimerClock.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java 
> 472e0a59d5aa992b136292c8a3347c311e2cd606 
>   samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
> c3fd8bfb2e16a4c5146d34682d04cb1d4e9bbe72 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> e0468ee89c89fd720834461771ebb36475475bcb 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> e2aed5b1c2e77a914268963b21809380972037b6 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> c4836f202f7eda1d4e71eac94fd48e46207b0316 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> 6000ffaf2b8723d48a72e58b571f242a42dc8128 
>   samza-c

Re: Review Request 52403: SAMZA-1028: Moving logline before closing kafka producer and making exception thrown AtomicReference

2016-09-29 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52403/#review150907
---



You're not using an atomic compare and set (CAS), so you don't need an atomic 
ref - a volatile would be sufficient. However, if the code can be run in a 
multi-threaded path, you would indeed want to use CAS to dequeue the exception 
in one operation.
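
To illustrate the distinction, a minimal sketch (FirstFailure is a hypothetical class, not the KafkaSystemProducer code): compareAndSet keeps only the first exception, and getAndSet reads and clears it in a single atomic step. A plain volatile field can do neither atomically.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder for a producer's first send failure.
final class FirstFailure {
  private final AtomicReference<Throwable> failure = new AtomicReference<>();

  // Called from send callbacks; only the first exception wins, later ones are dropped.
  boolean record(Throwable t) {
    return failure.compareAndSet(null, t);
  }

  // Reads and clears in one atomic step, so two threads can never both take the same exception.
  Throwable take() {
    return failure.getAndSet(null);
  }

  public static void main(String[] args) {
    FirstFailure holder = new FirstFailure();
    holder.record(new RuntimeException("first"));
    holder.record(new RuntimeException("second"));
    System.out.println(holder.take());
  }
}
```

With a volatile field, the "check for null, then set" and "read, then clear" sequences are each two separate operations, so a concurrent callback could slip in between them.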

- Chris Pettitt


On Sept. 29, 2016, 7:04 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52403/
> ---
> 
> (Updated Sept. 29, 2016, 7:04 p.m.)
> 
> 
> Review request for samza, Navina Ramesh and Jagadish Venkatraman.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Currently the error is logged after the producer is closed, and the exception 
> is reset in later callbacks, which makes troubleshooting harder in 
> multithreaded cases. We should log the error before closing and keep an 
> atomic reference to the initial exception.
> 
> 
> Diffs
> -
> 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  5ff6d3caf54ed148aa40c7c752c587e556a4f34a 
> 
> Diff: https://reviews.apache.org/r/52403/diff/
> 
> 
> Testing
> ---
> 
> Tested in jobs deployed in Yarn cluster.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 47835: SAMZA-914: Initial draft for Java programming APIs on operators supporting DAGs

2016-09-14 Thread Chris Pettitt


> On Sept. 14, 2016, 7:03 p.m., Chris Pettitt wrote:
> > This review turned out not to be so massive as I expected (lots of 
> > deletes). I don't see any serious issues. There are some minor cosmetic 
> > issues and some subjective stuff that you can take or leave. I tried to not 
> > mark subjective stuff as an issue.

BTW, I should have mentioned that this is looking pretty good. Nice work!


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review148908
---


On Sept. 14, 2016, 8:53 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> ---
> 
> (Updated Sept. 14, 2016, 8:53 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
> https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-914: initial draft of operator programming API. Design doc attached to 
> SAMZA-914: 
> https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
> 
> 
> Diffs
> -
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9eed 
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb 
>   samza-api/src/test/java/org/apache/samza/config/TestConfig.java 
> 5d066c5867e9df9e94e60bde825dedf10703b399 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Triggers.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java 
> PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/data/WindowOutput.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/internal/Window.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorBaseImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/api/data/TestMessageStream.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/InputAvroSystemMessage.java
>  PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
>  PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java 
> PRE-CREATION 
>   
> samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
>  PRE-CREATION 
>   samza-sql-core/README.md PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java 
> PRE-CREATION 
>   samza-sql-core/src/m

Re: Review Request 47835: SAMZA-914: Initial draft for Java programming APIs on operators supporting DAGs

2016-09-14 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review148908
---



This review turned out not to be so massive as I expected (lots of deletes). I 
don't see any serious issues. There are some minor cosmetic issues and some 
subjective stuff that you can take or leave. I tried to not mark subjective 
stuff as an issue.


samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 53)
<https://reviews.apache.org/r/47835/#comment216420>

Minor: you have inlined other transform functions below, so it's not obvious 
why this was not inlined. Inlined might be slightly nicer (for review at least 
:)) as it keeps the code in one place. Your call.



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 67)
<https://reviews.apache.org/r/47835/#comment216415>

Doc and code do not agree. The code has unbounded type parameters, while 
the doc suggests specific types.

If the doc is correct, we should be able to satisfy it without a generic 
type.



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java 
(line 32)
<https://reviews.apache.org/r/47835/#comment216424>

Private constructor. I explain the benefits either above or below (been 
jumping around a bit).



samza-operator/src/main/java/org/apache/samza/operators/api/Triggers.java (line 
30)
<https://reviews.apache.org/r/47835/#comment216431>

It's probably worth noting that if there are multiple triggers the 
aggregate value is the disjunction of the individual values.
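
To illustrate the disjunction semantics, here is a minimal sketch. It models each trigger as a plain java.util.function.Predicate over some window state, which is an assumption made for illustration and not the Triggers API under review; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: each trigger is modelled as a predicate over window state.
final class TriggerDisjunction {
  private TriggerDisjunction() {}

  // The combined trigger fires when any individual trigger fires (logical OR).
  // With an empty list the combined trigger never fires.
  static <S> Predicate<S> anyOf(List<Predicate<S>> triggers) {
    Predicate<S> combined = state -> false;
    for (Predicate<S> trigger : triggers) {
      combined = combined.or(trigger);
    }
    return combined;
  }
}
```

With a count-based and a time-based predicate in the list, the window would fire as soon as either condition holds.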



samza-operator/src/main/java/org/apache/samza/operators/api/Triggers.java (line 
39)
<https://reviews.apache.org/r/47835/#comment216429>

Maybe TriggerBuilder as this differs a bit from the typical class of static 
factory methods implied by the class name?



samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java (line 
48)
<https://reviews.apache.org/r/47835/#comment216485>

Does the concrete class need to be exposed? Interfaces are generally nicer 
to program against for extensibility and testing. Though in this case it looks 
like Window is an ABC, not an interface.



samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
 (line 44)
<https://reviews.apache.org/r/47835/#comment216417>

You can use a private constructor to prevent subclassing / instantiation.
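
For reference, a minimal sketch of the idiom. The class and method names are hypothetical and only stand in for a class of static factory methods like the one under review.

```java
// Hypothetical utility class; the name and method are illustrative only.
final class OperatorFactories {
  // Private constructor: nothing outside this class can instantiate it,
  // and together with "final" it cannot be subclassed.
  private OperatorFactories() {
    throw new AssertionError("no instances");
  }

  // Static factory-style method, callable without an instance.
  static String describe(String operatorName) {
    return "operator:" + operatorName;
  }
}
```

Callers use OperatorFactories.describe(...) directly; "new OperatorFactories()" does not compile outside the class.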



samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
 (line 332)
<https://reviews.apache.org/r/47835/#comment216418>

Can you drop input here as it is actually unused? I found my way here due 
to seeing something else handing this the "this" pointer, which looked 
suspicious. Turns out it is not used - maybe for type parameter bounds 
checking? But if so, I'd be interested to see if we could find a more direct 
and obvious approach.

---

Also, as far as I can tell, this looks like it can be moved straight into 
MessageStream (perhaps as a private method). I didn't see anything else using 
it, but maybe I missed it.



samza-operator/src/main/java/org/apache/samza/operators/api/internal/Window.java
 (line 39)
<https://reviews.apache.org/r/47835/#comment216486>

IIUC given this interface Window is basically an entirely opaque object to 
the client code?



samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
(line 39)
<https://reviews.apache.org/r/47835/#comment216487>

Minor: abstract not necessary here. (Technically public isn't either :)).
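
A small sketch of the language rule, assuming the comment refers to a method declared in an interface. ExampleTask is a hypothetical interface, not the class under review; the reflection check only confirms that the modifiers are implied.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical interface used only to illustrate the language rule.
interface ExampleTask {
  // No modifiers written, yet the method is implicitly public and abstract.
  void process(String message);
}

final class InterfaceModifiers {
  private InterfaceModifiers() {}

  // Returns true if the interface method carries both implied modifiers.
  static boolean isImplicitlyPublicAbstract() {
    try {
      Method m = ExampleTask.class.getMethod("process", String.class);
      int mods = m.getModifiers();
      return Modifier.isPublic(mods) && Modifier.isAbstract(mods);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }
}
```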



samza-operator/src/test/java/org/apache/samza/operators/api/data/TestMessageStream.java
 (lines 42 - 61)
<https://reviews.apache.org/r/47835/#comment216488>

This doesn't appear to be testing anything (other than that types check).



samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java 
(line 141)
<https://reviews.apache.org/r/47835/#comment216489>

Minor: indentation is off here.


- Chris Pettitt


On Sept. 14, 2016, 8:53 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> ---
> 
> (Updated Sept. 14, 2016, 8:53 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
> https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-914: initial draft of operator programming API. Design doc attached to 
> S

Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

2016-09-01 Thread Chris Pettitt


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java,
> >  line 31
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486375#file1486375line31>
> >
> > How likely are we to collide with this? That's the problem with using a 
> > user-definable token. I see two options:
> > 
> > If null is not supported (and thus not usable by user-defined 
> > implementations) I would use that and mark it as reserved.
> > 
> > Otherwise I would probably do something more to make this unlikely to 
> > collide (call me paranoid). Something like use a NUL byte as the first 
> > character and document that offsets with such an encoding are reserved. I 
> > would also check that this sort of string doesn't make it to user code in 
> > the task.
> 
> Jagadish Venkatraman wrote:
> Returning a null is not possible (because a null offset could mean that 
> we don't have messages at this moment instead of meaning end-of-stream. While 
> we should poll again when a consumer returns null, we should not for the 
> END_OF_STREAM case.) Hence, I was hoping to use a special offset.
> 
> I like your suggestion of using a NUL byte as the first character (and 
> calling that out). I'll update the RB with that.
> 
> Jagadish Venkatraman wrote:
> There seem to be inter-operability issues between strings in Java and strings 
> in Scala (especially around handling NUL bytes in the string; Scala appears to 
> strip out NUL bytes). Hence, I've used "SAMZA_INTERNAL_END_OF_STREAM" as the 
> string. Let me know if there's a better way to handle this.

Scala should definitely not be dropping any bytes. How could it safely do so?

FWIW, you can verify:

```
% scala
scala> "\u0000".length
res0: Int = 1
```
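
The same check can be sketched in Java, together with the reserved-offset idea from the earlier comment. The class, constant and method names here are assumptions for illustration and are not Samza's actual API.

```java
// Hypothetical sketch; the constant and method names are assumptions.
final class ReservedOffsets {
  private ReservedOffsets() {}

  // A leading NUL character marks the offset as reserved for internal use.
  static final String END_OF_STREAM = "\u0000END_OF_STREAM";

  // True for any non-empty offset whose first character is NUL.
  static boolean isReserved(String offset) {
    return offset != null && !offset.isEmpty() && offset.charAt(0) == '\u0000';
  }
}
```

A check like isReserved could also be used to make sure such offsets never reach user code in the task.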


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
---


On Aug. 30, 2016, 12:32 a.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> ---
> 
> (Updated Aug. 30, 2016, 12:32 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data 
> Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Samza currently works with unbounded data sources (kafka streams). However, 
> for bounded data sources like HDFS files, snapshot files which are not 
> infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once 
> data processing is complete (as opposed to an infinite stream job that keeps 
> running).
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask 
> (Invariant: when end-of-stream is reached there are no buffered messages, 
> no callbacks are in flight, and no window or commit call is in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for 
> end-of-stream.
> 
> Design Doc and Implementation Notes: 
> https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   
> samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
> cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   
> samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
>  a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> ---
> 
> Unit tests to test scenarios for in-order processing, out-of-order processing 
> and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>



Re: Review Request 50174: SAMZA-977: User doc for samza multithreading

2016-07-28 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50174/#review143972
---


Ship it!




Looks like this covers my comments. However, the documentation reads like it is 
OK to do synchronous IO by just throwing more threads at the problem. This is 
not a good idea. The ability to do synchronous IO is a transition step towards 
async, not an end state.

- Chris Pettitt


On July 27, 2016, 11:05 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50174/
> ---
> 
> (Updated July 27, 2016, 11:05 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Update samza web docs with new multithreading api, core and configs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/api/overview.md 
> 6712344e84e19883b857e00549db2acb101c7e0e 
>   docs/learn/documentation/versioned/container/event-loop.md 
> 116238312df7071747cbbc14bc9c46f558755195 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 
> 54c52981c3055b398ee60af50eeaf2592ed0e64f 
> 
> Diff: https://reviews.apache.org/r/50174/diff/
> 
> 
> Testing
> ---
> 
> Test the web pages locally.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 50174: SAMZA-977: User doc for samza multithreading

2016-07-20 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50174/#review142985
---




docs/learn/documentation/versioned/api/overview.md (line 22)
<https://reviews.apache.org/r/50174/#comment208638>

Maybe provide an example of what "synchronous process" means. For example, 
a computation that does not involve remote calls.



docs/learn/documentation/versioned/container/event-loop.md (line 26)
<https://reviews.apache.org/r/50174/#comment208639>

When would you want to run a synchronous task in parallel? What are the 
rules (e.g. memory visibility) with such a configuration?



docs/learn/documentation/versioned/container/event-loop.md (line 28)
<https://reviews.apache.org/r/50174/#comment208640>

This doesn't quite sound right. Global state is a problem if there is > 1 
concurrency (whether running with multiple samza threads or not). For example, 
async tasks may or may not be safe depending on concurrency. We also can make 
stronger guarantees than what is implied by the paragraph (e.g. state from 
process is fully visible to window and commit).



docs/learn/documentation/versioned/container/event-loop.md (line 43)
<https://reviews.apache.org/r/50174/#comment208643>

s/in the a single thread/in a single thread/.

A few other minor grammatical errors, but this is not the easy way to share 
them. If they're not obvious I can send you back a slightly edited version of 
this paragraph.

---

Larger comment: as we'd ultimately want to move to a single implementation 
should we not allow process and window to run in parallel for the same task 
even with multiple threads?



docs/learn/documentation/versioned/jobs/configuration-table.html (line 636)
<https://reviews.apache.org/r/50174/#comment208644>

s/complete/completes/



docs/learn/documentation/versioned/jobs/configuration-table.html (line 638)
<https://reviews.apache.org/r/50174/#comment208645>

But also may result in out-of-order processing. Probably good to be 
explicit about this even if it should be obvious to most people.



docs/learn/documentation/versioned/jobs/configuration-table.html (line 647)
<https://reviews.apache.org/r/50174/#comment208646>

What happens if a timeout occurs?


- Chris Pettitt


On July 19, 2016, 1:05 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50174/
> ---
> 
> (Updated July 19, 2016, 1:05 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Update samza web docs with new multithreading api, core and configs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/api/overview.md 
> 6712344e84e19883b857e00549db2acb101c7e0e 
>   docs/learn/documentation/versioned/container/event-loop.md 
> 116238312df7071747cbbc14bc9c46f558755195 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 
> 54c52981c3055b398ee60af50eeaf2592ed0e64f 
> 
> Diff: https://reviews.apache.org/r/50174/diff/
> 
> 
> Testing
> ---
> 
> Test the web pages locally.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 47835: SAMZA-914: Initial draft for Java programming APIs on operators supporting DAGs

2016-07-18 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review142421
---



The description says this is the initial draft implementation, but the title 
says initial draft for the APIs. I take it the latter is more accurate?

Some initial high-level thoughts:

1. We should use Java 8 constructs where possible (if we've moved to Java 8). 
For example, Function. If not, we should probably have some Function equivalent 
and various type specifications for map, flatMap, etc.
2. The stream operators don't seem to **do** anything - they primarily appear 
to hold metadata about something to be done. Are they intended only to be 
declarative? Are they also not intended to know directly how to build 
themselves, but rather you would run some graph processor (with knowledge about 
each operator type) to build out a real processor? This is pretty fundamental 
to the design. If there is a doc that covers this let me know; otherwise it 
would be super helpful to be on the same page about the end goal.
3. If bullet 2 is correct, how would you navigate through the graph? It would 
seem that you would need some way to navigate `(source stream, operator)` 
tuples?
4. Related to bullet 3, why do the operators know about their output streams? 
Abstractly aren't they totally independent in the sense that I could apply the 
same operator to multiple input streams to produce a corresponding number of 
output streams?
5. It looks like you can compose graphs in two ways: directly using the 
operators or using the interfaces on DataStream. I would choose one or the 
other and use appropriate hiding mechanisms to expose just the API the user 
should be concerned with. If you go with the latter I would extract interfaces 
for the operators (if that is even necessary) into a public package and hide 
everything else in a package private namespace.
6. I suspect you want to limit the ability to create custom operators (at least 
if the assumption about how graph walking would work in bullet 2 holds), so 
StreamOperator's constructor probably needs to be package private.
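
To make bullet 4 concrete, here is a rough sketch of an operator that is a pure description and knows nothing about its input or output streams. All names are hypothetical, and plain lists stand in for streams, so this is only an illustration of the idea and not a proposal for the actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical: an operator as a pure description that references no stream.
final class MapOperatorSpec<T, R> {
  private final Function<T, R> fn;

  MapOperatorSpec(Function<T, R> fn) {
    this.fn = fn;
  }

  // A separate step (standing in for a graph processor) binds the spec to one
  // concrete input. Lists are used here only as a stand-in for streams.
  List<R> applyTo(List<T> input) {
    List<R> out = new ArrayList<>();
    for (T item : input) {
      out.add(fn.apply(item));
    }
    return out;
  }
}
```

Because the spec holds no reference to an output, the same instance can be applied to any number of inputs, each producing its own output.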

I suspect some over arching docs or a bluejeans session would be very helpful 
in allowing me to dig deeper into this.


samza-operator/src/main/java/org/apache/samza/operators/api/FlatMapper.java 
(line 34)
<https://reviews.apache.org/r/47835/#comment207976>

Have we moved to Java 8 yet? If so, it would be preferable to use the Java 
8 constructs if possible. If not, I think this can be generalized a little more 
nicely as a function from T to a Collection of R instead of having a separate 
flat map function type.

This comment can be applied generally wherever we're providing "functions"
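
Roughly what I have in mind, assuming Java 8 is available. The class and method names are hypothetical, and a list stands in for the stream.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: flatMap expressed with the standard Function type,
// as a function from T to a Collection of R, with no dedicated FlatMapper type.
final class FlatMapSketch {
  private FlatMapSketch() {}

  static <T, R> List<R> flatMap(List<T> input, Function<T, Collection<R>> fn) {
    List<R> out = new ArrayList<>();
    for (T item : input) {
      // Each input element may expand to zero or more output elements.
      out.addAll(fn.apply(item));
    }
    return out;
  }
}
```

A plain map then falls out as the special case where the function returns a single-element collection.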


- Chris Pettitt


On July 13, 2016, 8:54 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> ---
> 
> (Updated July 13, 2016, 8:54 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
> https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-914: initial draft implementation of operator programming API.
> 
> 
> Diffs
> -
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9eed 
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb 
>   samza-operator/src/main/java/org/apache/samza/operators/api/FlatMapper.java 
> PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Mapper.java 
> PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Scanner.java 
> PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Sink.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/data/DataStream.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/join/Joiner.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/task/DataStreamTask.java 
> PRE-CREATION 
>

Re: Review Request 50082: SAMZA-973: Disk Quotas: clamp max delay, better measure processing time

2016-07-18 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50082/
---

(Updated July 18, 2016, 3:29 p.m.)


Review request for samza.


Repository: samza


Description
---

The delay time for the disk quotas feature is currently unbounded which
can cause delays to become excessive e.g. in the case of a badly timed GC
pause. This change provides the ability to set a maximum delay, which
defaults to 1 second.

This patch also improves the measurement of processing time. The process
method in RunLoop potentially blocks waiting to receive a new message.
This time should obviously not be included in the delay calculations.
Instead we split out message polling and process and apply delay
calculations only to the processing time.
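
As a rough illustration of the clamping, here is a minimal sketch. The method and parameter names are hypothetical and this is not the actual ThrottlingExecutor code; it assumes the delay is derived from the measured processing time multiplied by a work-to-idle factor.

```java
// Hypothetical sketch of a clamped throttling delay; names are assumptions.
final class ClampedDelay {
  private ClampedDelay() {}

  // processingNanos should cover only message processing, not time spent
  // blocked waiting for a new message to arrive.
  static long delayNanos(long processingNanos, double workToIdleFactor, long maxDelayNanos) {
    long unclamped = (long) (processingNanos * workToIdleFactor);
    // Clamp to [0, maxDelayNanos] so one long pause cannot cause an excessive delay.
    return Math.max(0L, Math.min(unclamped, maxDelayNanos));
  }
}
```

With a 1 second maximum, a 10 second measurement (for example one inflated by a GC pause) yields a 1 second delay instead of a delay proportional to the full 10 seconds.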


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
214cefd4e8698fada6fc1bb14ab79be6afb27b9d 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
26590507b9c72a8c64171aeb1e5b7c3d5c24c41a 

Diff: https://reviews.apache.org/r/50082/diff/


Testing
---

gradle test


Thanks,

Chris Pettitt



Re: Review Request 50082: SAMZA-973: Disk Quotas: clamp max delay, better measure processing time

2016-07-18 Thread Chris Pettitt


> On July 15, 2016, 10:52 p.m., Xinyu Liu wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, line 80
> > <https://reviews.apache.org/r/50082/diff/1/?file=1445036#file1445036line80>
> >
> > Shall we move this line in the process() function?

I'll move it into process. BTW, the current location preserves where it was 
relative to other code prior to this change. Moving it slightly changes the 
behavior, but appears to be closer to the original intention.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50082/#review142459
---


On July 15, 2016, 7:59 p.m., Chris Pettitt wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50082/
> ---
> 
> (Updated July 15, 2016, 7:59 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> The delay time for the disk quotas feature is currently unbounded which
> can cause delays to become excessive e.g. in the case of a badly timed GC
> pause. This change provides the ability to set a maximum delay, which
> defaults to 1 second.
> 
> This patch also improves the measurement of processing time. The process
> method in RunLoop potentially blocks waiting to receive a new message.
> This time should obviously not be included in the delay calculations.
> Instead we split out message polling and process and apply delay
> calculations only to the processing time.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
> 214cefd4e8698fada6fc1bb14ab79be6afb27b9d 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
> 26590507b9c72a8c64171aeb1e5b7c3d5c24c41a 
> 
> Diff: https://reviews.apache.org/r/50082/diff/
> 
> 
> Testing
> ---
> 
> gradle test
> 
> 
> Thanks,
> 
> Chris Pettitt
> 
>



Re: Review Request 50082: SAMZA-973: Disk Quotas: clamp max delay, better measure processing time

2016-07-15 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50082/
---

(Updated July 15, 2016, 7:59 p.m.)


Review request for samza.


Repository: samza


Description
---

The delay time for the disk quotas feature is currently unbounded which
can cause delays to become excessive e.g. in the case of a badly timed GC
pause. This change provides the ability to set a maximum delay, which
defaults to 1 second.

This patch also improves the measurement of processing time. The process
method in RunLoop potentially blocks waiting to receive a new message.
This time should obviously not be included in the delay calculations.
Instead we split out message polling and process and apply delay
calculations only to the processing time.


Diffs
-

  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
214cefd4e8698fada6fc1bb14ab79be6afb27b9d 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
26590507b9c72a8c64171aeb1e5b7c3d5c24c41a 

Diff: https://reviews.apache.org/r/50082/diff/


Testing (updated)
---

gradle test


Thanks,

Chris Pettitt



Review Request 50082: SAMZA-973: Disk Quotas: clamp max delay, better measure processing time

2016-07-15 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50082/
---

Review request for samza.


Repository: samza


Description
---

The delay time for the disk quotas feature is currently unbounded which
can cause delays to become excessive e.g. in the case of a badly timed GC
pause. This change provides the ability to set a maximum delay, which
defaults to 1 second.

This patch also improves the measurement of processing time. The process
method in RunLoop potentially blocks waiting to receive a new message.
This time should obviously not be included in the delay calculations.
Instead we split out message polling and process and apply delay
calculations only to the processing time.


Diffs
-

  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
214cefd4e8698fada6fc1bb14ab79be6afb27b9d 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
26590507b9c72a8c64171aeb1e5b7c3d5c24c41a 

Diff: https://reviews.apache.org/r/50082/diff/


Testing
---


Thanks,

Chris Pettitt



Re: Review Request 48213: SAMZA-960: Make system producer thread safe

2016-07-15 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/#review142418
---


Ship it!




- Chris Pettitt


On July 14, 2016, 12:43 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48213/
> ---
> 
> (Updated July 14, 2016, 12:43 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the system producers need to be thread safe in order to be used in 
> multithreaded tasks. The following are the changes 
> (ElasticSearchSystemProducer is already thread safe so no change made there):
> 
> In KafkaSystemProducer, remove the buggy retry logic and treat any exception 
> as fatal.
> In HdfsSystemProducer, add synchronization lock to all public methods.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> e9a51083aff4dc316e94144f6242fe702ca73a68 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>  1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b 
>   
> samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
>  5e8cc65260b6961350c64ddc13b9807dca9099c5 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  3769e103616dc0f1fd869706cc086e24cd926c48 
>   
> samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
>  04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>  8e32bba6ced090f0fc8d4e5176fe0788df36981d 
> 
> Diff: https://reviews.apache.org/r/48213/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 49877: SAMZA-972: Holistic memory monitoring for SamzaContainer

2016-07-15 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49877/#review142417
---


Ship it!




I would add at least a test to verify you can get a valid RSS size for the 
current process.


samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
 (lines 192 - 197)
<https://reviews.apache.org/r/49877/#comment207971>

Thanks Jagadish!



samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
 (lines 47 - 55)
<https://reviews.apache.org/r/49877/#comment207972>

Not required, but you could simplify this with try-with-resources: 
https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
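
A minimal sketch of the pattern. The class and method names are hypothetical and this is not the code under review; it only shows the reader being closed automatically.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;

// Hypothetical helper showing try-with-resources.
final class FirstLineReader {
  private FirstLineReader() {}

  // Reads the first line and closes the reader on both the normal and the
  // exceptional path, without an explicit finally block.
  static String readFirstLine(Reader source) {
    try (BufferedReader reader = new BufferedReader(source)) {
      return reader.readLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```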


- Chris Pettitt


On July 14, 2016, 2:37 a.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/49877/
> ---
> 
> (Updated July 14, 2016, 2:37 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Fred Ji, Jake Maes, 
> Yi Pan (Data Infrastructure), and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This feature introduces physical memory monitoring in SamzaContainer.
> 
> Context:
> Often memory used by the SamzaContainer process includes 
> A. JVM Heap memory: This is where all JVM variables live.
> B. Native memory: This memory lives out of the JVM heap and is not visible to 
> the JVM. Examples include memory used by RocksDB, native libraries that user code 
> depends on, etc.
> 
> User jobs could be killed by Yarn if their total memory (A+B) exceeds the 
> configured maximum of yarn.container.memory.mb.
> 
> Currently, while our existing metrics provide visibility into [A] via JMX, we 
> don't have visibility into [B]. (as it's totally external to the JVM). 
> 
> This feature uses Linux ProcFS to provide a complete view of the memory (both 
> A & B) to help Samza users understand memory better. (Schedulers like Apache 
> Yarn that require a holistic view of memory (A+B) also use ProcFS. For the 
> curious, here's the Yarn implementation - 
> http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-yarn-common/0.23.1/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
>  that inspired this idea)
> 
> Scope: The scope of this RB is limited to Linux distributions. (A Mac-based 
> implementation may be a separate change list.)
> 
> 
> Diffs
> -
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   
> samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
>  50c85007123dd568ef90cf028af33a93a4470cb6 
>   
> samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsGetter.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  2044ce01ffded8434e762d99355d5df43642c66b 
>   
> samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/49877/diff/
> 
> 
> Testing
> ---
> 
> 1. Unit tests with mock PROC-FS snapshots of processes
> 2. Deployed actual jobs on my dev box. 
>    2.1 Obtained the operating system's view of the container memory using 
> 'ps' and other tools.
>    2.2 Verified that the total memory reported by the monitor is the same as 
> the OS's view of memory [2.1]
> 3. Tested on various Linux distributions I could find internally:
> - RHEL release 6.4, 6.5, 6.6 (Santiago)
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>



Re: Review Request 48356: RFC: Samza as a library

2016-07-12 Thread Chris Pettitt


> On July 12, 2016, 6:43 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > lines 174-176
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line174>
> >
> > Don't we need to synchronize these? Are we guaranteed that 
> > JobModelUpdateHandler cannot be invoked concurrently? If so, let's document 
> > this.
> 
> Navina Ramesh wrote:
> Yeah. I thought about this. I know that JobModelUpdateHandler is not 
> going to be invoked concurrently. However, right now, it is not even called 
> :) I am open to fully removing this interface until the ZK based design is 
> more concrete.

Either approach - documenting or removing - works.
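
If the handler is kept and concurrent invocation ever becomes possible, the synchronization I was asking about could look roughly like this. The class, field and method names are hypothetical and this is not the StreamProcessor code; the helper at the end only exists to exercise the handler from several threads.

```java
// Hypothetical sketch: fields updated by a callback, guarded by one lock.
final class JobModelHolder {
  private final Object lock = new Object();
  private String currentJobModel; // guarded by lock
  private int updateCount;        // guarded by lock

  // Callback that may be invoked from any thread.
  void onNewJobModel(String jobModel) {
    synchronized (lock) {
      currentJobModel = jobModel;
      updateCount++;
    }
  }

  int updateCount() {
    synchronized (lock) {
      return updateCount;
    }
  }

  // Runs the handler from several threads and returns the final update count.
  static int runConcurrentUpdates(int threads, int updatesPerThread) {
    JobModelHolder holder = new JobModelHolder();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      final int id = i;
      workers[i] = new Thread(() -> {
        for (int j = 0; j < updatesPerThread; j++) {
          holder.onNewJobModel("model-" + id + "-" + j);
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return holder.updateCount();
  }
}
```

If the handler is guaranteed to be single-threaded, documenting that guarantee is enough and the lock is unnecessary.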


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141942
---


On July 12, 2016, 6:45 p.m., Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -------
> 
> (Updated July 12, 2016, 6:45 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
> 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/Job

Re: Review Request 48356: RFC: Samza as a library

2016-07-12 Thread Chris Pettitt


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > A few more thoughts below.
> > 
> > Still not a fan of the direction we're going with the config. I know it is 
> > status quo, but it further locks us into a limited model. One other benefit 
> > of the Offspring way of doing config that occurred to me while reading this 
> > is that with Offspring you get all config violations in one shot versus 
> > once per run (e.g. Samza fails fast on first config problem). The latter is 
> > how LiSpring worked and we intentionally addressed that as a part of 
> > Offspring.

I'll follow up with another pass on the latest RB (I think I saw an issue). 
However, we're on the same page on all of your follow up comments and edits 
(see below for more detail).


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > lines 125-126
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line125>
> >
> > Don't we need to stop the container directly here? shutdown will stop 
> > the executor from accepting any new work, but will not stop running work. 
> > In any case, wouldn't a clean shutdown here be better (e.g. for flushing 
> > state) than trying to force shutdown via the executor?
> 
> Navina Ramesh wrote:
> This stop() method allows the user to directly stop the processor. So, it 
> should not only stop the container, but also not accept more requests on the 
> same executor instance. A stopped stream processor cannot be restarted unless 
> the user creates another instance of the processor. 
> 
> Shutting the executor service should trigger the shutdown hook. The 
> shutdown hook invokes the shutdown actions (flushing state, checkpoint etc) 
> and guarantees a clean shutdown. Is there a better way of triggering the 
> shutdown actions ? 
> 
> The alternative would be to not trigger the shutdown hook and directly 
> call all steps for shutting down the container. Right now, stopping the 
> container only "stops" the runloop from further submitting tasks. It doesn't 
> clean up anything.
> 
> Navina Ramesh wrote:
> Right now, stopping the container only "stops" the runloop from further 
> submitting tasks. It doesn't clean up anything.
> > I take back what I said! Exiting the run loop automatically triggers 
> the shutdown sequence.
> 
> Chris Pettitt wrote:
> W.r.t. how to shut down, I would shut down the container and join on it. 
> I would do this before the jobCoordinator as it (jobCoordinator) must already 
> be tolerant of container stops.
> 
> Navina Ramesh wrote:
> I am waiting on the container to shutdown fully before stopping the 
> jobCoordinator and executor. I hope this is what you meant.
> 
> Thinking a bit more on this, I feel that there is no strong need for the 
> user to provide an ExecutorService. It doesn't seem to add a lot of value 
> when the user cannot control the lifecycle of the executor itself. The same 
> executor may be used to manage JobCoordinator thread in the future as well. 
> These are internal to Samza and shouldn't require any user intervention. Do 
> you still think there is value in keeping the executor Service as an argument 
> to the StreamProcessor constructor?

Shutdown looks good.

I'm open to not exposing the executor until we have a use case for it. It's 
easier to add it later than to remove it. In most of the infra I've done - e.g. 
ParSeq, Rest.li - there has been a reason to expose the executor, but it may be 
that we don't have the same needs here.


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 145
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line145>
> >
> > Do we need to ensure the previous container is stopped before starting 
> > the new container? For example, would it be possible for the new container 
> > and the old container to stomp on each other's local state if they're 
> > running at the same time? container.stop appears to be asynchronous and 
> > doesn't appear to give you any guarantee about when the container is 
> > actually stopped.
> > 
> > ---
> > 
> > Is the JobModelUpdateHandler called from the same thread that 
> > StreamProcessor.start is? If not (and given this is a callback it's not a 
> > good assumption) you should make container volatile.
> 
> Navina Ramesh wrote:
> In the current world, restarting a container with a job model should not 
> stomp another's local state a

Re: Review Request 48356: RFC: Samza as a library

2016-07-12 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141942
---


Fix it, then Ship it!




Noticed one issue while responding to your previous round of comments.


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (lines 
174 - 176)
<https://reviews.apache.org/r/48356/#comment207386>

Don't we need to synchronize these? Are we guaranteed that 
JobModelUpdateHandler cannot be invoked concurrently? If so, let's document 
this.
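
If concurrent invocation cannot be ruled out, one option is to guard the 
handler's state with the instance lock. A minimal sketch follows; the interface 
and class names are hypothetical stand-ins, not the code in this RB:

```java
// Hypothetical stand-in for the callback interface under review.
interface ModelUpdateHandler {
  void onNewJobModel(String jobModel);
}

// Guards the mutable fields with the instance lock so that concurrent
// callbacks cannot interleave their read-modify-write steps.
final class SynchronizedHandler implements ModelUpdateHandler {
  private String currentModel; // guarded by "this"
  private int updates;         // guarded by "this"

  @Override
  public synchronized void onNewJobModel(String jobModel) {
    currentModel = jobModel;
    updates++;
  }

  synchronized int updateCount() {
    return updates;
  }

  synchronized String currentModel() {
    return currentModel;
  }
}
```

If the callback really is single-threaded, a class-level comment saying so 
would be enough and the locking can be dropped.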


- Chris Pettitt


On July 12, 2016, midnight, Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> ---
> 
> (Updated July 12, 2016, midnight)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
> 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
> 56881d46be9f85adabbbda20433b208e012e 
>   
> samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java
>  PRE-CREATION 
>   samza-test/src/test/java/org/a

Re: Review Request 49877: SAMZA-972: Holistic memory monitoring for SamzaContainer

2016-07-12 Thread Chris Pettitt


> On July 11, 2016, 6:47 p.m., Chris Pettitt wrote:
> > Very high level question: I assume you looked at `ps -o rss` and 
> > disqualified it for some reason. Could you elaborate as to why? `ps` itself 
> > is certainly more portable than procfs (though `-o rss` is not part of the 
> > POSIX standard) - it works on RHEL and OSX. It also gives you the actual 
> > memory usage vs the number of pages.
> 
> Jagadish Venkatraman wrote:
> Thanks for the suggestion and the input! I could not find a portable 
> platform-independent way to get the PID of the current JVM process (without 
> going through JNI/ some other flaky solutions). Commands like `ps` require 
> the PID of the process. Using the procfs has a nice property that we can 
> directly parse `/proc/self/stat` since `self` automatically refers to the 
> current process. Also, if we decide to capture other metrics specific to the 
> host (for example: page faults, network statistics, or any other information that 
> ProcFs exposes), this may be easier to extend and build on top.

You could do something like this, which would get you close to POSIX 
compatibility (again RSS not being part of the POSIX standard):

```
Runtime.getRuntime().exec(new String[] {"sh", "-c", "ps -o rss -p $PPID"});
```

This at least gets you better compatibility with OSX (not that you'd run real 
work on OSX :P) and probably with other *nix operating systems that don't 
auto-mount procfs. It also avoids the page size assumption which may not be 
correct.
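
To make the suggestion a bit more concrete, here is a rough sketch of reading 
and parsing the `ps` output. The class and method names are made up for 
illustration, and it assumes `ps` prints a header line followed by a single 
numeric line (typically kilobytes on Linux and OSX):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

final class PsRssReader {
  // Parses the output of `ps -o rss -p <pid>`: a header line ("RSS")
  // followed by one line holding the resident set size.
  static long parseRssKb(String psOutput) {
    for (String line : psOutput.split("\\R")) {
      String trimmed = line.trim();
      if (!trimmed.isEmpty() && trimmed.chars().allMatch(Character::isDigit)) {
        return Long.parseLong(trimmed);
      }
    }
    throw new IllegalArgumentException("no RSS value in: " + psOutput);
  }

  // Runs ps through sh so that $PPID expands to the shell's parent,
  // which is this JVM.
  static long readOwnRssKb() throws IOException, InterruptedException {
    Process p = Runtime.getRuntime()
        .exec(new String[] {"sh", "-c", "ps -o rss -p $PPID"});
    StringBuilder out = new StringBuilder();
    try (BufferedReader r = new BufferedReader(
        new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = r.readLine()) != null) {
        out.append(line).append('\n');
      }
    }
    if (p.waitFor() != 0) {
      throw new IOException("ps exited with " + p.exitValue());
    }
    return parseRssKb(out.toString());
  }
}
```

Keeping the parsing separate from the process launch means the parser can be 
unit tested with canned output, much like the mock procfs snapshots.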

---

Review for current RB coming shortly.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49877/#review141745
---


On July 12, 2016, 12:17 a.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/49877/
> -----------
> 
> (Updated July 12, 2016, 12:17 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Fred Ji, Jake Maes, 
> Yi Pan (Data Infrastructure), and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This feature introduces physical memory monitoring in SamzaContainer.
> 
> Context:
> Often memory used by the SamzaContainer process includes 
> A. JVM Heap memory: This is where all JVM variables live.
> B. Native memory: This memory lives out of the JVM heap and is not visible to 
> the JVM. Examples include memory used by RocksDb, native libraries that user 
> code depends on, etc.
> 
> User jobs could be killed by Yarn if their total memory (A+B) exceeds the 
> configured maximum of yarn.container.memory.mb.
> 
> Currently, while our existing metrics provide visibility into [A] via JMX, we 
> don't have visibility into [B] (as it's totally external to the JVM). 
> 
> This feature uses Linux ProcFS to provide a complete view of the memory (both 
> A & B) to help Samza users understand memory better. (Schedulers like Apache 
> Yarn that require a holistic view of memory (A+B) also use ProcFS. For the 
> curious, here's the Yarn implementation - 
> http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-yarn-common/0.23.1/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
>  that inspired this idea)
> 
> Scope: The scope of this RB is limited to Linux distributions. (A Mac-based 
> implementation may be a separate change list.)
> 
> 
> Diffs
> -
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   
> samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  2044ce01ffded8434e762d99355d5df43642c66b 
>   
> samza-core/src/test/java/org/apache/samza/container/host/TestProcfsBasedStatisticsMonitor.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/49877/diff/
> 
> 
> Testing
> ---
> 
> 1. Unit tests with mock PROC-FS snapshots of processes
> 2. Deployed actual jobs on my dev box. 
>    2.1 Obtained the operating system's view of the container memory using 
> 'ps' and other tools.
>    2.2 Verified that the total memory reported by the monitor is the same as 
> the OS's view of memory [2.1]
> 3. Tested on various Linux distributions I could find internally:
> - RHEL release 6.4, 6.5, 6.6 (Santiago)
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>



Re: Review Request 49877: SAMZA-972: Holistic memory monitoring for SamzaContainer

2016-07-12 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49877/#review141915
---



Biggest concern with this patch is that it seems to bake in a dependency on 
Linux. Is Samza only supported on Linux?

Other than that, some minor stuff to be fixed, but no major issues.


samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (line 66)
<https://reviews.apache.org/r/49877/#comment207330>

Generally it is nice to have static and instance members separate instead 
of interleaved to improve readability. Common practice is to have statics above 
instance members.



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (line 67)
<https://reviews.apache.org/r/49877/#comment207333>

How stable is this across versions of Linux? It appears to be totally 
non-standard across platforms. For example, FreeBSD doesn't appear to have a 
stat file. It has "status" but the format is different: 
https://www.freebsd.org/cgi/man.cgi?query=procfs&apropos=0&sektion=5&manpath=FreeBSD+11.0-stable&arch=default&format=html



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (line 72)
<https://reviews.apache.org/r/49877/#comment207342>

It looks like you can drop the first two groups - I don't see them used 
anywhere.



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (line 90)
<https://reviews.apache.org/r/49877/#comment207334>

Looks like this can be private.



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (lines 93 - 94)
<https://reviews.apache.org/r/49877/#comment207335>

It's the default, but it can be changed. If the page size is off it could 
result in undercounting.
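
To spell out the arithmetic: the rss field in `/proc/[pid]/stat` (field 24) is 
a page count, so bytes = pages * page size. With 2500 resident pages, a 4096 
byte assumption gives about 10 MB, while a host with 64 KiB pages would 
actually be using about 164 MB. A rough sketch, with hypothetical names and 
the page size passed in rather than hard-coded (it could come from something 
like `getconf PAGESIZE`):

```java
final class ProcStatRss {
  // Extracts field 24 (rss, in pages) from a /proc/[pid]/stat line.
  // The comm field (field 2) is wrapped in parentheses and may contain
  // spaces, so splitting starts after the last ')'.
  static long rssPages(String statLine) {
    String afterComm = statLine.substring(statLine.lastIndexOf(')') + 2);
    String[] fields = afterComm.split(" ");
    // fields[0] is field 3 (state), so field 24 sits at index 21.
    return Long.parseLong(fields[21]);
  }

  // Converts the page count to bytes using the caller-supplied page size.
  static long rssBytes(String statLine, long pageSizeBytes) {
    return rssPages(statLine) * pageSizeBytes;
  }
}
```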



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (lines 140 - 141)
<https://reviews.apache.org/r/49877/#comment207338>

Prefer blocks '{' ... '}' for better safety as the code is changed.



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (lines 150 - 151)
<https://reviews.apache.org/r/49877/#comment207339>

Use blocks. Also, should we log a warning?



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (line 178)
<https://reviews.apache.org/r/49877/#comment207340>

You might be able to log an error with a little more context here. Also, 
not sure if log4j is going to emit the stack trace for the error if you're 
asking it to stringify the first arg.



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (line 181)
<https://reviews.apache.org/r/49877/#comment207341>

We should probably be catching exceptions here to prevent one listener from 
blowing up others. I believe the code you copied this from had the same flaw 
(shame on the original author :P).
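
Something along these lines would do it. This is only a sketch with made-up 
names; the real listener type in the patch will differ:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ListenerNotifier<T> {
  private final List<Consumer<T>> listeners = new ArrayList<>();
  private int failures;

  void register(Consumer<T> listener) {
    listeners.add(listener);
  }

  // Delivers the value to every listener. A listener that throws is
  // counted and skipped so the remaining listeners still get called.
  int publish(T value) {
    int delivered = 0;
    for (Consumer<T> listener : listeners) {
      try {
        listener.accept(value);
        delivered++;
      } catch (RuntimeException e) {
        // A real implementation would log the failure with its stack trace.
        failures++;
      }
    }
    return delivered;
  }

  int failures() {
    return failures;
  }
}
```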



samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
 (line 249)
<https://reviews.apache.org/r/49877/#comment207343>

In the code you copied this from I prefix the thread name with "Samza-". 
The reason to do that is it becomes much easier when looking at a thread dump 
to determine if the thread is from Samza or third-party code. I'd recommend 
indicating this is a Samza thread somehow.
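
For example, a small thread factory can apply the prefix. The class name and 
the daemon setting below are assumptions for illustration, not what the patch 
does:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class PrefixedThreadFactory implements ThreadFactory {
  private final String prefix;
  private final AtomicInteger counter = new AtomicInteger();

  PrefixedThreadFactory(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public Thread newThread(Runnable r) {
    // Names come out as <prefix>1, <prefix>2, ... and are easy to spot
    // in a thread dump.
    Thread t = new Thread(r, prefix + counter.incrementAndGet());
    t.setDaemon(true); // assumption: monitoring should not block JVM exit
    return t;
  }
}
```

Usage would look like 
`Executors.newSingleThreadScheduledExecutor(new PrefixedThreadFactory("Samza-StatisticsMonitor-"))`.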


- Chris Pettitt


On July 12, 2016, 12:17 a.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/49877/
> ---
> 
> (Updated July 12, 2016, 12:17 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Fred Ji, Jake Maes, 
> Yi Pan (Data Infrastructure), and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This feature introduces physical memory monitoring in SamzaContainer.
> 
> Context:
> Often memory used by the SamzaContainer process includes 
> A. JVM Heap memory: This is where all JVM variables live.
> B. Native memory: This memory lives out of the JVM heap and is not visible to 
> the JVM. Examples include memory used by RocksDb, native libraries that user 
> code depends on, etc.
> 
> User jobs could be killed by Yarn if their total memory (A+B) exceeds the 
> configured maximum of yarn.container.memory.mb.
> 
> Currently, while our existing metrics provide visibility into [A] via JMX, we 
> don't have visibility into [B] (as it's totally external to the JVM). 
> 
> This feature uses Linux ProcFS to provide a complete view of the memory (both 

Re: Review Request 49877: SAMZA-972: Holistic memory monitoring for SamzaContainer

2016-07-11 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49877/#review141745
---



Very high level question: I assume you looked at `ps -o rss` and disqualified 
it for some reason. Could you elaborate as to why? `ps` itself is certainly 
more portable than procfs (though `-o rss` is not part of the POSIX standard) - 
it works on RHEL and OSX. It also gives you the actual memory usage vs the 
number of pages.

- Chris Pettitt


On July 11, 2016, 6:57 a.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/49877/
> ---
> 
> (Updated July 11, 2016, 6:57 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan 
> (Data Infrastructure), and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This feature introduces physical memory monitoring in SamzaContainer.
> 
> Context:
> Often memory used by the SamzaContainer process includes 
> A. JVM Heap memory: This is where all JVM variables live.
> B. Native memory: This memory lives out of the JVM heap and is not visible to 
> the JVM. Examples include memory used by RocksDb, native libraries that user 
> code depends on, etc.
> 
> User jobs could be killed by Yarn if their total memory (A+B) exceeds the 
> configured maximum of yarn.container.memory.mb.
> 
> Currently, while our existing metrics provide visibility into [A] via JMX, we 
> don't have visibility into [B] (as it's totally external to the JVM). 
> 
> This feature uses Linux ProcFS to provide a complete view of the memory (both 
> A & B) to help Samza users understand memory better. (Schedulers like Apache 
> Yarn that require a holistic view of memory (A+B) also use ProcFS. For the 
> curious, here's the Yarn implementation - 
> http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hadoop/hadoop-yarn-common/0.23.1/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
>  that inspired this idea)
> 
> Scope: The scope of this RB is limited to Linux distributions. (A Mac-based 
> implementation may be a separate change list.)
> 
> 
> Diffs
> -
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   
> samza-core/src/main/java/org/apache/samza/container/host/ProcfsBasedStatisticsMonitor.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  2044ce01ffded8434e762d99355d5df43642c66b 
>   
> samza-core/src/test/java/org/apache/samza/container/host/TestProcfsBasedStatisticsMonitor.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/49877/diff/
> 
> 
> Testing
> ---
> 
> 1. Unit tests with mock PROC-FS snapshots of processes
> 2. Deployed actual jobs on my dev box. 
>    2.1 Obtained the operating system's view of the container memory using 
> 'ps' and other tools.
>    2.2 Verified that the total memory reported by the monitor is the same as 
> the OS's view of memory [2.1]
> 3. Tested on various Linux distributions I could find internally:
> - RHEL release 6.4, 6.5, 6.6 (Santiago)
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>



Re: Review Request 48356: RFC: Samza as a library

2016-06-27 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review139626
---



A few more thoughts below.

Still not a fan of the direction we're going with the config. I know it is 
status quo, but it further locks us into a limited model. One other benefit of 
the Offspring way of doing config that occurred to me while reading this is 
that with Offspring you get all config violations in one shot versus once per 
run (e.g. Samza fails fast on first config problem). The latter is how LiSpring 
worked and we intentionally addressed that as a part of Offspring.
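
To illustrate the "all violations in one shot" idea in isolation, here is a 
rough sketch. It is not how Offspring works internally; the class name and the 
required-key check are assumptions chosen to keep the example small:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ConfigValidator {
  // Returns every violation found instead of throwing on the first one,
  // so a user can fix all problems before the next run.
  static List<String> validate(Map<String, String> config,
                               List<String> requiredKeys) {
    List<String> violations = new ArrayList<>();
    for (String key : requiredKeys) {
      String value = config.get(key);
      if (value == null || value.trim().isEmpty()) {
        violations.add("missing required key: " + key);
      }
    }
    return violations;
  }
}
```

A caller would fail once, after validation, with the full list in the error 
message.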


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (lines 
125 - 126)
<https://reviews.apache.org/r/48356/#comment204894>

Don't we need to stop the container directly here? shutdown will stop the 
executor from accepting any new work, but will not stop running work. In any 
case, wouldn't a clean shutdown here be better (e.g. for flushing state) than 
trying to force shutdown via the executor?
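
The behavior I'm referring to can be seen in a small standalone sketch (names 
are hypothetical; the latch stands in for the container's run loop): 
`shutdown()` returns while the submitted task is still running, and the task 
only ends once it is explicitly told to stop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownDemo {
  // Returns true if the task was still running after shutdown() returned
  // and then finished once it received an explicit stop signal.
  static boolean taskSurvivesShutdown() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    AtomicBoolean finished = new AtomicBoolean(false);

    executor.submit(() -> {
      started.countDown();
      try {
        release.await(); // stands in for the container's run loop
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      finished.set(true);
    });

    try {
      started.await();
      executor.shutdown(); // rejects new work, leaves the running task alone
      boolean stillRunning = !executor.isTerminated() && !finished.get();

      release.countDown(); // explicit stop signal, like stopping the container
      boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
      return stillRunning && terminated && finished.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```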



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
145)
<https://reviews.apache.org/r/48356/#comment204893>

Do we need to ensure the previous container is stopped before starting the 
new container? For example, would it be possible for the new container and the 
old container to stomp on each other's local state if they're running at the 
same time? container.stop appears to be asynchronous and doesn't appear to give 
you any guarantee about when the container is actually stopped.

---

Is the JobModelUpdateHandler called from the same thread that 
StreamProcessor.start is? If not (and given this is a callback it's not a good 
assumption) you should make container volatile.
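
A sketch of what I have in mind, using a plain thread as a stand-in for the 
container (all names here are hypothetical): stop the old one, wait for it to 
exit, and only then start the replacement. The field is volatile so readers on 
other threads see the latest value.

```java
final class ContainerSwapper {
  // Written by the callback thread and read by others, hence volatile.
  private volatile Thread container;

  // Stops the previous container and waits for it to exit before starting
  // the replacement, so the two never run at the same time.
  synchronized void replace(Runnable newContainerBody) {
    stop();
    Thread next = new Thread(newContainerBody, "Samza-Container");
    container = next;
    next.start();
  }

  synchronized void stop() {
    Thread old = container;
    if (old == null) {
      return;
    }
    old.interrupt(); // stand-in for an asynchronous container.stop()
    try {
      old.join();    // block until the old container has really exited
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while stopping", e);
    }
    container = null;
  }

  boolean isRunning() {
    Thread t = container;
    return t != null && t.isAlive();
  }
}
```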



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 (line 44)
<https://reviews.apache.org/r/48356/#comment204898>

If this is write-once I would move this to the constructor and make it 
final. Otherwise does this need to be volatile? It's hard to tell as it is not 
clear how it is used. It might be worth noting in the class docs that this 
class is not thread safe.



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 (lines 60 - 61)
<https://reviews.apache.org/r/48356/#comment204896>

I would suggest using a block here versus a single statement. It is easy to 
break, e.g.:

```
if (systemFactoryClassName == null)
  log.error("error message");
  throw new SamzaException("error message");
```
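
In the snippet above the throw runs unconditionally, because only the first 
statement belongs to the `if`. A braced version, sketched with a hypothetical 
helper and a standard exception type in place of the Samza one:

```java
final class FactoryNameCheck {
  // Throws only when the name is missing. The braces keep both statements
  // inside the conditional even if more lines are added later.
  static String requireFactoryClassName(String systemFactoryClassName) {
    if (systemFactoryClassName == null) {
      System.err.println("system factory class name is not configured");
      throw new IllegalArgumentException(
          "system factory class name is not configured");
    }
    return systemFactoryClassName;
  }
}
```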



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 (lines 92 - 94)
<https://reviews.apache.org/r/48356/#comment204897>

How is this used? It seems to be write only?



samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 107 - 
109)
<https://reviews.apache.org/r/48356/#comment204899>

Should this ensure that stop is complete before returning? Alternatively if 
we want to allow stop to be async, should we provide a way to wait for it?



samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java
 (line 50)
<https://reviews.apache.org/r/48356/#comment204900>

To future proof this a bit you could use 
AllSspToSingleTaskGrouperFactory.class.getName. Same for the one below.
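
Roughly like this. The nested class below is only a placeholder for the real 
factory so the sketch stands alone:

```java
final class GrouperConfigExample {
  // Hypothetical placeholder for the factory class named in the test.
  static final class AllSspToSingleTaskGrouperFactory {}

  // Derives the configured value from the class literal, so a rename or
  // package move is caught by the compiler rather than at runtime.
  static String factoryClassName() {
    return AllSspToSingleTaskGrouperFactory.class.getName();
  }
}
```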


- Chris Pettitt


On June 23, 2016, 1:14 a.m., Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> ---
> 
> (Updated June 23, 2016, 1:14 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
> PRE-CREATION 
>   
> samza-co

Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-16 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review138013
---


Fix it, then Ship it!




Contingent on perf tests, of course.


samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
88)
<https://reviews.apache.org/r/48243/#comment203230>

I would probably make this an AtomicLong as it is the only thing in the way 
of this being thread-safe and it is not obvious whether seqNum is used in a 
multi-threaded context.
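
A minimal sketch of the change I mean (the class name is made up; only the 
`seqNum` field name comes from the patch):

```java
import java.util.concurrent.atomic.AtomicLong;

final class SequenceNumbers {
  private final AtomicLong seqNum = new AtomicLong(0);

  // Returns 0, 1, 2, ... with no duplicates even under concurrent callers.
  long next() {
    return seqNum.getAndIncrement();
  }
}
```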



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
94)
<https://reviews.apache.org/r/48243/#comment203223>

    final


- Chris Pettitt


On June 15, 2016, 11:41 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> ---
> 
> (Updated June 15, 2016, 11:41 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> d32a92976e43ca24033b48c91851ee706de7de6b 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
> PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> e280daa9626757cb4d17c0c03eed923277230c3e 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 1358fdd8a386f5f81128ef871c72833d8cf11d86 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 5457f0e05ae4d615b9c86f48a662c54b13828e78 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
> 09da62e0f9a10f7c3683345a309c6278ff01fb4b 
> 
> Diff: https://reviews.apache.org/r/48243/diff/
> 
> 
> Testing
> ---
> 
> unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-16 Thread Chris Pettitt


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 188
> > <https://reviews.apache.org/r/48243/diff/2/?file=1412994#file1412994line188>
> >
> > Do we need to handle the case that the SSP is not in the mappings or is 
> > that impossible?
> 
> Xinyu Liu wrote:
> In the current version of Samza, the SSPs are assigned to a task at job 
> startup, so they won't change during the life cycle of the run loop. If this 
> changes in the future, we need to revisit the logic here.

I'd suggest being defensive since it is low cost. For this you would just need 
to do a null check on the worker before using it.
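
A minimal sketch of the defensive check suggested above. The class, the map, and the use of plain strings for SSPs are hypothetical stand-ins, since the actual AsyncRunLoop code is not shown in this thread:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the SSP-to-worker mapping; not the AsyncRunLoop code.
final class WorkerLookup {
  private final Map<String, Runnable> sspToWorker = new HashMap<>();

  void register(String ssp, Runnable worker) {
    sspToWorker.put(ssp, worker);
  }

  // Returns true if a worker was found and run, false if the SSP is unmapped.
  boolean dispatch(String ssp) {
    Runnable worker = sspToWorker.get(ssp);
    if (worker == null) {
      // Defensive branch: not expected while SSP assignment is fixed at startup.
      return false;
    }
    worker.run();
    return true;
  }
}
```

Whether the unmapped case should be skipped, logged, or treated as fatal is a separate decision; the sketch only shows that the check itself is cheap.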


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 403
> > <https://reviews.apache.org/r/48243/diff/2/?file=1412994#file1412994line403>
> >
> > Same comment as above re. run loop termination. You potentially could 
> > even have an abortRunLoop(Throwable) function that would make this super 
> > clear.
> 
> Xinyu Liu wrote:
> Added the AsyncRunLoop.abort(Throwable).

Cool, looks good!


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137543
---



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-15 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137543
---




samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 88 - 91)
<https://reviews.apache.org/r/48243/#comment202688>

Wrap the result in Collections.unmodifiableMap to ensure it cannot later be 
changed.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 92)
<https://reviews.apache.org/r/48243/#comment202691>

Wrap this in an immutable map to ensure it cannot be changed later.
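
A minimal sketch of the wrapping suggested in the two comments above, assuming the JDK's java.util.Collections.unmodifiableMap is what is meant. The class and method names are hypothetical:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the fields under review are not shown in this thread.
final class ImmutableMappings {
  // Copies the input first so later changes to the caller's map are not visible,
  // then wraps the copy so holders of the returned view cannot mutate it.
  static <K, V> Map<K, V> freeze(Map<K, V> source) {
    return Collections.unmodifiableMap(new HashMap<K, V>(source));
  }
}
```

The defensive copy matters because unmodifiableMap only returns a read-only view; without the copy, whoever still holds the original map could change what the view shows.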



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 183)
<https://reviews.apache.org/r/48243/#comment202692>

This is not true for all cases (e.g. process). You could probably just pull 
this comment.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 188)
<https://reviews.apache.org/r/48243/#comment202693>

Do we need to handle the case that the SSP is not in the mappings or is 
that impossible?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 368)
<https://reviews.apache.org/r/48243/#comment202944>

It might be worth noting that this will cause the run loop to terminate.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 403)
<https://reviews.apache.org/r/48243/#comment202945>

Same comment as above re. run loop termination. You potentially could even 
have an abortRunLoop(Throwable) function that would make this super clear.
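
One possible shape for such a function, as a sketch only. The class below is hypothetical and much simpler than the run loop under review; it just shows a single, named place where a failure is recorded and the loop is made to terminate:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; not the actual AsyncRunLoop code.
final class AbortableLoop {
  private final AtomicReference<Throwable> failure = new AtomicReference<>();
  private int iterations = 0;

  // Records the first failure. The loop checks it before every iteration.
  void abortRunLoop(Throwable cause) {
    failure.compareAndSet(null, cause);
  }

  // Runs step until an abort is requested or maxIterations is reached,
  // then rethrows the recorded failure, if any.
  void run(Runnable step, int maxIterations) {
    while (failure.get() == null && iterations < maxIterations) {
      iterations++;
      step.run();
    }
    Throwable t = failure.get();
    if (t != null) {
      throw new IllegalStateException("run loop aborted", t);
    }
  }

  int iterations() {
    return iterations;
  }
}
```

Callbacks that hit an error would call abortRunLoop(e) instead of setting a field directly, which makes the termination behavior visible at the call site.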


We should sync up on how to coordinate the disk quota changes and this change.

- Chris Pettitt


On June 9, 2016, 7:49 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> ---
> 
> (Updated June 9, 2016, 7:49 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> d32a92976e43ca24033b48c91851ee706de7de6b 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.ja

Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-15 Thread Chris Pettitt


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 578
> > <https://reviews.apache.org/r/48243/diff/2/?file=1412994#file1412994line578>
> >
> > Don't quite get this point. It seems to me that the broadcast stream 
> > message will be called as worker.state.insertEnvelope(envelope) for all 
> > tasks, which will essentially create a PendingEnvelope for each task. 
> > Hence, w/ the current code, it will call consumerMultiplexer.tryUpdate() 
> > from each task???

I had the same comment on the last pass. I don't understand how this is 
supposed to work. Might be worth discussing this in person with Xinyu and then 
either fixing the code or the doc as appropriate.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137369
---


On June 9, 2016, 7:49 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> ---
> 
> (Updated June 9, 2016, 7:49 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> d32a92976e43ca24033b48c91851ee706de7de6b 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
> PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> e280daa9626757cb4d17c0c03eed923277230c3e 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 1358fdd8a386f5f81128ef871c72833d8cf11d86 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 5457f0e05ae4d615b9c86f48a662c54b13828e78 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
> 09da62e0f9a10f7c3683345a309c6278ff01fb4b 
> 
> Diff: https://reviews.apache.org/r/48243/diff/
> 
> 
> Testing
> ---
> 
> unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48080: SAMZA-956: Disk Quotas: Add throttler and disk quota enforcement

2016-06-13 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48080/
---

(Updated June 13, 2016, 10:28 p.m.)


Review request for samza.


Changes
---

More consistent use of "entry" where "policy" was previously used as policy now 
means a collection of policy entries.


Repository: samza


Description
---

This change introduces a ThrottlingExecutor which is used to control the
rate of execution in the main run loop. The DiskQuotaEnforcer houses the
rules for switching from one DiskQuotaPolicy to the next as new disk
usage samples arrive.

By default, no throttling will occur. New policies can be added using
the following form:

```
container.disk.quota.bytes=XXX
container.disk.quota.policy.count=2
container.disk.quota.policy.0.lowWaterMark=0.4
container.disk.quota.policy.0.highWaterMark=0.5
container.disk.quota.policy.0.workFactor=0.5
container.disk.quota.policy.1.lowWaterMark=0.05
container.disk.quota.policy.1.highWaterMark=0.1
container.disk.quota.policy.1.workFactor=0.05
```

See ThrottlingExecutor for details about how the work factor works and
DiskQuotaPolicy for details about how the low and high water mark
configurations work.
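
As a rough illustration of what a work factor can mean, here is a sketch that assumes the work factor is the fraction of wall-clock time the run loop is allowed to spend working. That interpretation, and the class and method names, are assumptions for illustration; ThrottlingExecutor remains the reference for the actual behavior.

```java
// Hypothetical helper; not the ThrottlingExecutor implementation.
final class WorkFactorMath {
  // For a work factor w in (0, 1], working for workNanos and then idling for
  // the returned duration makes the working share of total time equal to w:
  //   workNanos / (workNanos + idle) = w  =>  idle = workNanos * (1 - w) / w
  static long idleNanosFor(long workNanos, double workFactor) {
    if (workFactor <= 0.0 || workFactor > 1.0) {
      throw new IllegalArgumentException("workFactor must be in (0, 1]");
    }
    return (long) (workNanos * (1.0 - workFactor) / workFactor);
  }
}
```

Under this reading, a work factor of 0.5 inserts as much idle time as work time, and a work factor of 1.0 inserts none, which matches the "no throttling by default" behavior described above.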


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java 
2a565be7858a4d3a6adbc49989b43b71ca3f6721 
  
samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
 PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java
 PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
6916c5c71e479d43a7435fa4987565d93ed437ac 
  samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
cf3c4c0ab08a59760bc899c6f2027755e933b350 
  
samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 9e6641c3628290dc05e1eb5537e86bff9d37f92c 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 
fc3d085d7fff9f7dcec766ba48e550eb0052e99d 
  
samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java
 PRE-CREATION 
  
samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java
 PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala 
05b4e5c37578340eefe6d412f5a76f540bec6fa6 

Diff: https://reviews.apache.org/r/48080/diff/


Testing
---

- Added new unit tests
- Ran existing tests with gradle test
- Verified throttling behavior and instrumentation with local deployment
- Verified average latency impact of feature to be < 150ns for Linux and OSX


Thanks,

Chris Pettitt



Re: Review Request 48182: SAMZA-958: Make store/cache thread safe

2016-06-09 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48182/#review136879
---


Ship it!




Ship It!

- Chris Pettitt


On June 9, 2016, 12:33 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48182/
> ---
> 
> (Updated June 9, 2016, 12:33 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the existing stores/cache need to be thread safe in order to be used by 
> multithreaded tasks. The following changes are made to ensure the thread 
> safety of the stores:
> 
> For CachedStore, use synchronized lock for each public function;
> For current InMemoryKeyValueStore, use ConcurrentSkipListMap as underlying 
> map for thread safety.
> For store Iterator, do not support remove functionality (throw 
> UnsupportedOperationException like RocksDb does today).
> 
> 
> Diffs
> -
> 
>   
> samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
>  72f25a354eaa98e8df379d07d9cc8613dfafd13a 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
>  f0965aec5f3ec2a214dc40c70832c58273623749 
>   
> samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
>  b7f1cdc4dbaeea2413cee2ad60d74528f3950513 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
> c28f8db8cb59bd5415e78535877acc1e5bee0f67 
>   samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala 
> eee74473726cb2a36d0b75fe5c9df737440980bc 
>   
> samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
>  23f8a1a6bee8ef38e0640a4e90778e53d982deeb 
> 
> Diff: https://reviews.apache.org/r/48182/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local deployment.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>
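
The in-memory store and iterator changes described in the quoted request can be sketched as follows. This is a simplified, hypothetical store with string keys and values, not Samza's KeyValueStore interface or the InMemoryKeyValueStore under review:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical simplified store for illustration only.
final class SketchStore {
  // A sorted, thread-safe map keeps range iteration semantics without external locking.
  private final ConcurrentSkipListMap<String, String> data = new ConcurrentSkipListMap<>();

  void put(String key, String value) {
    data.put(key, value);
  }

  String get(String key) {
    return data.get(key);
  }

  // Iterates entries in key order; remove() is rejected, as described for RocksDb.
  Iterator<Map.Entry<String, String>> all() {
    final Iterator<Map.Entry<String, String>> inner = data.entrySet().iterator();
    return new Iterator<Map.Entry<String, String>>() {
      @Override public boolean hasNext() { return inner.hasNext(); }
      @Override public Map.Entry<String, String> next() { return inner.next(); }
      @Override public void remove() { throw new UnsupportedOperationException("remove"); }
    };
  }
}
```

Iterators over a ConcurrentSkipListMap are weakly consistent, so concurrent writers do not cause a ConcurrentModificationException during iteration.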



Re: Review Request 48182: SAMZA-958: Make store/cache thread safe

2016-06-09 Thread Chris Pettitt


> On June 9, 2016, 12:28 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala,
> >  line 90
> > <https://reviews.apache.org/r/48182/diff/2/?file=1406146#file1406146line90>
> >
> > This is a test that Xinyu ported over from my experiments. Totally 
> > agreed that the names are not well-thought-out and I was mainly focusing on 
> > testing all RocksDB put/get/remove/iterator behavior under the synchronized 
> > lock. It turns out that RocksDB access is safe under the synchronized lock 
> > and this test probably is no longer needed.

+1. I don't think these tests are particularly useful as a part of the regular 
test suite.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48182/#review136735
---





Re: Review Request 48213: SAMZA-960: Make system producer thread safe

2016-06-09 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/#review136875
---


Ship it!




Ship It!

- Chris Pettitt


On June 8, 2016, 11:53 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48213/
> ---
> 
> (Updated June 8, 2016, 11:53 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the system producers need to be thread safe in order to be used in 
> multithreaded tasks. The following are the changes 
> (ElasticSearchSystemProducer is already thread safe so no change made there):
> 
> In KafkaSystemProducer, remove the buggy retry logic and treat any exception 
> as fatal.
> In HdfsSystemProducer, add synchronization lock to all public methods.
> 
> 
> Diffs
> -
> 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>  1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  3769e103616dc0f1fd869706cc086e24cd926c48 
>   
> samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
>  04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>  8e32bba6ced090f0fc8d4e5176fe0788df36981d 
> 
> Diff: https://reviews.apache.org/r/48213/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>
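
To make the two producer changes in the quoted request concrete, here is a hedged sketch of "treat any exception as fatal" combined with synchronized public methods. The class and its methods are hypothetical and do not reflect the KafkaSystemProducer or HdfsSystemProducer APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical producer for illustration only.
final class FailFastProducer {
  private final AtomicReference<Throwable> sendFailure = new AtomicReference<>();
  private final List<String> sent = new ArrayList<>();

  // Would be called from a client callback thread when an asynchronous send fails.
  void onSendError(Throwable t) {
    sendFailure.compareAndSet(null, t);
  }

  // synchronized guards the non-thread-safe list across task threads.
  synchronized void send(String message) {
    throwIfFailed();
    sent.add(message);
  }

  synchronized int sentCount() {
    return sent.size();
  }

  // No retry: once a send has failed, every later call fails with the same cause.
  private void throwIfFailed() {
    Throwable t = sendFailure.get();
    if (t != null) {
      throw new IllegalStateException("previous send failed; not retrying", t);
    }
  }
}
```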



Re: Review Request 48356: RFC: Samza as a library

2016-06-09 Thread Chris Pettitt
to have code to handle invalid states (e.g. stop called before start, etc.).



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
41)
<https://reviews.apache.org/r/48356/#comment201968>

This is not what I was getting at. What I was intending was that your 
factory for this would take an optional executor that would be passed here at 
construction time. In its current state it is not overridable without 
subclassing.
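
A sketch of the factory shape described above, under the assumption that the field in question is an executor. All names here are hypothetical, since the StreamProcessor under review is not shown in this thread:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class for illustration only.
final class ProcessorSketch {
  // Final, and supplied at construction time rather than created internally.
  private final ExecutorService executor;

  private ProcessorSketch(ExecutorService executor) {
    this.executor = executor;
  }

  ExecutorService executor() {
    return executor;
  }

  // Default factory: callers that do not care get a single-threaded executor.
  static ProcessorSketch create() {
    return create(null);
  }

  // Optional override: a non-null executor replaces the default without subclassing.
  static ProcessorSketch create(ExecutorService executorOrNull) {
    ExecutorService chosen =
        executorOrNull != null ? executorOrNull : Executors.newSingleThreadExecutor();
    return new ProcessorSketch(chosen);
  }
}
```

This also makes tests simpler, because a test can pass in an executor it controls.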



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
42)
<https://reviews.apache.org/r/48356/#comment201971>

I'd make this final.



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
47)
<https://reviews.apache.org/r/48356/#comment201967>

What is the purpose of this class? It seems to be trying to manage 
orchestration for a few different services? If so, I'd definitely give this a 
name that better reflects the intent. I'd also suggest not making it also be a 
JobModelUpdateHandler (that could potentially be an inner class).



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
52)
<https://reviews.apache.org/r/48356/#comment201964>

It is not thread-safe to leak the this pointer from the constructor.
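
One common way to avoid publishing `this` from a constructor is a static factory that registers the object only after construction has finished. The sketch below uses hypothetical names throughout (UpdateHandler, HandlerRegistry, SafeProcessor) and is not the code under review:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface.
interface UpdateHandler {
  void onUpdate(String jobModel);
}

// Hypothetical registry that may invoke handlers from other threads.
final class HandlerRegistry {
  private final List<UpdateHandler> handlers = new ArrayList<>();

  synchronized void register(UpdateHandler handler) {
    handlers.add(handler);
  }

  synchronized void publish(String jobModel) {
    for (UpdateHandler handler : handlers) {
      handler.onUpdate(jobModel);
    }
  }
}

final class SafeProcessor implements UpdateHandler {
  private final String name;
  private volatile String lastModel;

  // The constructor only initializes fields; it never hands out `this`.
  private SafeProcessor(String name) {
    this.name = name;
  }

  // Registration happens after the constructor has returned, so no other
  // thread can observe a partially constructed instance through the registry.
  static SafeProcessor createAndRegister(String name, HandlerRegistry registry) {
    SafeProcessor processor = new SafeProcessor(name);
    registry.register(processor);
    return processor;
  }

  @Override
  public void onUpdate(String jobModel) {
    lastModel = name + ":" + jobModel;
  }

  String lastModel() {
    return lastModel;
  }
}
```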



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
68)
<https://reviews.apache.org/r/48356/#comment201970>

To reinforce some comments above, this can lead to an NPE if stop is called 
before start.
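
A sketch of explicit lifecycle state handling that makes stop-before-start harmless. The class and its states are hypothetical; the point is only that stop() checks the state rather than dereferencing something start() was expected to create:

```java
// Hypothetical lifecycle holder for illustration only.
final class LifecycleSketch {
  enum State { NEW, RUNNING, STOPPED }

  private State state = State.NEW;
  private Thread worker;  // created only by start()

  synchronized void start() {
    if (state != State.NEW) {
      throw new IllegalStateException("start() called in state " + state);
    }
    worker = new Thread(() -> {
      try {
        Thread.sleep(Long.MAX_VALUE);  // placeholder for real work
      } catch (InterruptedException e) {
        // stop requested
      }
    });
    worker.setDaemon(true);
    worker.start();
    state = State.RUNNING;
  }

  // Safe to call before start(): the worker is touched only when RUNNING.
  synchronized void stop() {
    if (state == State.RUNNING) {
      worker.interrupt();
    }
    state = State.STOPPED;
  }

  synchronized State state() {
    return state;
  }
}
```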



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
(lines 617 - 619)
<https://reviews.apache.org/r/48356/#comment201973>

I hope this is temporary. If not, it would be nicer to provide proper 
lifecycle / shutdown mechanisms. For example, on stop, I may want to flush a 
cache to disk.


- Chris Pettitt


On June 8, 2016, 9:59 p.m., Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -------
> 
> (Updated June 8, 2016, 9:59 p.m.)
> 
> 
> Review request for samza and Chris Pettitt.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9eed 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/BuilderInterface.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/SystemStreamConfig.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/AbstractJobCoordinator.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/JobCoord

Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-08 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review136357
---



Biggest takeaway on this pass is that the async run loop code would be more 
readable without the two-level-deep anonymous inner classes. Surfacing how 
these classes interact with each other should make this easier to review.


samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java (line 
73)
<https://reviews.apache.org/r/48243/#comment201378>

It's been a while and I'm not sure if we discussed: is the goal to 
ultimately just switch over to async task for both cases (async and sync)?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 118)
<https://reviews.apache.org/r/48243/#comment201758>

As far as I can tell, this will only run window and commit in the thread 
pool but otherwise will run process on the current thread.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 123 - 
128)
<https://reviews.apache.org/r/48243/#comment201794>

I believe this was copied from the original run loop. It seems to me that 
it would be nice to have the run loop have lifecycle methods (e.g. shutdown) 
and have some other application setup class (the container?) that is 
responsible for tying shutdown of the process to shutting down the run loop. 
This would allow the run loop lifecycle to be decoupled from the process 
lifecycle at no loss of generality.
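
To make the suggestion concrete, here is a minimal sketch of a loop that exposes a shutdown method and leaves the process wiring to its owner. The class and method names are hypothetical and are not taken from the patch; the loop body is a placeholder.

```java
// Hypothetical sketch; not the AsyncRunLoop or RunLoop from the patch.
final class StoppableRunLoopSketch implements Runnable {
    // volatile so a shutdown request made on another thread is seen by the loop thread.
    private volatile boolean shutdownRequested = false;

    @Override
    public void run() {
        while (!shutdownRequested) {
            // Placeholder for process / window / commit work.
            Thread.yield();
        }
    }

    /** Lifecycle method: asks the loop to exit after the current iteration. */
    void shutdown() {
        shutdownRequested = true;
    }

    /** What a container-like owner could register to tie process shutdown to the loop. */
    static Thread shutdownHookFor(StoppableRunLoopSketch loop) {
        return new Thread(loop::shutdown, "run-loop-shutdown-hook");
    }
}
```

The owner would then call something like Runtime.getRuntime().addShutdownHook(StoppableRunLoopSketch.shutdownHookFor(loop)), while tests can call shutdown() directly without touching the process.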



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 275 - 
276)
<https://reviews.apache.org/r/48243/#comment201783>

Only for commit and window, right?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 437 - 
442)
<https://reviews.apache.org/r/48243/#comment201786>

This is a bit confusing to me. I see that we're enqueueing an envelope 
wrapped in a pending envelope per task worker, but I don't see how we would end 
up dequeueing the same pending envelope twice. Could we expand on the 
documentation to make it clearer?

Also, depending on how we end up dequeueing twice we need to ensure that 
the state change (mark processed) is visible. Is that the case? The state is 
mutable and we're not using anything to publish the state change (as far as I 
can see). Either we should ensure this state is visible across threads, e.g. 
with a volatile or CAS, or we should document why it doesn't need to be.
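
As an illustration of the CAS option, a sketch along these lines would both publish the state change and make sure only one caller wins. PendingEnvelopeSketch is a hypothetical stand-in, not the class in the patch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the pending envelope; not the class from the patch.
final class PendingEnvelopeSketch {
    private final String payload;
    // AtomicBoolean gives cross-thread visibility plus an atomic check-and-set.
    private final AtomicBoolean processed = new AtomicBoolean(false);

    PendingEnvelopeSketch(String payload) {
        this.payload = payload;
    }

    String payload() {
        return payload;
    }

    /** Returns true only for the single caller that flips the flag. */
    boolean markProcessed() {
        return processed.compareAndSet(false, true);
    }

    boolean isProcessed() {
        return processed.get();
    }
}
```

If only visibility is needed and there is a single writer, a plain volatile boolean would be the lighter choice.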



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 518)
<https://reviews.apache.org/r/48243/#comment201791>

This is the second level deep non-static inner class. At this point 
reasoning about the interactions between everything is getting pretty 
difficult. Instead, could we make the interactions clearer by making these 
inner classes top-level, package private? I suspect this might involve 
shuffling some things around. For example, maybe pendingEnvelopeQueue really 
belongs in workerState and the worker inserts and removes via the state 
instance.

Having a smaller non-static inner class, e.g. for a callback, is reasonable 
if it is not too complicated, but both of these, as is, are a bit too heavy.



samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 71 - 
75)
<https://reviews.apache.org/r/48243/#comment201795>

Same comment as for the async run loop.


- Chris Pettitt


On June 4, 2016, 1:18 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> ---
> 
> (Updated June 4, 2016, 1:18 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/mai

Re: Review Request 48213: SAMZA-960: Make system producer thread safe

2016-06-06 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/#review136338
---


Fix it, then Ship it!





samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala 
(line 86)
<https://reviews.apache.org/r/48213/#comment201357>

I think you can drop this synchronized call. Otherwise the one inside the 
function is redundant.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (lines 64 - 65)
<https://reviews.apache.org/r/48213/#comment201358>

Why not use null to indicate the created state? In particular, why don't 
you null out the producer when it is no longer needed? Leaving a ref holds in 
memory all of the objects reachable from the producer whether they are needed 
or not. Hopefully close on the producer covers that, but the null approach is 
safer with no apparent downside.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 97)
<https://reviews.apache.org/r/48213/#comment201359>

I think I see why you were trying to not null out the producer above. 
However, this can still NPE if start has not been called. Instead, now that 
producer is volatile, why not grab its state at the beginning of this function? 
If it is null throw an error, otherwise you can use it (assuming that producer 
handles close correctly). Another benefit is that you reduce the number of 
volatile reads you need to make.
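
Roughly what I have in mind, as a sketch in Java rather than the Scala of the patch. ProducerHolderSketch and FakeProducer are hypothetical names, and the fake producer is only a placeholder for the real client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not KafkaSystemProducer.
final class ProducerHolderSketch {
    // Placeholder for the real producer client; not itself thread safe.
    static final class FakeProducer {
        final List<String> sent = new ArrayList<>();

        void send(String msg) {
            sent.add(msg);
        }
    }

    // null means "not started" or "stopped".
    private volatile FakeProducer producer;

    void start() {
        producer = new FakeProducer();
    }

    void stop() {
        producer = null;
    }

    void send(String msg) {
        // One volatile read; the local reference cannot change underneath us.
        FakeProducer current = producer;
        if (current == null) {
            throw new IllegalStateException("producer is not running; call start() first");
        }
        current.send(msg);
    }
}
```

A caller that sends before start or after stop gets a descriptive IllegalStateException instead of an NPE.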


- Chris Pettitt


On June 3, 2016, 10:09 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48213/
> ---
> 
> (Updated June 3, 2016, 10:09 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the system producers need to be thread safe in order to be used in 
> multithreaded tasks. The following are the changes 
> (ElasticSearchSystemProducer is already thread safe so no change made there):
> 
> In KafkaSystemProducer, remove the buggy retry logic and treat any exception 
> as fatal.
> In HdfsSystemProducer, add synchronization lock to all public methods.
> 
> 
> Diffs
> -
> 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>  1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  3769e103616dc0f1fd869706cc086e24cd926c48 
>   
> samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
>  04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>  8e32bba6ced090f0fc8d4e5176fe0788df36981d 
> 
> Diff: https://reviews.apache.org/r/48213/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48182: SAMZA-958: Make store/cache thread safe

2016-06-06 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48182/#review136335
---


Fix it, then Ship it!





samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(line 412)
<https://reviews.apache.org/r/48182/#comment201356>

This might be sufficient, but I've seen some excessively long GC pauses and 
the like on Hudson. Something like 10s has worked well for me in the past and 
the join should take nowhere near that long in most cases.



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(lines 490 - 491)
<https://reviews.apache.org/r/48182/#comment201353>

I would name these a little differently to improve readability. Something 
like runner3StartedLatch and runner1FinishedLatch; or startRunner2 and 
startRunner1 latch; or similar.



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(line 527)
<https://reviews.apache.org/r/48182/#comment201355>

How about actually capturing the test failure and rethrowing from the main 
thread? It will give you a much more helpful error message.
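
A sketch of one way to do that, in Java for brevity. WorkerFailureSketch and runAndRethrow are hypothetical helpers, not something that exists in the test today.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical test helper; not part of the patch.
final class WorkerFailureSketch {
    /** Runs body on a new thread and rethrows, on the calling thread, anything it threw. */
    static void runAndRethrow(Runnable body, long timeoutMillis) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                body.run();
            } catch (Throwable t) {
                failure.set(t); // includes AssertionError raised by test asserts
            }
        });
        worker.start();
        try {
            worker.join(timeoutMillis); // bounded wait so a bug cannot hang the test
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for worker", e);
        }
        if (worker.isAlive()) {
            worker.interrupt();
            throw new AssertionError("worker did not finish within " + timeoutMillis + " ms");
        }
        Throwable t = failure.get();
        if (t != null) {
            throw new AssertionError("worker thread failed", t);
        }
    }
}
```

The original failure travels as the cause, so the test report shows the assertion message and stack trace from the worker thread.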


- Chris Pettitt


On June 3, 2016, 9:30 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48182/
> ---
> 
> (Updated June 3, 2016, 9:30 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the existing stores/cache need to be thread safe in order to be used by 
> multithreaded tasks. The following changes are made to ensure the thread 
> safety of the stores:
> 
> For CachedStore, use synchronized lock for each public function;
> For current InMemoryKeyValueStore, use ConcurrentSkipListMap as underlying 
> map for thread safety.
> For store Iterator, do not support remove functionality (throw 
> UnsupportedOperationException like RocksDb does today).
> 
> 
> Diffs
> -
> 
>   
> samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
>  72f25a354eaa98e8df379d07d9cc8613dfafd13a 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
>  f0965aec5f3ec2a214dc40c70832c58273623749 
>   
> samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
>  b7f1cdc4dbaeea2413cee2ad60d74528f3950513 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
> c28f8db8cb59bd5415e78535877acc1e5bee0f67 
>   samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala 
> eee74473726cb2a36d0b75fe5c9df737440980bc 
>   
> samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
>  23f8a1a6bee8ef38e0640a4e90778e53d982deeb 
> 
> Diff: https://reviews.apache.org/r/48182/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local deployment.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48213: SAMZA-960: Make system producer thread safe

2016-06-03 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/#review136097
---




samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala 
(line 76)
<https://reviews.apache.org/r/48213/#comment201099>

It is generally not a good idea to make logging calls while holding a 
lock (or to call any code you don't control, but we know logging is a particularly 
bad actor). The reason is that default logger configurations typically use a 
blocking strategy to handle overload. If this code is intended to be 
parallelized this can block anything that needs to use the lock.
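
As a sketch of the alternative, capture what you want to log while holding the lock and make the logging call after releasing it. The names are hypothetical, and the Consumer is only a placeholder for a real logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch; not HdfsSystemProducer.
final class LogOutsideLockSketch {
    private final Object lock = new Object();
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<String> log; // placeholder for a real logger

    LogOutsideLockSketch(Consumer<String> log) {
        this.log = log;
    }

    void write(String record) {
        int sizeAfter;
        synchronized (lock) {
            // Only the state change happens under the lock.
            buffer.add(record);
            sizeAfter = buffer.size();
        }
        // The potentially blocking log call runs after the lock is released.
        log.accept("buffered " + sizeAfter + " record(s)");
    }
}
```

The message is built from a value copied under the lock, so it stays consistent even though it is emitted later.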



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 105)
<https://reviews.apache.org/r/48213/#comment201103>

Producer needs to be volatile or you need to hold the producerLock to get 
the producer.

Also this new design can lead to a somewhat bad error - if the producer is 
shut down and then send is used you'll just get an NPE versus an error about 
the producer being stopped. I think the latter would make things easier from a 
troubleshooting perspective.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 118)
<https://reviews.apache.org/r/48213/#comment201104>

Need to hold a lock for producer or make producer volatile.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 129)
<https://reviews.apache.org/r/48213/#comment201105>

same re. lock / volatile


- Chris Pettitt


On June 3, 2016, 5:25 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48213/
> ---
> 
> (Updated June 3, 2016, 5:25 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the system producers need to be thread safe in order to be used in 
> multithreaded tasks. The following are the changes 
> (ElasticSearchSystemProducer is already thread safe so no change made there):
> 
> In KafkaSystemProducer, remove the buggy retry logic and treat any exception 
> as fatal.
> In HdfsSystemProducer, add synchronization lock to all public methods.
> 
> 
> Diffs
> -
> 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>  1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  3769e103616dc0f1fd869706cc086e24cd926c48 
>   
> samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
>  04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>  8e32bba6ced090f0fc8d4e5176fe0788df36981d 
> 
> Diff: https://reviews.apache.org/r/48213/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48182: SAMZA-958: Make store/cache thread safe

2016-06-03 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48182/#review136075
---


Fix it, then Ship it!





samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
 (line 67)
<https://reviews.apache.org/r/48182/#comment201048>

It looks like this can be broken out into a second test. The first seems to 
be testing a simple flush whereas this one appears to be testing the behavior 
of remove with iterators.



samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala (line 40)
<https://reviews.apache.org/r/48182/#comment201049>

very thread safe? :)



samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala (line 195)
<https://reviews.apache.org/r/48182/#comment201050>

At the very least I would add a comment that the lock must be held to call 
this method.



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(lines 409 - 410)
<https://reviews.apache.org/r/48182/#comment201052>

I'm not totally sure what you're trying to do with this test, but if you 
want to guarantee that runner1 only starts after runner2 then you should use a 
count down latch.
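
For example, something like the following sketch, written in Java. The runner names mirror the test, but the bodies are placeholders and the class is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the runner bodies are placeholders for the real test logic.
final class LatchOrderingSketch {
    /** Starts two threads and returns the order in which their bodies ran. */
    static List<String> runInOrder() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch runner2Started = new CountDownLatch(1);

        Thread runner1 = new Thread(() -> {
            try {
                // Block until runner2 signals; bounded so a bug cannot hang the test.
                if (runner2Started.await(10, TimeUnit.SECONDS)) {
                    events.add("runner1");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread runner2 = new Thread(() -> {
            events.add("runner2");
            runner2Started.countDown();
        });

        runner1.start(); // started first on purpose; the latch still enforces the order
        runner2.start();
        try {
            runner1.join(10_000);
            runner2.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }
}
```

Unlike relying on thread start order or sleeps, the latch makes the ordering a guarantee rather than a likelihood.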



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(lines 412 - 413)
<https://reviews.apache.org/r/48182/#comment201051>

It's generally a good idea to join with a timeout. If there were a bug this 
would hang the test rather than cause a test failure.



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(lines 505 - 506)
<https://reviews.apache.org/r/48182/#comment201066>

Asserts on another thread are not going to work correctly. They won't fail 
the test - they will kill the thread.



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(line 519)
<https://reviews.apache.org/r/48182/#comment201064>

Isn't there a race here if this code is invoked before the store.put on 
line 492?


- Chris Pettitt


On June 2, 2016, 6:33 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48182/
> ---
> 
> (Updated June 2, 2016, 6:33 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the existing stores/cache need to be thread safe in order to be used by 
> multithreaded tasks. The following changes are made to ensure the thread 
> safety of the stores:
> 
> For CachedStore, use synchronized lock for each public function;
> For current InMemoryKeyValueStore, use ConcurrentSkipListMap as underlying 
> map for thread safety.
> For store Iterator, do not support remove functionality (throw 
> UnsupportedOperationException like RocksDb does today).
> 
> 
> Diffs
> -
> 
>   
> samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
>  72f25a354eaa98e8df379d07d9cc8613dfafd13a 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
>  f0965aec5f3ec2a214dc40c70832c58273623749 
>   
> samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
>  b7f1cdc4dbaeea2413cee2ad60d74528f3950513 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
> c28f8db8cb59bd5415e78535877acc1e5bee0f67 
>   samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala 
> eee74473726cb2a36d0b75fe5c9df737440980bc 
>   
> samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
>  23f8a1a6bee8ef38e0640a4e90778e53d982deeb 
> 
> Diff: https://reviews.apache.org/r/48182/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local deployment.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48080: SAMZA-956: Disk Quotas: Add throttler and disk quota enforcement

2016-06-02 Thread Chris Pettitt

> On June 2, 2016, 6:24 p.m., Jake Maes wrote:
> > samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java,
> >  lines 34-35
> > <https://reviews.apache.org/r/48080/diff/1/?file=1402148#file1402148line34>
> >
> > Should there be any verification that lowWaterMarkPercent <= 
> > highWaterMarkPercent and that both of them should be values between 0.0 
> > and 1.0?
> > 
> > Actually, I found the former verification in DiskQuotaEnforcer, but 
> > expected to find it here. I don't see any verification of the min watermark.

Yes, we should verify the lower bound. I put the check in the enforcer because 
I wanted to keep all of the checks in one place, but I can see your argument 
for moving them here. I'll do the upper / lower bounds and their relation 
checks here and do the overlapping range checks in the enforcer. Will also add 
some additional tests :).
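
A minimal sketch of the bounds and relation checks as they might look in the policy constructor. The class name is hypothetical and the work factor is left out; the overlapping range checks across policies would stay in the enforcer.

```java
// Hypothetical sketch of the checks only; not the DiskQuotaPolicy from the patch.
final class WaterMarkPolicySketch {
    final double lowWaterMarkPercent;
    final double highWaterMarkPercent;

    WaterMarkPolicySketch(double low, double high) {
        // Written as !(in range) so that NaN is rejected as well.
        if (!(low >= 0.0 && low <= 1.0)) {
            throw new IllegalArgumentException("low water mark must be in [0.0, 1.0]: " + low);
        }
        if (!(high >= 0.0 && high <= 1.0)) {
            throw new IllegalArgumentException("high water mark must be in [0.0, 1.0]: " + high);
        }
        if (low > high) {
            throw new IllegalArgumentException(
                "low water mark " + low + " must be <= high water mark " + high);
        }
        this.lowWaterMarkPercent = low;
        this.highWaterMarkPercent = high;
    }
}
```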


> On June 2, 2016, 6:24 p.m., Jake Maes wrote:
> > samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java, 
> > line 70
> > <https://reviews.apache.org/r/48080/diff/1/?file=1402152#file1402152line70>
> >
> > Won't this continue to add more delay if the workFactor stays constant? 
> > So it becomes a rapid decay instead of a static throttle value for each 
> > policy?
> > 
> > It seems the job would essentially halt after a small number of loops 
> > for any factor other than an infinitesimally small one.
> > 
> > Also, this behavior seems to overlap with the multiple policies. When I 
> > first saw the multiple policies, I thought, "ok, so to implement a linear 
> > decay, I'd specify a set of policies, each one with a tighter throttle" I 
> > was assuming the throttle rate was static. But seeing this additive code, I 
> > can't imagine using multiple policies because the job will rapidly slow 
> > with even just one.

Sorry, this logic must be a bit too clever - at least a comment in the code is 
warranted. General back story: I would love to drop the whole pendingNanos 
thing and just say "a little error is not going to kill anything". The reason 
pendingNanos exists is that the best precision the JVM can muster for sleep is 
1ms. So we need to accumulate enough to get that first sleep.

We're adding to the pendingNanos here, but we are reducing them a few lines 
down in the sleep call. The sleep call is returning the amount of error on the 
sleep (which may be negative). Previously I actually measured the error here 
instead of in the sleep call and did the decrement inline. I'm not wed to doing 
it in the sleep call - I moved it there because it made the HighResClock a 
little nicer for other potential use cases that would need to do the same. I'd 
be interested in your perspective on one of these or an alternative approach. 
Given the confusion, at least a comment in the code is warranted.
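
To show why the delay does not grow without bound, here is a simplified sketch of the accumulate-then-sleep idea. It is not the ThrottlingExecutor code: the SleepClock interface, the names, and the 1 ms threshold constant are assumptions made for illustration.

```java
// Hypothetical, simplified sketch; not the ThrottlingExecutor from the patch.
final class PendingDelaySketch {
    /** Assumed clock abstraction: sleeps and returns (requested - actually slept) in nanos. */
    interface SleepClock {
        long sleep(long nanos);
    }

    static final long MIN_SLEEP_NANOS = 1_000_000L; // 1 ms, the assumed sleep precision

    private final SleepClock clock;
    private long pendingNanos;

    PendingDelaySketch(SleepClock clock) {
        this.clock = clock;
    }

    /** Records the delay owed for one unit of work and sleeps once enough has accumulated. */
    void addDelay(long delayNanos) {
        pendingNanos += delayNanos;
        if (pendingNanos >= MIN_SLEEP_NANOS) {
            // The sleep consumes the pending delay; what remains is the signed error.
            pendingNanos = clock.sleep(pendingNanos);
        }
    }

    long pendingNanos() {
        return pendingNanos;
    }
}
```

With a constant per-iteration delay, the pending amount climbs until it crosses the threshold, is paid off by one sleep, and restarts near zero, so the throttle stays static on average rather than decaying.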


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48080/#review135892
---


On May 31, 2016, 5:27 p.m., Chris Pettitt wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48080/
> ---
> 
> (Updated May 31, 2016, 5:27 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This change introduces a ThrottlingExecutor which is used to control the
> rate of execution in the main run loop. The DiskQuotaEnforcer houses the
> rules for switching from one DiskQuotaPolicy to the next as new disk
> usage samples arrive.
> 
> By default, no throttling will occur. New policies can be added using
> the following form:
> 
> ```
> container.disk.quota.bytes=XXX
> container.disk.quota.policy.count=2
> container.disk.quota.policy.0.lowWaterMark=0.4
> container.disk.quota.policy.0.highWaterMark=0.5
> container.disk.quota.policy.0.workFactor=0.5
> container.disk.quota.policy.1.lowWaterMark=0.05
> container.disk.quota.policy.1.highWaterMark=0.1
> container.disk.quota.policy.1.workFactor=0.05
> ```
> 
> See ThrottlingExecutor for details about how the work factor works and
> DiskQuotaPolicy for details about how the low and high water mark
> configuration work.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaEnforcer.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java 
> PRE-CREATION 
>   
> samza-core/src/

Re: Review Request 48109: SAMZA-957 Avoid unnecessary KV Store flushes (part 3)

2016-06-01 Thread Chris Pettitt


> On June 1, 2016, 3:20 p.m., Chris Pettitt wrote:
> > samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala, line 
> > 179
> > <https://reviews.apache.org/r/48109/diff/1/?file=1402973#file1402973line179>
> >
> > I think you could even get away with returning true here only if the 
> > key is an array and we're already doing the check earlier. That would also 
> > allow the hasArrayKeys method to be dropped along with the 
> > containsArrayKeys field, which would later need to be guarded by a lock or 
> > made volatile. This would be an improvement but is tangential to your 
> > change. It would be fine to bias towards safety in doubt.
> 
> Jake Maes wrote:
> I like the idea of getting rid of hasArrayKeys, but I'm not sure how to 
> get rid of containsArrayKeys if we want to preserve 1) printing the warning 
> only once, 2) performance of put(). Although instanceOf is supposed to be 
> fast, it's probably not as fast as checking a boolean. Any suggestions?
> 
> As far as only returning true if the key is an array, that would 
> basically disable the write batching. The current code seems to purge dirty 
> values if there are writeBatchSize dirty keys or if the cache is full and the 
> LRU item is dirty. So the check above is similar, but for a different purpose.

You're right. We must take the hit of some additional state if we want to log a 
warning at runtime. Regarding the second paragraph, I wasn't intending to 
change the behavior for the other branches.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48109/#review135803
---


On June 1, 2016, 2:23 a.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48109/
> ---
> 
> (Updated June 1, 2016, 2:23 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-957
> https://issues.apache.org/jira/browse/SAMZA-957
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-957 Avoid unnecessary KV Store flushes (part 3)
> 
> 
> Diffs
> -
> 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
> c28f8db8cb59bd5415e78535877acc1e5bee0f67 
>   samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala 
> eee74473726cb2a36d0b75fe5c9df737440980bc 
> 
> Diff: https://reviews.apache.org/r/48109/diff/
> 
> 
> Testing
> ---
> 
> ./gradlew build
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Review Request 48080: SAMZA-956: Disk Quotas: Add throttler and disk quota enforcement

2016-05-31 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48080/
---

Review request for samza.


Repository: samza


Description
---

This change introduces a ThrottlingExecutor which is used to control the
rate of execution in the main run loop. The DiskQuotaEnforcer houses the
rules for switching from one DiskQuotaPolicy to the next as new disk
usage samples arrive.

By default, no throttling will occur. New policies can be added using
the following form:

```
container.disk.quota.bytes=XXX
container.disk.quota.policy.count=2
container.disk.quota.policy.0.lowWaterMark=0.4
container.disk.quota.policy.0.highWaterMark=0.5
container.disk.quota.policy.0.workFactor=0.5
container.disk.quota.policy.1.lowWaterMark=0.05
container.disk.quota.policy.1.highWaterMark=0.1
container.disk.quota.policy.1.workFactor=0.05
```

See ThrottlingExecutor for details about how the work factor works and
DiskQuotaPolicy for details about how the low and high water mark
configuration work.


Diffs
-

  
samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaEnforcer.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java 
2a565be7858a4d3a6adbc49989b43b71ca3f6721 
  samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
  samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala 
PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
cf3c4c0ab08a59760bc899c6f2027755e933b350 
  
samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 9e6641c3628290dc05e1eb5537e86bff9d37f92c 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 
fc3d085d7fff9f7dcec766ba48e550eb0052e99d 
  
samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaEnforcer.java
 PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala 
05b4e5c37578340eefe6d412f5a76f540bec6fa6 

Diff: https://reviews.apache.org/r/48080/diff/


Testing
---

- Added new unit tests
- Ran existing tests with gradle test
- Verified throttling behavior and instrumentation with local deployment
- Verified average latency impact of feature to be < 150ns for Linux and OSX


Thanks,

Chris Pettitt



Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/#review132785
---


Ship it!





samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 (line 66)
<https://reviews.apache.org/r/47197/#comment197035>

You actually don't need to wrap emptySet because it's already immutable.


- Chris Pettitt


On May 11, 2016, 8:13 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47197/
> ---
> 
> (Updated May 11, 2016, 8:13 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-948
> https://issues.apache.org/jira/browse/SAMZA-948
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe
> 
> See the stack traces in the JIRA for more context. Essentially the consumer 
> can bootstrap concurrently from multiple code paths (AM UI, RM Client 
> callbacks, etc) and with the remove() logic that was added in SAMZA-913, we 
> can get ConcurrentModificationExceptions. 
> 
> Fix:
> * Use an AtomicReference to swap in the updated messages when they are ready 
> * In bootstrap()
> * Acquire a lock
> * Make a copy of the messages
> * Append the new messages
> * Set the atomic reference to the copy
> * Release lock
> 
> Also sneaking in a log message fix for JobCoordinator. It previously didn't 
> include the task names.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
>  8e1057b4d055159acb49d2cc60d3acad7665a532 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 03f48db7f42b2617995b14cf51248b82b6cc2636 
> 
> Diff: https://reviews.apache.org/r/47197/diff/
> 
> 
> Testing
> ---
> 
> ./gradlew build
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 47197: SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe

2016-05-11 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47197/#review132710
---


Fix it, then Ship it!





samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 (line 62)
<https://reviews.apache.org/r/47197/#comment196973>

If this is really code that can be run from multiple threads, as opposed to 
code that was blowing up due to ConcurrentModificationException (which is 
sometimes a misleading name), then this needs to be volatile.

isStarted might also need to be volatile, but I didn't look at how it was 
being used.



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 (line 65)
<https://reviews.apache.org/r/47197/#comment196969>

You only need volatile here (vs. AtomicReference) since you're not using 
any CAS operation.

For full safety, you need to wrap the set in an unmodifiable wrapper. 
Otherwise it would be possible to modify the set via "read only" methods like 
getBootstrappedStream.
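
A sketch of the combination I am suggesting: a volatile field that always points at an unmodifiable snapshot, with the lock used only by writers. The class name and the String element type are placeholders.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch; not CoordinatorStreamSystemConsumer.
final class BootstrappedMessagesSketch {
    private final Object bootstrapLock = new Object();
    // volatile is enough: writers publish a whole new set and never need compareAndSet.
    private volatile Set<String> messages = Collections.emptySet();

    void bootstrap(Set<String> newMessages) {
        synchronized (bootstrapLock) {
            Set<String> copy = new LinkedHashSet<>(messages);
            copy.addAll(newMessages);
            // Wrap before publishing so readers cannot mutate the shared snapshot.
            messages = Collections.unmodifiableSet(copy);
        }
    }

    /** Returns the current snapshot; attempts to modify it throw UnsupportedOperationException. */
    Set<String> getMessages() {
        return messages;
    }
}
```

Readers never take the lock and always see either the old snapshot or the new one, never a set that is being modified.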


- Chris Pettitt


On May 10, 2016, 11:07 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47197/
> ---
> 
> (Updated May 10, 2016, 11:07 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Navina 
> Ramesh, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-948
> https://issues.apache.org/jira/browse/SAMZA-948
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-948 CoordinatorSystemStreamConsumer is not threadsafe
> 
> See the stack traces in the JIRA for more context. Essentially the consumer 
> can bootstrap concurrently from multiple code paths (AM UI, RM Client 
> callbacks, etc) and with the remove() logic that was added in SAMZA-913, we 
> can get ConcurrentModificationExceptions. 
> 
> Fix:
> * Use an AtomicReference to swap in the updated messages when they are ready 
> * In bootstrap()
> * Acquire a lock
> * Make a copy of the messages
> * Append the new messages
> * Set the atomic reference to the copy
> * Release lock
> 
> Also sneaking in a log message fix for JobCoordinator. It previously didn't 
> include the task names.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
>  8e1057b4d055159acb49d2cc60d3acad7665a532 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 03f48db7f42b2617995b14cf51248b82b6cc2636 
> 
> Diff: https://reviews.apache.org/r/47197/diff/
> 
> 
> Testing
> ---
> 
> ./gradlew build
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 46644: SAMZA-889 - Change log not working properly with In memory Store

2016-05-05 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/46644/#review131932
---


Fix it, then Ship it!




Looks good. Minor tweak suggested below.


samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java (lines 25 
- 31)
<https://reviews.apache.org/r/46644/#comment195978>

I would actually move this doc to the interface. I would imagine folks 
would be looking at methods versus the private fields (in fact, I believe 
javadoc is not generated for private members by default).


- Chris Pettitt


On May 5, 2016, 9:13 p.m., Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/46644/
> ---
> 
> (Updated May 5, 2016, 9:13 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Adding StoreProperties that is accessible from the StorageEngine
> 
> Added a lifecycle unit test for TaskStorageManager
> 
> Question: Is there a better way to refactor the TaskStorageManager class?
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 9afab881f2a822925ae716e1dbd744b86321c34e 
>   samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java 
> 5463648fd01e0cba52fa9bd9a33b247e7014cfde 
>   samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java 
> adb62643a311e25fb4fed91c39e1a75cd5664b17 
>   samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 5462208c08cddbfd30d886daffa8c02c82447059 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> c7b05203a1958a62af9dec04b215d985c4646dc4 
>   samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java 
> b90ea87b7e575e646c58ddfb5a53ced9ed04a880 
>   
> samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
>  c00c4547307f5a8b401c6bb6438eaa7fb8a38651 
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
> 13f4fa97d42b02e54634c8de1575118ca0433fe8 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  c8ea64c7c67dd6bf789d2a3445d620ccef1beac0 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
>  dae6e35d1ba75daf5c816bccbc625c623a44d3b2 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
>  f0965aec5f3ec2a214dc40c70832c58273623749 
>   
> samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
>  391cf89b05f90ececae63160cd3cb9c811e4ab66 
>   
> samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
>  e5a66a4770b9553a1cc48fbb505f52d123c6c754 
>   
> samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
>  23f8a1a6bee8ef38e0640a4e90778e53d982deeb 
> 
> Diff: https://reviews.apache.org/r/46644/diff/
> 
> 
> Testing
> ---
> 
> ./gradlew clean build
> TODO: Test with sample job to verify that the behavior reported in SAMZA-889 
> is fixed
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>



Re: Review Request 46644: SAMZA-889 - Change log not working properly with In memory Store

2016-05-05 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/46644/#review131835
---


Fix it, then Ship it!




Looks good. I'd suggest moving non-change-related code to a separate cleanup RB.


samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java (line 29)
<https://reviews.apache.org/r/46644/#comment195875>

Generally you make the constructor private, but this is such a trivial 
value object that I wouldn't have a strong opinion either way.



samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java (lines 31 
- 33)
<https://reviews.apache.org/r/46644/#comment195873>

Prefer boolean to Boolean throughout. Boolean is a boxed type (thus 
nullable), introducing a third state, whereas boolean has only two states to 
manage.
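
To illustrate the third state, here is a small sketch. The class and method names are invented for the example and are not part of StoreProperties.

```java
final class BoxedBooleanDemo {
  /** Boxed parameter: callers can pass true, false, or null. */
  static String describeBoxed(Boolean flag) {
    if (flag == null) {
      return "unset";
    }
    return flag ? "enabled" : "disabled";
  }

  /** Primitive parameter: exactly two states, no null check needed. */
  static String describePrimitive(boolean flag) {
    return flag ? "enabled" : "disabled";
  }

  /** Auto-unboxing a null Boolean throws NullPointerException. */
  static boolean unbox(Boolean flag) {
    return flag;
  }

  public static void main(String[] args) {
    System.out.println(describeBoxed(null));
    System.out.println(describePrimitive(true));
    try {
      unbox(null);
    } catch (NullPointerException e) {
      System.out.println("unboxing null failed");
    }
  }
}
```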



samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java (line 40)
<https://reviews.apache.org/r/46644/#comment195879>

It would be great to have some documentation about what each of these 
properties means. It's probably already documented somewhere, so a javadoc link 
might be sufficient.



samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java (lines 43 
- 45)
<https://reviews.apache.org/r/46644/#comment195876>

What is this for? It doesn't appear to be used. Generally bias towards not 
adding things that are unused ceteris paribus.



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(line 18)
<https://reviews.apache.org/r/46644/#comment195880>

Generally it is a best practice to move non-change-related cleanup to a 
separate commit. This helps when working with history (e.g. git bisect).


- Chris Pettitt


On April 25, 2016, 5:25 p.m., Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/46644/
> ---
> 
> (Updated April 25, 2016, 5:25 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Adding StoreProperties that is accessible from the StorageEngine
> 
> Added a lifecycle unit test for TaskStorageManager
> 
> Question: Is there a better way to refactor the TaskStorageManager class?
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml c15b8e74de8e5aac5ac83278c52ab3dba1630e50 
>   samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java 
> 5463648fd01e0cba52fa9bd9a33b247e7014cfde 
>   samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java 
> adb62643a311e25fb4fed91c39e1a75cd5664b17 
>   samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 5462208c08cddbfd30d886daffa8c02c82447059 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> c7b05203a1958a62af9dec04b215d985c4646dc4 
>   samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java 
> b90ea87b7e575e646c58ddfb5a53ced9ed04a880 
>   
> samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
>  c00c4547307f5a8b401c6bb6438eaa7fb8a38651 
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
> 13f4fa97d42b02e54634c8de1575118ca0433fe8 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  c8ea64c7c67dd6bf789d2a3445d620ccef1beac0 
>   
> samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
>  53147ad4faae2fae24a5bc0677167d06c64afead 
>   
> samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
>  661a83517c5a603c841d4a373ac979724457471c 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
>  dae6e35d1ba75daf5c816bccbc625c623a44d3b2 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
>  b896810ccf7f12d72195f07cac27ba5cc510077d 
>   
> samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
>  391cf89b05f90ececae63160cd3cb9c811e4ab66 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
> 4c4e82eb5be82d469fe3c5f85b92523faeb0a193 
>   
> samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
>  defc91e7e28b1d9419b98f363f9989d58511e923 
>   samza-kv/src/main/scala/org/apache/samza/storage/k

Re: Review Request 46856: SAMZA-943 Occasional test failure: TestStreamPartitionCountMonitor.testStartStopBehavior

2016-05-03 Thread Chris Pettitt


> On May 3, 2016, 5:23 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java,
> >  line 107
> > <https://reviews.apache.org/r/46856/diff/2/?file=1367775#file1367775line107>
> >
> > This is not thread-safe, but I believe it could be made so without 
> > requiring volatile / atomics / locks. Could we initialize all of the gauges 
> > in the constructor and place them in an immutable map?
> 
> Jake Maes wrote:
> Does it need to be thread-safe? The executor is single-threaded, start() 
> is idempotent, and this method is private. Is there a hole I'm not seeing?

Yes, that is a fair argument to make. However, those assumptions may not hold 
over time. Assuming that we can construct these in the constructor and make the 
map immutable at no cost, I think we should make this thread safe.

There are also assumptions about how the tests are run. For example, getGauges 
exposes the map, which would require that test code does enough to guarantee 
visibility (this holds now, may not later).

If there is a cost to make this thread-safe then this is not as clear cut 
(though I know how I'd bias). Would be interested in the details if this is the 
case.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/46856/#review131525
---


On April 29, 2016, 11:38 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/46856/
> ---
> 
> (Updated April 29, 2016, 11:38 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Navina Ramesh, Jagadish 
> Venkatraman, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-943
> https://issues.apache.org/jira/browse/SAMZA-943
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-943 Occasional test failure: 
> TestStreamPartitionCountMonitor.testStartStopBehavior
> 
> * Rewrote the monitor in Java following the pattern of the 
> PollingScanDiskSpaceMonitor in SAMZA-924
>   ** The main difference is that it uses a ScheduledExecutorService to 
> cleanly run the monitor in a loop and provide determinism around startup and 
> shutdown
> * Got rid of the sleep() in the unit test
> * Added a unit test to verify the scheduler calls the monitor method
> * Enforced that the monitor isn't restarted (which is a problem for the 
> scheduler service)
>   ** This required that the reference to the monitor not be static (defined 
> in the JobCoordinator object) and instead instantiated whenever the 
> JobCoordinator is instantiated.
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml c15b8e74de8e5aac5ac83278c52ab3dba1630e50 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 384b2e777c73fc1e4bc8a29312c9ea5372162ca1 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
>  6aeff5787a0018ca2cae7d901c25537fbc7dea23 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
>  f47f8189bd92c4071ae76ae323e066823f3a6f61 
> 
> Diff: https://reviews.apache.org/r/46856/diff/
> 
> 
> Testing
> ---
> 
> Added a test. 
> 
> Ran check-all.sh
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 46856: SAMZA-943 Occasional test failure: TestStreamPartitionCountMonitor.testStartStopBehavior

2016-05-03 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/46856/#review131525
---


Fix it, then Ship it!




Looks good. Mostly minor stuff that you can choose to take or ignore.


samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
 (line 53)
<https://reviews.apache.org/r/46856/#comment195535>

cosmetic: extra WS.



samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
 (line 93)
<https://reviews.apache.org/r/46856/#comment195536>

cosmetic: maybe call this initialPartitionCount? Initially when I saw 
prevPartitionCount I thought that you might be saving the current count for the 
next loop, which would be problematic (e.g. non-zero delta missed due to 
polling interval). This approach looks good, but naming could be slightly 
clearer.



samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
 (line 107)
<https://reviews.apache.org/r/46856/#comment195538>

This is not thread-safe, but I believe it could be made so without 
requiring volatile / atomics / locks. Could we initialize all of the gauges in 
the constructor and place them in an immutable map?
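
A rough sketch of that suggestion, assuming the set of streams is known when the monitor is constructed. The class name is made up, and AtomicInteger is only a stand-in for a gauge whose value updates are already thread-safe; the point is that the map itself needs no volatile, atomics, or locks.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for per-stream gauges, built once in the constructor. */
final class GaugeRegistry {
  // A final field that refers to a map never mutated after construction is
  // safely published to other threads, provided `this` does not escape
  // the constructor.
  private final Map<String, AtomicInteger> gauges;

  GaugeRegistry(List<String> streamNames) {
    Map<String, AtomicInteger> byStream = new HashMap<>();
    for (String name : streamNames) {
      byStream.put(name, new AtomicInteger(0));
    }
    this.gauges = Collections.unmodifiableMap(byStream);
  }

  /** Exposing the map is harmless because callers cannot modify it. */
  Map<String, AtomicInteger> getGauges() {
    return gauges;
  }

  public static void main(String[] args) {
    GaugeRegistry registry = new GaugeRegistry(Arrays.asList("pageviews", "clicks"));
    registry.getGauges().get("clicks").set(4);
    System.out.println(registry.getGauges().keySet());
  }
}
```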



samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
 (line 127)
<https://reviews.apache.org/r/46856/#comment195540>

This looks like it could be made static (e.g. as a static helper function) 
as it doesn't require any state from the class.



samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
 (line 184)
<https://reviews.apache.org/r/46856/#comment195544>

state should be volatile otherwise you're not guaranteed visibility. I 
believe the test code is accessing this in a thread-safe way, but if this ever 
gets promoted to a public method it could lead to surprising behavior.

Minor note: usually state == State.RUNNING is sufficient. I think you're 
checking the terminated flag for the test, but the test code is already doing 
a timed wait, so you could switch it to 
`assertTrue(monitor.awaitTermination(...))` for the same effect.
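
A minimal sketch of both points, using a hypothetical monitor class rather than the StreamPartitionCountMonitor under review: the state field is volatile so any thread sees the latest write, and a test waits on the executor instead of polling a flag.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical monitor; names and lifecycle are assumptions for the sketch. */
final class MonitorSketch {
  enum State { INIT, RUNNING, STOPPED }

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  // volatile: a write in start()/stop() is visible to any thread calling isRunning().
  private volatile State state = State.INIT;

  void start(Runnable task, long periodMs) {
    state = State.RUNNING;
    scheduler.scheduleAtFixedRate(task, 0, periodMs, TimeUnit.MILLISECONDS);
  }

  void stop() {
    state = State.STOPPED;
    scheduler.shutdownNow();
  }

  boolean isRunning() {
    return state == State.RUNNING;
  }

  /** Lets a test block with a timeout until the scheduler has shut down. */
  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return scheduler.awaitTermination(timeout, unit);
  }

  public static void main(String[] args) throws InterruptedException {
    MonitorSketch monitor = new MonitorSketch();
    monitor.start(() -> { }, 10L);
    monitor.stop();
    System.out.println(monitor.awaitTermination(5, TimeUnit.SECONDS));
  }
}
```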


- Chris Pettitt


On April 29, 2016, 11:38 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/46856/
> ---
> 
> (Updated April 29, 2016, 11:38 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Navina Ramesh, Jagadish 
> Venkatraman, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-943
> https://issues.apache.org/jira/browse/SAMZA-943
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-943 Occasional test failure: 
> TestStreamPartitionCountMonitor.testStartStopBehavior
> 
> * Rewrote the monitor in Java following the pattern of the 
> PollingScanDiskSpaceMonitor in SAMZA-924
>   ** The main difference is that it uses a ScheduledExecutorService to 
> cleanly run the monitor in a loop and provide determinism around startup and 
> shutdown
> * Got rid of the sleep() in the unit test
> * Added a unit test to verify the scheduler calls the monitor method
> * Enforced that the monitor isn't restarted (which is a problem for the 
> scheduler service)
>   ** This required that the reference to the monitor not be static (defined 
> in the JobCoordinator object) and instead instantiated whenever the 
> JobCoordinator is instantiated.
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml c15b8e74de8e5aac5ac83278c52ab3dba1630e50 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 384b2e777c73fc1e4bc8a29312c9ea5372162ca1 
>   
> samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
>  6aeff5787a0018ca2cae7d901c25537fbc7dea23 
>   
> samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
>  f47f8189bd92c4071ae76ae323e066823f3a6f61 
> 
> Diff: https://reviews.apache.org/r/46856/diff/
> 
> 
> Testing
> ---
> 
> Added a test. 
> 
> Ran check-all.sh
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Re: Review Request 45258: Abandon producer retry after a certain # of errors : SAMZA-911

2016-04-14 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45258/#review128972
---




samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 44)
<https://reviews.apache.org/r/45258/#comment192433>

Is 30 arbitrary or is there some significance to the number?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 71)
<https://reviews.apache.org/r/45258/#comment192431>

This is not directly related to your commit, but:

This function is not reentrant. Is that expected? For example, it would be 
possible for the Kafka callback to set sendFailed to true but a subsequent 
send would reset this flag.

---

Separately, there is an implicit assumption that retryBackoff is providing 
a happens-before constraint between an invocation of the exception handler in 
retryBackoff and a subsequent invocation of the loop. This likely always holds, 
but a CAS for numRetries would cover cases where that does not hold.
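
For illustration, a CAS-based retry counter might look like the sketch below. It is written in Java although KafkaSystemProducer is Scala, and the class and method names are assumptions rather than anything in the patch.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical retry budget shared between the send loop and a callback. */
final class RetryBudget {
  private final int maxRetries;
  private final AtomicInteger numRetries = new AtomicInteger(0);

  RetryBudget(int maxRetries) {
    this.maxRetries = maxRetries;
  }

  /** Claims one retry; returns false once the budget is spent. */
  boolean tryAcquireRetry() {
    while (true) {
      int current = numRetries.get();
      if (current >= maxRetries) {
        return false;
      }
      // compareAndSet only succeeds if no other thread changed the count
      // since we read it, so no increment is lost or double-counted.
      if (numRetries.compareAndSet(current, current + 1)) {
        return true;
      }
    }
  }

  /** Called after a successful send to start a fresh budget. */
  void reset() {
    numRetries.set(0);
  }

  public static void main(String[] args) {
    RetryBudget budget = new RetryBudget(2);
    System.out.println(budget.tryAcquireRetry());
    System.out.println(budget.tryAcquireRetry());
    System.out.println(budget.tryAcquireRetry());
  }
}
```

With the count held in an AtomicInteger, correctness no longer depends on retryBackoff happening to order the exception handler before the next loop iteration.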


- Chris Pettitt


On March 24, 2016, 1:10 a.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45258/
> ---
> 
> (Updated March 24, 2016, 1:10 a.m.)
> 
> 
> Review request for samza, Jake Maes, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Currently, the KafkaSystemProducer's producer loop keeps retrying 
> indefinitely when there is an exception in the retryBackOff loop. If there 
> are repeated exceptions, then it makes sense to retry for a while, and then 
> fail the container.
> 
> Long term, we should focus on getting rid of the retryBackOff loop, and 
> close the producer object in the callback during failure. This will guarantee 
> in-order delivery.
> 
> 1. Modified the KafkaSystemProducer to take a maxRetries (currently, it's set 
> to 30).
> 2. Add tests to verify retry in case of RetriableExceptions.
> 
> 
> Diffs
> -
> 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  9a44d46d29a1997958a9d2bbf7be0bde860fff64 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>  39426d8cf64516ec4fdc0cb4ff60b1df3a757470 
> 
> Diff: https://reviews.apache.org/r/45258/diff/
> 
> 
> Testing
> ---
> 
> Added unit tests to verify functionality.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>



Re: Review Request 44920: SAMZA-680 Refactor the Samza AppMaster to support other cluster managers

2016-04-14 Thread Chris Pettitt


> On April 11, 2016, 5:08 p.m., Jake Maes wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java,
> >  line 114
> > <https://reviews.apache.org/r/44920/diff/7/?file=1337656#file1337656line114>
> >
> > Will this accomplish anything if we don't exit the loop?
> > 
> > If I'm reading this correctly, it will just set the flag, which will 
> > cause an InterruptedException in the next Thread.sleep, which will again 
> > get caught here. So it turns into a busy wait. 
> > 
> > I think rethrowing the InterruptedException or just returning are 
> > better options

I like the idea of returning (perhaps via isRunning = false, if it's necessary). 
I would suggest keeping the interrupt here, though - it may not be required 
today but keeps the code more resilient to refactoring. InterruptedException is 
checked so you can't throw it from run. In any case it is a less direct way to 
manage the thread (it would kill the thread, but may involve uncaught exception 
handlers).
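
Roughly what I have in mind, as a sketch with a hypothetical loop class (not the actual AbstractContainerAllocator): restore the interrupt flag and leave the loop, so the interrupt neither gets swallowed nor turns into a busy wait.

```java
/** Hypothetical allocator-style loop; the sleep stands in for real work. */
final class AllocatorLoop implements Runnable {
  private final long sleepMs;
  private volatile boolean isRunning = true;

  AllocatorLoop(long sleepMs) {
    this.sleepMs = sleepMs;
  }

  @Override
  public void run() {
    while (isRunning) {
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        // Restore the flag for any code further up the stack...
        Thread.currentThread().interrupt();
        // ...and stop looping, otherwise the next sleep would throw
        // immediately and the loop would spin.
        isRunning = false;
      }
    }
  }

  boolean isRunning() {
    return isRunning;
  }

  public static void main(String[] args) throws InterruptedException {
    AllocatorLoop loop = new AllocatorLoop(10_000L);
    Thread worker = new Thread(loop);
    worker.start();
    worker.interrupt();
    worker.join(5_000L);
    System.out.println("alive=" + worker.isAlive());
  }
}
```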


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review128148
---


On April 12, 2016, 11:45 p.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> ---
> 
> (Updated April 12, 2016, 11:45 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan 
> (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> 1.Proposed new APIs for running Samza without Yarn. (SAMZA-881)
>- Defined the ContainerProcessManager abstraction.
>- Defined the SamzaResource, SamzaResourceRequest. 
>- Re-wrote the SamzaAppMaster logic into a ClusterBasedJobCoordinator.
> 2.Defined a ClusterManagerConfig to handle configurations independent of 
> Yarn/Mesos.
> 3.Made Samza completely independent of Yarn. This cleanly separates Samza 
> specific components and Yarn
> specific components.
> 4.Readability improvements to the existing code base.
>- Added explicit documentation for every method, member and class 
> (including on thread-safety)
>- Made internal variables final to document intent, visibility across 
> threads. (trivially by adding modifiers, or by changing where they're 
> initialized.)
> 5.Refactored JobCoordinator into a JobModelReader.
> 
> == Diff2 ==
> Address Chris's review feedback.
> 
> Design Doc: 
> https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
> 
> == Diff 3 ==
> Address Yi's feedback
> 
> == Diff 4 ==
> Sync with current master
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml c15b8e74de8e5aac5ac83278c52ab3dba1630e50 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig

Re: Review Request 45190: SAMZA-910 Fix expired request test in HostAwareContainerAllocator

2016-04-08 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45190/#review127874
---


Fix it, then Ship it!





samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
 (line 59)
<https://reviews.apache.org/r/45190/#comment191234>

It looks like this and TestContainerAllocator have some overlap. I wonder 
if it would be easy to have a base class to cover the common functionality? Not 
necessary for this RB as it is not related to your change, but worth 
considering.



samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
 (line 359)
<https://reviews.apache.org/r/45190/#comment191233>

Not necessarily specific to your RB, but maybe a low hanging fruit: I would 
suggest extracting out all of the Runnables into local variables. That way you 
can document what they're doing via the name and the construction of listener 
is easier to follow.



samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
 (line 76)
<https://reviews.apache.org/r/45190/#comment191236>

This doesn't handle spurious wakeups: 
https://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#wait()

---

Assuming that the order in which post condition assertions are applied does 
not matter (maybe a bad assumption?) I think this could be simplified massively 
by using a CountDownLatch to join once everything is done. You could make the 
numXXX fields volatile and after join check all of the post conditions on the 
main thread (e.g. in the test code that calls this). One other nice thing about 
this approach (other than killing off several LoC) is that you get a timeout 
for the whole process versus timeouts on individual steps.
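
A sketch of the latch-based shape, with invented names rather than the real MockContainerListener API. The counters are AtomicInteger here, which is stricter than the volatile fields suggested above, so that increments from several callback threads are not lost. CountDownLatch.await also avoids the spurious-wakeup problem, since it returns only when the count reaches zero or the timeout expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical listener that counts callbacks and signals when all arrived. */
final class LatchedListener {
  private final CountDownLatch done;
  private final AtomicInteger numAdded = new AtomicInteger();
  private final AtomicInteger numReleased = new AtomicInteger();

  LatchedListener(int expectedEvents) {
    this.done = new CountDownLatch(expectedEvents);
  }

  void onContainerAdded() {
    numAdded.incrementAndGet();
    done.countDown();
  }

  void onContainerReleased() {
    numReleased.incrementAndGet();
    done.countDown();
  }

  /** One timeout for the whole sequence; false means it expired. */
  boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return done.await(timeout, unit);
  }

  int added() {
    return numAdded.get();
  }

  int released() {
    return numReleased.get();
  }

  public static void main(String[] args) throws InterruptedException {
    LatchedListener listener = new LatchedListener(3);
    new Thread(listener::onContainerAdded).start();
    new Thread(listener::onContainerAdded).start();
    new Thread(listener::onContainerReleased).start();
    boolean finished = listener.await(5, TimeUnit.SECONDS);
    // Post-conditions are checked here, on the main thread.
    System.out.println(finished + " " + listener.added() + " " + listener.released());
  }
}
```

Assertion failures then surface on the test thread directly, without having to relay them from the callback threads.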


Looks very good! I especially like the more intuitive host names.

- Chris Pettitt


On April 8, 2016, 3:02 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45190/
> ---
> 
> (Updated April 8, 2016, 3:02 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> SAMZA-910 Fix expired request test in HostAwareContainerAllocator
> 
> Summary of changes:
> 1. Remove the last sleep() from HostAwareContainerAllocator
> 2. Fix a silent failure in testRerequestOnAnyHostIfContainerStartFails by 
> setting the neededContainers to 1 before running the test.
> 3. Update MockContainerListener so assertion failures in other threads are 
> thrown in the main thread to fail the test. (no silent failures) This should 
> help troubleshoot the tests if there are any remaining issues.
> 4. Rename obscure hostnames to make it easier to reason about the tests.
> 
> 
> Diffs
> -
> 
>   
> samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
>  b253f98f7258bb611e1ad6672f74b07ab7e20b70 
>   
> samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
>  93e430b6ee986b06ecdac4979552d774724a1fbd 
>   
> samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java
>  cb82cccf75b54cfbefd586700e8283cb41173833 
>   
> samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java
>  879a7d0d06b087cfe0417f3fa5801b43ac7fc458 
>   
> samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java
>  2f9669f8b7e77abb65b244ccd067ae7ab1f245c3 
> 
> Diff: https://reviews.apache.org/r/45190/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jake Maes
> 
>



Review Request 45504: SAMZA-924: Add disk space monitoring

2016-03-30 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45504/
---

Review request for samza.


Repository: samza


Description
---

This change introduces the measurement of disk usage for selected
directories (currently those used by stores only). The feature is off by
default, but can be enabled by setting "container.disk.poll.interval.ms"
to a non-zero value.
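
As a rough idea of what a polling scan involves (this is not the code in the diff; the class and method names are invented for the sketch), each poll can walk a directory and sum the sizes of its regular files, and the feature stays off while the interval is zero:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Hypothetical helper showing the scan step of a disk usage poll. */
final class DiskUsageScan {
  /** Mirrors the description above: a zero interval disables the feature. */
  static boolean isEnabled(long pollIntervalMs) {
    return pollIntervalMs != 0L;
  }

  /** Sums the sizes, in bytes, of all regular files under root. */
  static long sizeOf(Path root) throws IOException {
    try (Stream<Path> paths = Files.walk(root)) {
      return paths
          .filter(Files::isRegularFile)
          .mapToLong(p -> p.toFile().length())
          .sum();
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("store");
    Files.write(dir.resolve("data.bin"), new byte[128]);
    System.out.println(isEnabled(0L) + " " + sizeOf(dir));
  }
}
```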


Diffs
-

  
samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
 PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
bcbc90a0a460f8733e6d3a50dbc33f3720cad7d0 
  
samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 6fae6509d177cc3a54dac9ad1d3e5cc479f4a4f5 
  
samza-core/src/test/java/org/apache/samza/container/disk/TestPollingScanDiskSpaceMonitor.java
 PRE-CREATION 

Diff: https://reviews.apache.org/r/45504/diff/


Testing
---

- Added tests for our disk space monitoring implementation.
- Verified metrics are correctly reported and updated when enabling the feature
- Verified metrics are not attached to the container when the feature is 
disabled

Perf testing for this feature is still pending and is a requirement for
this to be committed.


Thanks,

Chris Pettitt



Re: Review Request 44920: Remove tight coupling of Samza with Yarn. Define APIs for resource manager integration

2016-03-19 Thread Chris Pettitt


> On March 16, 2016, 11:10 p.m., Chris Pettitt wrote:
> > Some more comments

Sorry, did not mean to create issues for all of the below. I think #1 and #2 
are the most interesting to look at of the group.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review123928
---


On March 16, 2016, 6:23 p.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> ---
> 
> (Updated March 16, 2016, 6:23 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan 
> (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Samza currently has tight coupling with Yarn. This makes it impossible to 
> integrate with other resource managers like Mesos, or to run standalone 
> without any resource manager. This RB is a step to implementing SAMZA-881.
> 
> Design Doc: 
> https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
> 
> 1.Proposed new APIs for a resource manager to integrate with Samza. 
> (SAMZA-881)
>- Defined the ContainerProcessManager abstraction, SamzaResource, 
> SamzaResourceRequest. 
>- Re-wrote the SamzaAppMaster into a ClusterBasedJobCoordinator.
>- Re-wrote yarn specific request logic by abstracting it into a 
> YarnContainerManager. 
> 2.Defined a ClusterManagerConfig to handle configurations independent of 
> Yarn/Mesos.
> 3.Made Samza's cluster interaction independent of Yarn. This separates Samza 
> specific components into samza-core and Yarn components into samza-yarn.
> 4.Readability improvements to the existing code base.
>-Added docs for most methods, member variables and classes (including on 
> thread-safety)
>- Made internal variables final to document intent, visibility across 
> threads. (trivially by adding modifiers, or by changing where they're 
> initialized.)
> 5.Refactored JobCoordinator into a JobModelReader.
> 
> TODO: Can go into the upcoming release. (P0)
> 1.Refactor the UI state variables and tests. Port some method re-orgs from 
> SAMZA-867 into here.
> 2.Revise packaging structure.
> 4.Document new configs.
> 5.Rename run-am.sh to run-coordinator.sh, Delete all files in the 
> non-refactored namespace. (For unit-testing, these files continue to exist)
> 
> TODO: (P1)
> 1.Build Mesos integration for Samza. Should be simpler to integrate with the 
> newer APIs.
>   - I started on this, and I plan to refine and post an RB in one of the 
> hack-days.
> 2.Refactor the SamzaAppState class to provide more accessors and eliminate 
> public variables. (This was
> a consequence of the already existing design which I've tried to be 
> compatible with)
> 
> TODO: I plan to track these with JIRAs so that they can be done later. (P2)
> 1.Get rid of the HTTP Server in the JobCoordinator
> 2.Make YarnJobCoordinator implement the JobCoordinator API as SAMZA-881.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManagerFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
>  PRE

Re: Review Request 44920: Remove tight coupling of Samza with Yarn. Define APIs for resource manager integration

2016-03-19 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review123928
---



Some more comments


samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java 
(line 83)
<https://reviews.apache.org/r/44920/#comment186262>

This should be final, especially as we have other code that is relying on 
it to be a) correct and b) visible (from a thread-safety perspective) for 
proper shutdown.
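
As a minimal sketch of what final buys here (the class and field names below are hypothetical and are not taken from the patch): a final field is assigned once in the constructor, and any thread that sees the object after construction completes, without `this` escaping, is guaranteed to see the initialized value.

```java
// Hypothetical stand-in for a state holder; NOT the actual SamzaAppState.
final class AppStateSketch {
    // final: assigned exactly once, in the constructor. Threads that obtain a
    // reference to a fully constructed instance see these values without
    // extra locking.
    final String coordinatorUrl;
    final int containerCount;

    AppStateSketch(String coordinatorUrl, int containerCount) {
        this.coordinatorUrl = coordinatorUrl;
        this.containerCount = containerCount;
    }
}
```

The compiler also rejects any later reassignment, so shutdown code can rely on the value never changing.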



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java 
(line 147)
<https://reviews.apache.org/r/44920/#comment186260>

Should we wait forever or fail after some reasonable timeout?



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java 
(line 150)
<https://reviews.apache.org/r/44920/#comment186259>

Reinterrupting is the typical default behavior.
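
The two comments above (bounded wait, re-interrupting) could be sketched together as follows. This is only an illustration: the CountDownLatch stands in for whatever condition the real code at those lines waits on, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the patch under review.
final class BoundedWait {
    /** Returns true if the latch opened in time, false on timeout or interruption. */
    static boolean awaitOrGiveUp(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            // Bounded wait: gives up after the timeout instead of blocking forever.
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller can then decide whether a false return should fail the job or trigger a retry.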



samza-core/src/main/scala/org/apache/samza/coordinator/JobModelReader.scala 
(line 59)
<https://reviews.apache.org/r/44920/#comment186272>

It's not obvious - I'm probably missing it - but it seems that we only 
write to this field in getJobCoordinator. If that's the case then we could just 
move this into that function instead of storing it here as a volatile.



samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManager.java
 (line 86)
<https://reviews.apache.org/r/44920/#comment186274>

Minor: may as well use a diamond here since you're using it on the next 
line.
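
For reference, a small sketch of the diamond form (the field names and types here are made up for illustration and are not the ones in YarnContainerManager):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DiamondSketch {
    // Explicit type arguments repeated on the right-hand side.
    final Map<String, List<Integer>> verbose = new HashMap<String, List<Integer>>();
    // Diamond (Java 7+): the compiler infers the same type arguments.
    final Map<String, List<Integer>> concise = new HashMap<>();
}
```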


- Chris Pettitt


On March 16, 2016, 6:23 p.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> ---
> 
> (Updated March 16, 2016, 6:23 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan 
> (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Samza currently has tight coupling with Yarn. This makes it impossible to 
> integrate with other resource managers like Mesos, or to run standalone 
> without any resource manager. This RB is a step to implementing SAMZA-881.
> 
> Design Doc: 
> https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
> 
> 1. Proposed new APIs for a resource manager to integrate with Samza. 
> (SAMZA-881)
>    - Defined the ContainerProcessManager abstraction, SamzaResource, 
> SamzaResourceRequest. 
>    - Re-wrote the SamzaAppMaster into a ClusterBasedJobCoordinator.
>    - Re-wrote yarn specific request logic by abstracting it into a 
> YarnContainerManager. 
> 2. Defined a ClusterManagerConfig to handle configurations independent of 
> Yarn/Mesos.
> 3. Made Samza's cluster interaction independent of Yarn. This separates Samza 
> specific components into samza-core and Yarn components into samza-yarn.
> 4. Readability improvements to the existing code base.
>    - Added docs for most methods, member variables and classes (including on 
> thread-safety)
>    - Made internal variables final to document intent, visibility across 
> threads. (trivially by adding modifiers, or by changing where they're 
> initialized.)
> 5. Refactored JobCoordinator into a JobModelReader.
> 
> TODO: Can go into the upcoming release. (P0)
> 1. Refactor the UI state variables and tests. Port some method re-orgs from 
> SAMZA-867 into here.
> 2. Revise packaging structure.
> 4. Document new configs.
> 5. Rename run-am.sh to run-coordinator.sh, Delete all files in the 
> non-refactored namespace. (For unit-testing, these files continue to exist)
> 
> TODO: (P1)
> 1. Build Mesos integration for Samza. Should be simpler to integrate with the 
> newer APIs.
>    - I started on this, and I plan to refine and post an RB in one of the 
> hack-days.
> 2. Refactor the SamzaAppState class to provide more accessors and eliminate 
> public variables. (This was a consequence of the already existing design 
> which I've tried to be compatible with)
> 
> TODO: I plan to track these with JIRAs so that they can be done later. (P2)
> 1. Get rid of the HTTP Server in the JobCoordinator
> 2. Make YarnJobCoordinator implement the JobCoordinator API as SAMZA-881.
> 
> 
> Diffs
> -
> 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAl

Re: Review Request 44920: Remove tight coupling of Samza with Yarn. Define APIs for resource manager integration

2016-03-19 Thread Chris Pettitt
s in the package without going through the setter methods. It 
makes it trickier to review, at the very least :).

Also, this looks like a good candidate for an immutable class, using copy 
on write if necessary.
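
A rough sketch of the immutable, copy-on-write shape suggested above. The class, field, and method names are hypothetical and only illustrate the pattern; they do not reflect the class under review.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable state holder; names are illustrative only.
final class ImmutableStateSketch {
    private final Map<String, String> containerToHost;

    ImmutableStateSketch(Map<String, String> containerToHost) {
        // Defensive copy so later changes to the caller's map cannot leak in.
        this.containerToHost =
            Collections.unmodifiableMap(new HashMap<>(containerToHost));
    }

    // Read-only view; attempts to modify it throw UnsupportedOperationException.
    Map<String, String> containerToHost() {
        return containerToHost;
    }

    // Copy on write: build a new instance instead of mutating this one.
    ImmutableStateSketch withContainer(String containerId, String host) {
        Map<String, String> copy = new HashMap<>(containerToHost);
        copy.put(containerId, host);
        return new ImmutableStateSketch(copy);
    }
}
```

With this shape there are no setters to bypass, so a reviewer only has to check the constructor and the `with...` methods.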



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java 
(line 43)
<https://reviews.apache.org/r/44920/#comment186250>

Not thread safe.



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java 
(line 79)
<https://reviews.apache.org/r/44920/#comment186254>

This should be volatile, especially as I believe it is being used to convey 
state across threads (e.g. whatever calls onContainerCompleted and the main 
thread in ClusterBasedJobCoordinator).
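
As an illustration of the volatile suggestion, here is a minimal sketch of a flag written by a callback thread and polled by a main loop. The class, field, and method names are assumptions made for the example, not the actual SamzaTaskManager members.

```java
// Hypothetical sketch; not the actual SamzaTaskManager.
final class ShutdownFlagSketch {
    // volatile: a write by one thread is visible to later reads by other threads.
    private volatile boolean tooManyFailedContainers = false;

    // Would be called from a callback thread, e.g. a container-completed handler.
    void onFailureLimitReached() {
        tooManyFailedContainers = true;
    }

    // Would be polled from the main coordinator thread.
    boolean shouldShutdown() {
        return tooManyFailedContainers;
    }
}
```

Without volatile (or another form of synchronization) the polling thread is not guaranteed to ever observe the update.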


- Chris Pettitt



Re: Review Request 44920: Remove tight coupling of Samza with Yarn. Define APIs for resource manager integration

2016-03-18 Thread Chris Pettitt


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java,
> >  line 36
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301270#file1301270line36>
> >
> > Agreed. This is a bit of a mix of atomic and mutable state. Either make it 
> > thread safe (e.g. lock down mutability of fields), ensure that remaining 
> > mutable state can be updated atomically, or decide it is not intended to be 
> > thread safe and get rid of additional locks / volatile accesses that add 
> > cost with no gain. Former is preferred :).
> 
> Jagadish Venkatraman wrote:
> Great feedback Chris! I could not agree more! :-)
> 
> SamzaAppState class is currently a source of major problems. I did not 
> want to touch it (as it was not scoped in this refactoring). Upon digging 
> further, I realize the problem of making this thread-safe/private is slightly 
> involved.
> 
> 1. There is a jobCoordinator object that is exposed publicly as a part of 
> SamzaAppState. The jobCoordinator in turn exposes a nested jobModel instance 
> directly through its accessor. The JobModel embeds a LocalityManager that 
> mutates state during some public method calls. Hence, the jobModel instance 
> is *not* thread-safe and is shared concurrently across the UI threads, the 
> HTTP server threads in the queued thread pool, and the SamzaAppMaster thread. 
> (Created SAMZA-899 to make the JobModel immutable)
> 
> 2. There are a bunch of state data structures that are publicly exposed 
> in SamzaAppState. These must be made thread-safe and hidden behind accessors. 
> These public global variables could be mutated anywhere in Samza without 
> regard for safety, visibility or correctness. For example, there is an integer 
> containerCount that is public which is manipulated by both the metrics 
> reporter and the callback threads. (I created SAMZA-901 to track this)
> 
> I will work on both of these ASAP.
> 
> Jagadish Venkatraman wrote:
> Just a clarification:
> 1. The JobModel instance is shared concurrently as stated in [1]. This 
> presents a source of *potential* problems. (I believe there is not an actual 
> bug in the JobModel.)
> 2. The containerCount is a public int that *could* be manipulated by both 
> the reporter and callback. I believe the current interaction does not have 
> any races (since count is just set at the startup once). But having it as a 
> public non-final int *could* be a source of potential problems if it was 
> modified elsewhere.

For #2, the state of containerCount in onContainerCompleted is undefined unless 
start transitively "happens-before" onContainerCompleted. It may be that that 
holds, but it is not obvious to me (without spending more time working out the 
call graph). If it's not a problem now it could become one with a seemingly 
innocent refactoring.
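
To make the happens-before point concrete, here is a small sketch of one case where the ordering is guaranteed: a write made before Thread.start() is visible to the started thread. The names are hypothetical. Whether the real call graph contains such an edge between start and onContainerCompleted is exactly the open question above, so this shows the rule and not the behavior of the patch.

```java
// Hypothetical sketch of a happens-before edge via Thread.start().
final class HappensBeforeSketch {
    // Plain (non-volatile, non-final) field, like the public int under discussion.
    int containerCount;

    // Returns the value a freshly started thread observes for containerCount,
    // or -1 if the calling thread is interrupted while waiting.
    int observedByStartedThread(int count) {
        containerCount = count;                       // (1) write in the caller
        final int[] seen = new int[1];
        Thread callback = new Thread(() -> seen[0] = containerCount); // (3) read
        callback.start();                             // (2) start() happens-before (3)
        try {
            callback.join();                          // join() makes seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return seen[0];
    }
}
```

If the reading thread were instead an already running thread with no synchronization in between, the Java memory model would not guarantee that it sees the write in (1).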


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review123902
---


On March 16, 2016, 6:23 p.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> ---
> 
> (Updated March 16, 2016, 6:23 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan 
> (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Samza currently has tight coupling with Yarn. This makes it impossible to 
> integrate with other resource managers like Mesos, or to run standalone 
> without any resource manager. This RB is a step to implementing SAMZA-881.
> 
> Design Doc: 
> https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
> 
> 1. Proposed new APIs for a resource manager to integrate with Samza. 
> (SAMZA-881)
>    - Defined the ContainerProcessManager abstraction, SamzaResource, 
> SamzaResourceRequest. 
>    - Re-wrote the SamzaAppMaster into a ClusterBasedJobCoordinator.
>    - Re-wrote yarn specific request logic by abstracting it into a 
> YarnContainerManager. 
> 2. Defined a ClusterManagerConfig to handle configurations independent of 
> Yarn/Mesos.
> 3. Made Samza's cluster interaction independent of Yarn. This separates Samza 
> specific components into samza-core and Yarn components into samza-yarn.
> 4. Readability improvements to the existing code base.
>-Added docs for most