Re: Review Request 50174: SAMZA-977: User doc for samza multithreading

2016-08-24 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50174/#review146739
---



Not that I intend to give you more work, but adding an example of 
AsyncStreamTask to samza's hello-world (hello-samza) will give users a quick 
idea of when and how to use it. This can be augmented with a tutorial page, 
similar to the one Jake provided for the samza-rest API. Thanks!
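
A rough sketch of what such a hello-samza example might look like. This assumes 
the AsyncStreamTask API from SAMZA-863 (processAsync is referenced in the 
comments below); the class name, the executor, the TaskCallback usage, and the 
fake remote lookup are illustrative assumptions rather than code from the patch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.AsyncStreamTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.TaskCallback;
    import org.apache.samza.task.TaskCoordinator;

    public class AsyncRemoteLookupTask implements AsyncStreamTask {
      // Stand-in for a non-blocking remote client, which is the usual reason to go async.
      private final Executor ioPool = Executors.newFixedThreadPool(4);

      @Override
      public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
                               TaskCoordinator coordinator, TaskCallback callback) {
        CompletableFuture
            .runAsync(() -> remoteLookup(envelope.getMessage()), ioPool)
            .whenComplete((ignored, error) -> {
              if (error != null) {
                callback.failure(error);   // tell Samza this message failed
              } else {
                callback.complete();       // tell Samza this message is fully processed
              }
            });
      }

      private void remoteLookup(Object message) {
        // placeholder for the actual remote call (e.g. a REST lookup)
      }
    }

The point a tutorial can make with this: processAsync() returns immediately, and 
the message only counts as processed once the callback fires, so slow I/O no 
longer blocks the event loop.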

- Navina Ramesh


On July 27, 2016, 11:05 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50174/
> ---
> 
> (Updated July 27, 2016, 11:05 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Update samza web docs with new multithreading api, core and configs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/api/overview.md 
> 6712344e84e19883b857e00549db2acb101c7e0e 
>   docs/learn/documentation/versioned/container/event-loop.md 
> 116238312df7071747cbbc14bc9c46f558755195 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 
> 54c52981c3055b398ee60af50eeaf2592ed0e64f 
> 
> Diff: https://reviews.apache.org/r/50174/diff/
> 
> 
> Testing
> ---
> 
> Test the web pages locally.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 50174: SAMZA-977: User doc for samza multithreading

2016-08-24 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50174/#review146554
---




docs/learn/documentation/versioned/api/overview.md (line 22)


"You should implement StreamTask for synchronous process, e.g. a 
computation that does not involve remote calls" 

This is not very clear. To me, it sounds like we are recommending a task 
interface based on a use-case. Instead, I think it will be useful to explain 
what you mean by "synchronous processing of a stream message" and compare it 
with asynchronous processing. This can be followed by the remote call example.
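
One way to draw that contrast in the doc, sketched here with paraphrased 
signatures (not copied from the patch):

    // Synchronous: a message counts as processed as soon as process() returns,
    // so any remote call made inside it blocks the task.
    void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                 TaskCoordinator coordinator) throws Exception;

    // Asynchronous: processAsync() may return immediately; the message only
    // counts as processed when the supplied callback is completed (or failed),
    // so remote calls can proceed without blocking the task.
    void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator, TaskCallback callback);

The remote-call example then becomes a consequence of the definition rather than 
the definition itself.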



docs/learn/documentation/versioned/api/overview.md (line 54)


You have mentioned "processAsync", but your code sample above shows "process". 
Please fix it.



docs/learn/documentation/versioned/container/event-loop.md (line 24)


There is no sharing of task state in Samza. Each task guarantees isolation 
from the others. What exactly are you referring to here? Or have I 
misunderstood the semantics provided by processing in multiple threads?



docs/learn/documentation/versioned/container/event-loop.md (line 41)


I don't understand when we are blocking. 

Are you saying "Block if (all task instances have outstanding messages to 
process) or (window and commit is in progress)"? 
Or "Block if (all task instances have outstanding messages to process or 
window) and (commit is in progress)"?
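
For concreteness, the two readings as code (the flag names are invented purely 
to show the grouping):

    boolean block1 = outstandingMessages || (windowInProgress && commitInProgress);  // reading 1
    boolean block2 = (outstandingMessages || windowInProgress) && commitInProgress;  // reading 2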



docs/learn/documentation/versioned/container/event-loop.md (line 43)


I think you are trying to explain 2 things:
1. Semantics of the event loop when using AsyncStreamTask
2. Semantics of the event loop when using StreamTask with a thread pool size > 1

Can you try to elaborate more on this? Perhaps just separating the semantics of 
the event loop for StreamTask and AsyncStreamTask will add more clarity.

Also, the processing-order guarantees need to be called out. It is possible 
to process out-of-order within a partition if the task thread pool size is > 1. 
This is a very important behavior change that needs to be documented here.



docs/learn/documentation/versioned/container/event-loop.md (line 47)


nit: "through the standard InitableTask, ClosableTask, StreamTask / 
AsyncStreamTask, and WindowTask."



docs/learn/documentation/versioned/container/event-loop.md (line 49)


Does anything change with respect to adding "task.subtask.class" in the 
process() method of AsyncStreamTask? Please check if this does not change the 
expected behavior in SAMZA-437. We use that at LinkedIn for restliWrapper (I 
think).



docs/learn/documentation/versioned/jobs/configuration-table.html (line 360)


Isn't there a performance impact if we use AsyncStreamTask by default, 
especially on jobs that don't need any async processing? Why is the default 
value false? 

If users upgrade to the new version and automatically start using 
multithreaded execution, will they see any performance impact?



docs/learn/documentation/versioned/jobs/configuration-table.html (line 368)


Is this property used only if job.container.single.thread.mode == true?
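
For reference, a sketch of the properties this set of comments is about. Only 
job.container.single.thread.mode is quoted in the review; the other two names 
are assumptions based on SAMZA-863, and the values are illustrative rather than 
recommendations:

    # true keeps the pre-0.11 single-threaded event loop; false enables the new run loop
    job.container.single.thread.mode=false

    # number of threads used to run synchronous StreamTask.process() calls
    job.container.thread.pool.size=4

    # max outstanding messages per task; values > 1 allow out-of-order completion
    # within a task/partition
    task.max.concurrency=1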


- Navina Ramesh


On July 27, 2016, 11:05 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50174/
> ---
> 
> (Updated July 27, 2016, 11:05 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Update samza web docs with new multithreading api, core and configs.
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/api/overview.md 
> 6712344e84e19883b857e00549db2acb101c7e0e 
>   docs/learn/documentation/versioned/container/event-loop.md 
> 116238312df7071747cbbc14bc9c46f558755195 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 
> 54c52981c3055b398ee60af50eeaf2592ed0e64f 
> 
> Diff: https://reviews.apache.org/r/50174/diff/
> 
> 
> Testing
> ---
> 
> Test the web pages locally.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Job coordinator stream and job redeployment

2016-08-24 Thread David Yu
Hi,

I'm trying to understand the role of the coordinator stream during a job
redeployment.

From the Samza documentation, I'm seeing the following about the
coordinator stream:

The Job Coordinator bootstraps configuration from the coordinator stream
each time upon job start-up. It periodically catches up with any new data
written to the coordinator stream and updates the Job Model.

However, it is unclear to me how this will work with "--config-path" when
we need to redeploy a job. Does the provided config first get persisted to
the coordinator stream, updating the previous model, or will it simply be
ignored?

Thanks,
David


Re: Debug Samza consumer lag issue

2016-08-24 Thread David Yu
Makes sense. Thanks for the help, Jake!

On Wed, Aug 24, 2016 at 5:11 PM Jacob Maes  wrote:

> We don't have any hard guidelines around that metric just because there are
> no hard rules that work for every job. For example, some jobs are very
> bursty and need to keep up with huge traffic ramp-ups even though they're
> underutilized the rest of the time.
>
> That said, yes, I have used that metric to determine whether a job has too
> much parallelism. But it was a job that had very stable throughput patterns
> and didn't have any major time spent in the window or commit methods, which
> could cause periodic spikes in utilization.
>
>
>
> On Wed, Aug 24, 2016 at 2:55 PM, David Yu  wrote:
>
> > Interesting.
> >
> > To me, "event-loop-utilization" looks like a good indicator that shows us
> > how busy the containers are. Is it safe to use this metric as a reference
> > when we need to scale out/in our job? For example, if I'm seeing around
> 0.3
> > utilization most of the time, maybe I can decrease the # of containers
> and
> > save some resources?
> >
> > Thanks,
> > David
> >
> > On Wed, Aug 24, 2016 at 1:27 PM Jacob Maes  wrote:
> >
> > > >
> > > > Based on what you have described, the following should be true in
> > 0.10.1:
> > > > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > > > commit-ns (if necessary)
> > >
> > > Yes, plus any time (e.g. due to an unlucky GC at just the right moment)
> > > that happens outside those timers.  And no "if necessary" for window or
> > > commit. There will be a small value for those methods even if they
> don't
> > do
> > > anything significant because the timer runs even for no-ops
> > >
> > > Since you're on 10.1, there's another useful metric
> > > "event-loop-utilization", which represents
> > > (process-ns+window-ns+commit-ns)/event-loop-ns
> > > (as you defined it). In other words, the proportion of time spent
> working
> > > vs waiting.
> > >
> > > On Wed, Aug 24, 2016 at 10:18 AM, David Yu 
> > > wrote:
> > >
> > > > Great. It all makes sense now.
> > > >
> > > > With the SSD fix, we also upgraded to 0.10.1. So we should see pretty
> > > > consistent process-ns (which we do).
> > > >
> > > > Based on what you have described, the following should be true in
> > 0.10.1:
> > > > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > > > commit-ns (if necessary)
> > > >
> > > > Is this correct?
> > > > Thanks,
> > > > David
> > > >
> > > > On Wed, Aug 24, 2016 at 11:27 AM Jacob Maes 
> > > wrote:
> > > >
> > > > > A couple other notes.
> > > > >
> > > > > Prior to Samza 10.1, the choose-ns was part of process-ns. So when
> > > > > choose-ns and process-ns are both high (around 10,000,000 == 10ms,
> > > which
> > > > is
> > > > > the default poll timeout), that usually means the task is caught
> up.
> > In
> > > > > Samza 10.1 the same is true if ONLY choose-ns is high. process-ns
> is
> > > > always
> > > > > the time spent in the process() method.
> > > > >
> > > > > Based on the above, the metric numbers you provided after the SSD
> fix
> > > all
> > > > > look reasonable. They're all sub-millisecond and since choose-ns
> and
> > > > > process-ns are low, it seems that the container is chewing through
> > > > messages
> > > > > at a good rate.
> > > > >
> > > > > So I would conclude that the SSD fix was probably the right one and
> > it
> > > > just
> > > > > took the job a while to catch up to the backlog of messages. Once
> it
> > > > caught
> > > > > up, the choose-ns and process-ns increased, which is normal when
> the
> > > > > processor is waiting for new messages.
> > > > >
> > > > > -Jake
> > > > >
> > > > > On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes 
> > > > wrote:
> > > > >
> > > > > > Hey David,
> > > > > >
> > > > > > Answering the most recent question first, since it's also the
> > > easiest.
> > > > > :-)
> > > > > >
> > > > > > Is choose-ns the total number of ms used to choose a message from
> > the
> > > > > input
> > > > > >> stream? What are some gating factors (e.g. serialization?) for
> > this
> > > > > >> metric?
> > > > > >
> > > > > > It's the amount of time the event loop spent getting new messages
> > > for
> > > > > > process(). It includes deserialization time and poll time which
> we
> > > > added
> > > > > > new metrics for, in Samza 10.1. Typically deserialization time is
> > > > pretty
> > > > > > consistent, so when you see a spike in choose-ns, it's usually
> > > because
> > > > > the
> > > > > > event loop is waiting for new messages. The two most common cases
> > > when
> > > > > it's
> > > > > > waiting are:
> > > > > > 1. There are no new messages in the topic partition. This is good
> > > > because
> > > > > > it means the processor is caught up.
> > > > > > 2. The consumer is slow and/or the buffer isn't large enough so
> the
> > > > > > 

Re: Debug Samza consumer lag issue

2016-08-24 Thread Jacob Maes
We don't have any hard guidelines around that metric just because there are
no hard rules that work for every job. For example, some jobs are very
bursty and need to keep up with huge traffic ramp-ups even though they're
underutilized the rest of the time.

That said, yes, I have used that metric to determine whether a job has too
much parallelism. But it was a job that had very stable throughput patterns
and didn't have any major time spent in the window or commit methods, which
could cause periodic spikes in utilization.



On Wed, Aug 24, 2016 at 2:55 PM, David Yu  wrote:

> Interesting.
>
> To me, "event-loop-utilization" looks like a good indicator that shows us
> how busy the containers are. Is it safe to use this metric as a reference
> when we need to scale out/in our job? For example, if I'm seeing around 0.3
> utilization most of the time, maybe I can decrease the # of containers and
> save some resources?
>
> Thanks,
> David
>
> On Wed, Aug 24, 2016 at 1:27 PM Jacob Maes  wrote:
>
> > >
> > > Based on what you have described, the following should be true in
> 0.10.1:
> > > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > > commit-ns (if necessary)
> >
> > Yes, plus any time (e.g. due to an unlucky GC at just the right moment)
> > that happens outside those timers.  And no "if necessary" for window or
> > commit. There will be a small value for those methods even if they don't
> do
> > anything significant because the timer runs even for no-ops
> >
> > Since you're on 10.1, there's another useful metric
> > "event-loop-utilization", which represents
> > (process-ns+window-ns+commit-ns)/event-loop-ns
> > (as you defined it). In other words, the proportion of time spent working
> > vs waiting.
> >
> > On Wed, Aug 24, 2016 at 10:18 AM, David Yu 
> > wrote:
> >
> > > Great. It all makes sense now.
> > >
> > > With the SSD fix, we also upgraded to 0.10.1. So we should see pretty
> > > consistent process-ns (which we do).
> > >
> > > Based on what you have described, the following should be true in
> 0.10.1:
> > > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > > commit-ns (if necessary)
> > >
> > > Is this correct?
> > > Thanks,
> > > David
> > >
> > > On Wed, Aug 24, 2016 at 11:27 AM Jacob Maes 
> > wrote:
> > >
> > > > A couple other notes.
> > > >
> > > > Prior to Samza 10.1, the choose-ns was part of process-ns. So when
> > > > choose-ns and process-ns are both high (around 10,000,000 == 10ms,
> > which
> > > is
> > > > the default poll timeout), that usually means the task is caught up.
> In
> > > > Samza 10.1 the same is true if ONLY choose-ns is high. process-ns is
> > > always
> > > > the time spent in the process() method.
> > > >
> > > > Based on the above, the metric numbers you provided after the SSD fix
> > all
> > > > look reasonable. They're all sub-millisecond and since choose-ns and
> > > > process-ns are low, it seems that the container is chewing through
> > > messages
> > > > at a good rate.
> > > >
> > > > So I would conclude that the SSD fix was probably the right one and
> it
> > > just
> > > > took the job a while to catch up to the backlog of messages. Once it
> > > caught
> > > > up, the choose-ns and process-ns increased, which is normal when the
> > > > processor is waiting for new messages.
> > > >
> > > > -Jake
> > > >
> > > > On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes 
> > > wrote:
> > > >
> > > > > Hey David,
> > > > >
> > > > > Answering the most recent question first, since it's also the
> > easiest.
> > > > :-)
> > > > >
> > > > > Is choose-ns the total number of ms used to choose a message from
> the
> > > > input
> > > > >> stream? What are some gating factors (e.g. serialization?) for
> this
> > > > >> metric?
> > > > >
> > > > > It's the amount of time the event loop spent getting new messages
> > for
> > > > > process(). It includes deserialization time and poll time which we
> > > added
> > > > > new metrics for, in Samza 10.1. Typically deserialization time is
> > > pretty
> > > > > consistent, so when you see a spike in choose-ns, it's usually
> > because
> > > > the
> > > > > event loop is waiting for new messages. The two most common cases
> > when
> > > > it's
> > > > > waiting are:
> > > > > 1. There are no new messages in the topic partition. This is good
> > > because
> > > > > it means the processor is caught up.
> > > > > 2. The consumer is slow and/or the buffer isn't large enough so the
> > > > > BrokerProxy isn't able to keep enough messages buffered to keep the
> > > event
> > > > > loop busy. This is uncommon because the buffer is defaulted to
> 50,000
> > > > > messages, which should be plenty. But if it happens, it's bad. To
> > > control
> > > > > this behavior, see the following properties in the config table (
> > > > > http://samza.apache.org/learn/documentation/0.10/jobs/
> > > 

Re: Debug Samza consumer lag issue

2016-08-24 Thread David Yu
Interesting.

To me, "event-loop-utilization" looks like a good indicator that shows us
how busy the containers are. Is it safe to use this metric as a reference
when we need to scale out/in our job? For example, if I'm seeing around 0.3
utilization most of the time, maybe I can decrease the # of containers and
save some resources?

Thanks,
David

On Wed, Aug 24, 2016 at 1:27 PM Jacob Maes  wrote:

> >
> > Based on what you have described, the following should be true in 0.10.1:
> > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > commit-ns (if necessary)
>
> Yes, plus any time (e.g. due to an unlucky GC at just the right moment)
> that happens outside those timers.  And no "if necessary" for window or
> commit. There will be a small value for those methods even if they don't do
> anything significant because the timer runs even for no-ops
>
> Since you're on 10.1, there's another useful metric
> "event-loop-utilization", which represents
> (process-ns+window-ns+commit-ns)/event-loop-ns
> (as you defined it). In other words, the proportion of time spent working
> vs waiting.
>
> On Wed, Aug 24, 2016 at 10:18 AM, David Yu 
> wrote:
>
> > Great. It all makes sense now.
> >
> > With the SSD fix, we also upgraded to 0.10.1. So we should see pretty
> > consistent process-ns (which we do).
> >
> > Based on what you have described, the following should be true in 0.10.1:
> > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > commit-ns (if necessary)
> >
> > Is this correct?
> > Thanks,
> > David
> >
> > On Wed, Aug 24, 2016 at 11:27 AM Jacob Maes 
> wrote:
> >
> > > A couple other notes.
> > >
> > > Prior to Samza 10.1, the choose-ns was part of process-ns. So when
> > > choose-ns and process-ns are both high (around 10,000,000 == 10ms,
> which
> > is
> > > the default poll timeout), that usually means the task is caught up. In
> > > Samza 10.1 the same is true if ONLY choose-ns is high. process-ns is
> > always
> > > the time spent in the process() method.
> > >
> > > Based on the above, the metric numbers you provided after the SSD fix
> all
> > > look reasonable. They're all sub-millisecond and since choose-ns and
> > > process-ns are low, it seems that the container is chewing through
> > messages
> > > at a good rate.
> > >
> > > So I would conclude that the SSD fix was probably the right one and it
> > just
> > > took the job a while to catch up to the backlog of messages. Once it
> > caught
> > > up, the choose-ns and process-ns increased, which is normal when the
> > > processor is waiting for new messages.
> > >
> > > -Jake
> > >
> > > On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes 
> > wrote:
> > >
> > > > Hey David,
> > > >
> > > > Answering the most recent question first, since it's also the
> easiest.
> > > :-)
> > > >
> > > > Is choose-ns the total number of ms used to choose a message from the
> > > input
> > > >> stream? What are some gating factors (e.g. serialization?) for this
> > > >> metric?
> > > >
> > > > It's the amount of time the event loop spent getting new messages
> for
> > > > process(). It includes deserialization time and poll time which we
> > added
> > > > new metrics for, in Samza 10.1. Typically deserialization time is
> > pretty
> > > > consistent, so when you see a spike in choose-ns, it's usually
> because
> > > the
> > > > event loop is waiting for new messages. The two most common cases
> when
> > > it's
> > > > waiting are:
> > > > 1. There are no new messages in the topic partition. This is good
> > because
> > > > it means the processor is caught up.
> > > > 2. The consumer is slow and/or the buffer isn't large enough so the
> > > > BrokerProxy isn't able to keep enough messages buffered to keep the
> > event
> > > > loop busy. This is uncommon because the buffer is defaulted to 50,000
> > > > messages, which should be plenty. But if it happens, it's bad. To
> > control
> > > > this behavior, see the following properties in the config table (
> > > > http://samza.apache.org/learn/documentation/0.10/jobs/
> > > > configuration-table.html)
> > > > systems.system-name.samza.fetch.threshold
> > > > task.poll.interval.ms
> > > >
> > > >
> > > >
> > > > On Wed, Aug 24, 2016 at 8:52 AM, David Yu 
> > > wrote:
> > > >
> > > >> More updates:
> > > >> 1. process-envelopes rate finally stabilized and converged. Consumer
> > lag
> > > >> is
> > > >> down to zero.
> > > >> 2. avg choose-ns across containers dropped over time
> > > >>  > > >> -08-24%2010.46.22.png?dl=0>,
> > > >> which I assume is a good thing.
> > > >>
> > > >> My question:
> > > >> Is choose-ns the total number of ms used to choose a message from
> the
> > > >> input
> > > >> stream? What are some gating factors (e.g. serialization?) for this
> > > >> metric?
> > > >>
> > > >> Thanks,
> > > >> David

Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

2016-08-24 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review146712
---




samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 350)


remove



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 558)


boolean isEndOfStream() {
  ...
  return ssps.isEmpty() && pendingEnvelopeQueue.isEmpty() && messagesInFlight == 0;
}



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 580)


endOfStream = isEndOfStream();



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 605)


if(endOfStream) ...



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 355)


No need for this logic since AsyncRunLoop won't poll anymore.


- Xinyu Liu


On Aug. 24, 2016, 9:03 p.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> ---
> 
> (Updated Aug. 24, 2016, 9:03 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data 
> Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Samza currently works with unbounded data sources (kafka streams). However, 
> for bounded data sources like HDFS files, snapshot files which are not 
> infinite, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once 
> data processing is complete.(as opposed to an infinite stream job that keeps 
> running)  
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state-machine of AsyncStreamTask 
> (Invariant: When end-of-stream is reached there are no buffered messages, 
> no-callbacks are in-flight and no-window or commit call shall be in progress)
> - Changes to allow clean shut-downs of the tasks/container/job for 
> end-of-stream.
> 
> Design Doc and Implementation Notes: 
> https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -
> 
>   
> samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
> cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   
> samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
>  a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> a8355b944cad54faacf5eeb883d8f4b630440757 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> ---
> 
> Unit tests to test scenarios for inorder processing, out-of-order processing 
> and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>



Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

2016-08-24 Thread Jagadish Venkatraman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/
---

(Updated Aug. 24, 2016, 9:03 p.m.)


Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data 
Infrastructure), Navina Ramesh, and Xinyu Liu.


Changes
---

- Prevent future polls for SSPs at end of stream.
- Support the notion of endOfStream when iterating over messages in an SSP 
using a SystemStreamPartitionIterator


Repository: samza


Description
---

Samza currently works with unbounded data sources (kafka streams). However, for 
bounded data sources like HDFS files, snapshot files which are not infinite, we 
need a notion of 'end-of-stream'. 

This is a step towards realizing a 'finite' Samza job that terminates once data 
processing is complete.(as opposed to an infinite stream job that keeps 
running)  

RB changes:
- New interface for EndOfStreamListener
- New 'end-of-stream' state in the state-machine of AsyncStreamTask (Invariant: 
When end-of-stream is reached there are no buffered messages, no-callbacks are 
in-flight and no-window or commit call shall be in progress)
- Changes to allow clean shut-downs of the tasks/container/job for 
end-of-stream.

Design Doc and Implementation Notes: 
https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
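
A minimal sketch of how a task might consume the new listener. Only the 
interface name EndOfStreamListenerTask appears in the diff below; the 
onEndOfStream method name, its signature, and this example class are assumptions 
for illustration:

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.EndOfStreamListenerTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class BoundedInputTask implements StreamTask, EndOfStreamListenerTask {
      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) throws Exception {
        // normal per-message processing of the bounded input (e.g. HDFS files)
      }

      @Override
      public void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
        // All SSPs for this task have reached end-of-stream: flush any remaining
        // output and ask Samza to shut this task down cleanly.
        coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
      }
    }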


Diffs (updated)
-

  samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
cc860cf7eb4d514736913c1dceaa80534b61d71a 
  
samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
 a8f858aa7e4f4ce436f450cf439fe1a102983c64 
  samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
a510bb0c5914c772438930d27f100b4d360c1296 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
89f6857014489aba2db4129bc2e26dfec5b10652 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
a8355b944cad54faacf5eeb883d8f4b630440757 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
ca913dea79fecbcecdfd1010dc794318055c5764 

Diff: https://reviews.apache.org/r/51346/diff/


Testing
---

Unit tests to test scenarios for inorder processing, out-of-order processing 
and commit semantics.


Thanks,

Jagadish Venkatraman



Re: Review Request 49212: RFC: SAMZA-855: Update kafka client to 0.10.0.0

2016-08-24 Thread Yi Pan (Data Infrastructure)

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49212/#review146701
---




samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
 (line 37)


This whole class is deleted in another RB. Could you rebase w/ latest 
master?


- Yi Pan (Data Infrastructure)


On June 24, 2016, 7:45 p.m., Robert Crim wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/49212/
> ---
> 
> (Updated June 24, 2016, 7:45 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is a WIP for updating the kafka client libraries to 0.10+. So far, 
> I've updated the dependency and simply worked to get all existing tests 
> passing. The next steps are to further test/verify backwards compatibility 
> with older brokers and moving the current `KafkaSystemFactory`, etc, to 
> `OldKafkaSystemFactory` and implementing the new clients.
> 
> 
> Diffs
> -
> 
>   build.gradle ba4a9d1 
>   gradle/dependency-versions.gradle 47c71bf 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  ea10cae 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
>  4e97376 
>   
> samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala 
> 78467bf 
>   
> samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
>  5e8cc65 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  ba8de5c 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
>  b373753 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
>  b574176 
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala a25ba62 
>   
> samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
>  6f498de 
>   samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java 2fa743f 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
>  e6815da 
>   
> samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
>  504fc89 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
>  f00405d 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
>  ece0359 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>  8e32bba 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
>  8d7e3fe 
> 
> Diff: https://reviews.apache.org/r/49212/diff/
> 
> 
> Testing
> ---
> 
> Got `./gradlew clean check` passing. I've not been able to run the 
> integration tests (on any branch) but will do that next!
> 
> 
> Thanks,
> 
> Robert Crim
> 
>



Re: [DISCUSS] Samza 0.11.0 release

2016-08-24 Thread Nicolas Maquet
Hi,

We are looking at upgrading to Kafka 0.10.0 in part for the new message
format which includes a timestamp field. Kafka 0.10.0 is backwards
compatible with 0.8.x clients but we are concerned about the performance
impact, see
http://kafka.apache.org/documentation.html#upgrade_10_performance_impact.

Cheers,

Nicolas

On 25 Aug 2016 6:20 a.m., "Yi Pan"  wrote:

> Hi, Nicolas,
>
> Could you explain to me why Samza is blocking you from upgrading your Kafka
> brokers to 0.10? At LinkedIn, we are running Samza 0.10 w/ Kafka 0.10
> brokers. This is a valid combination since Kafka 0.10 brokers should be
> backward compatible w/ 0.8.2 clients (which is the version Samza uses). I
> would recommend that you try to upgrade the Kafka brokers w/ Samza 0.10. If
> there are any issues you see, we will help to investigate and fix them.
>
> Meanwhile, we are planning to schedule Samza releases more regularly and
> frequently. Hence, although SAMZA-855 is not planned in
> 0.11, it will be included in the immediate version after that.
>
> Let me know if you have any further concerns.
>
> On Mon, Aug 22, 2016 at 2:48 PM, Nicolas Maquet  wrote:
>
> > Hi all,
> >
> > Would it be possible to consider including the following in the 0.11.0
> > target release?
> >
> > SAMZA-855: Upgrade Samza's Kafka client version to 0.10.0.0
> >
> > We are looking at upgrading our Kafka cluster to 0.10 and Samza is
> > currently preventing us from doing so.
> >
> > Regards,
> >
> > Nicolas Maquet
> >
> >
> >
> > 2016-08-23 9:06 GMT+12:00 xinyu liu :
> >
> > > Hi, All,
> > >
> > > There have been quite a lot of new features added to master since 0.10
> > > release to warrant a new major release. At LinkedIn, we've done
> > functional
> > > and performance testing against master in the past weeks, and deployed
> > jobs
> > > with the latest build in production. We will continue to test for
> > stability
> > > in the next few weeks.
> > >
> > > Here are the JIRAs of main features that will be included in this
> release
> > > (sorted in chronological order):
> > >
> > > SAMZA-876: Add AvroDataFileHdfsWriter
> > > SAMZA-849: Split-deployment of samza framework
> > > SAMZA-905: Host Affinity - Continuous offset
> > > SAMZA-906: Host Affinity - Minimize task reassignment when container
> > count
> > > changes
> > > SAMZA-924: Disk Quotas: Add disk space monitoring
> > > SAMZA-956: Disk Quotas: Add throttler and disk quota enforcement
> > > SAMZA-680: Inverting JobCoordinator and SamzaAppMaster logic
> > > SAMZA-863: Multithreading support in Samza
> > >
> > > And the JIRAs that I think we should try and get done before 0.11.0 is
> > > released:
> > >
> > > SAMZA-980: Automatically restart job containers on change in upstream
> > > partitioning
> > > SAMZA-988: Update failure testing job configs with job coordinator
> > > properties
> > > SAMZA-997: Add documentation about disk space monitoring
> > > SAMZA-977: User doc for samza multithreading
> > > SAMZA-998: Documentation updates for refactored JC
> > > SAMZA-1000: Hello-samza latest is broken after Samza 10.1 release
> > >
> > >
> > > Here's what I propose:
> > >
> > > 1. Cut a 0.11.0 release branch.
> > > 2. Work on getting as many of the pending JIRAs done as possible.
> > > 3. Target a release vote on the week of Sep 5.
> > >
> > > Thoughts?
> > >
> > > Xinyu
> > >
> >
>


Re: Debug Samza consumer lag issue

2016-08-24 Thread Jacob Maes
>
> Based on what you have described, the following should be true in 0.10.1:
> event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> commit-ns (if necessary)

Yes, plus any time (e.g. due to an unlucky GC at just the right moment)
that happens outside those timers.  And no "if necessary" for window or
commit. There will be a small value for those methods even if they don't do
anything significant because the timer runs even for no-ops

Since you're on 10.1, there's another useful metric
"event-loop-utilization", which represents
(process-ns+window-ns+commit-ns)/event-loop-ns
(as you defined it). In other words, the proportion of time spent working
vs waiting.
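
As a quick illustration of that ratio (the nanosecond values below are invented, 
not taken from this thread):

    public class UtilizationExample {
      public static void main(String[] args) {
        // hypothetical per-iteration timings, in nanoseconds
        long processNs = 900_000, windowNs = 50_000, commitNs = 50_000, chooseNs = 9_000_000;
        long eventLoopNs = chooseNs + processNs + windowNs + commitNs;        // 10 ms iteration
        double utilization = (double) (processNs + windowNs + commitNs) / eventLoopNs;
        System.out.printf("event-loop-utilization = %.2f%n", utilization);    // prints 0.10
      }
    }

A low value like this means the loop spends most of its time in choose, i.e. 
waiting for input rather than working.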

On Wed, Aug 24, 2016 at 10:18 AM, David Yu  wrote:

> Great. It all makes sense now.
>
> With the SSD fix, we also upgraded to 0.10.1. So we should see pretty
> consistent process-ns (which we do).
>
> Based on what you have described, the following should be true in 0.10.1:
> event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> commit-ns (if necessary)
>
> Is this correct?
> Thanks,
> David
>
> On Wed, Aug 24, 2016 at 11:27 AM Jacob Maes  wrote:
>
> > A couple other notes.
> >
> > Prior to Samza 10.1, the choose-ns was part of process-ns. So when
> > choose-ns and process-ns are both high (around 10,000,000 == 10ms, which
> is
> > the default poll timeout), that usually means the task is caught up. In
> > Samza 10.1 the same is true if ONLY choose-ns is high. process-ns is
> always
> > the time spent in the process() method.
> >
> > Based on the above, the metric numbers you provided after the SSD fix all
> > look reasonable. They're all sub-millisecond and since choose-ns and
> > process-ns are low, it seems that the container is chewing through
> messages
> > at a good rate.
> >
> > So I would conclude that the SSD fix was probably the right one and it
> just
> > took the job a while to catch up to the backlog of messages. Once it
> caught
> > up, the choose-ns and process-ns increased, which is normal when the
> > processor is waiting for new messages.
> >
> > -Jake
> >
> > On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes 
> wrote:
> >
> > > Hey David,
> > >
> > > Answering the most recent question first, since it's also the easiest.
> > :-)
> > >
> > > Is choose-ns the total number of ms used to choose a message from the
> > input
> > >> stream? What are some gating factors (e.g. serialization?) for this
> > >> metric?
> > >
> > > It's the amount of time the event loop spent getting new messages for
> > > process(). It includes deserialization time and poll time which we
> added
> > > new metrics for, in Samza 10.1. Typically deserialization time is
> pretty
> > > consistent, so when you see a spike in choose-ns, it's usually because
> > the
> > > event loop is waiting for new messages. The two most common cases when
> > it's
> > > waiting are:
> > > 1. There are no new messages in the topic partition. This is good
> because
> > > it means the processor is caught up.
> > > 2. The consumer is slow and/or the buffer isn't large enough so the
> > > BrokerProxy isn't able to keep enough messages buffered to keep the
> event
> > > loop busy. This is uncommon because the buffer is defaulted to 50,000
> > > messages, which should be plenty. But if it happens, it's bad. To
> control
> > > this behavior, see the following properties in the config table (
> > > http://samza.apache.org/learn/documentation/0.10/jobs/
> > > configuration-table.html)
> > > systems.system-name.samza.fetch.threshold
> > > task.poll.interval.ms
> > >
> > >
> > >
> > > On Wed, Aug 24, 2016 at 8:52 AM, David Yu 
> > wrote:
> > >
> > >> More updates:
> > >> 1. process-envelopes rate finally stabilized and converged. Consumer
> lag
> > >> is
> > >> down to zero.
> > >> 2. avg choose-ns across containers dropped over time
> > >>  > >> -08-24%2010.46.22.png?dl=0>,
> > >> which I assume is a good thing.
> > >>
> > >> My question:
> > >> Is choose-ns the total number of ms used to choose a message from the
> > >> input
> > >> stream? What are some gating factors (e.g. serialization?) for this
> > >> metric?
> > >>
> > >> Thanks,
> > >> David
> > >>
> > >> On Wed, Aug 24, 2016 at 12:34 AM David Yu 
> > >> wrote:
> > >>
> > >> > Some metric updates:
> > >> > 1. We started seeing some containers with a higher choose-ns
> > >> >  > >> -08-24%2000.26.07.png?dl=0>.
> > >> > Not sure what would be the cause of this.
> > >> > 2. We are seeing very different process-envelopes values across
> > >> containers
> > >> >  > >> -08-24%2000.21.05.png?dl=0>
> > >> > .
> > >> >
> > >> >
> > >> >
> > >> > On Tue, Aug 23, 2016 at 5:56 PM David Yu 
> > >> 

Re: [DISCUSS] Samza 0.11.0 release

2016-08-24 Thread Yi Pan
Hi, Nicolas,

Could you explain to me why Samza is blocking you from upgrading your Kafka
brokers to 0.10? At LinkedIn, we are running Samza 0.10 w/ Kafka 0.10
brokers. This is a valid combination since Kafka 0.10 brokers should be
backward compatible w/ 0.8.2 clients (which is the version Samza uses). I
would recommend that you try to upgrade the Kafka brokers w/ Samza 0.10. If
there are any issues you see, we will help to investigate and fix them.

Meanwhile, we are planning to schedule Samza releases more regularly and
frequently. Hence, although SAMZA-855 is not planned in
0.11, it will be included in the immediate version after that.

Let me know if you have any further concerns.

On Mon, Aug 22, 2016 at 2:48 PM, Nicolas Maquet  wrote:

> Hi all,
>
> Would it be possible to consider including the following in the 0.11.0
> target release?
>
> SAMZA-855: Upgrade Samza's Kafka client version to 0.10.0.0
>
> We are looking at upgrading our Kafka cluster to 0.10 and Samza is
> currently preventing us from doing so.
>
> Regards,
>
> Nicolas Maquet
>
>
>
> 2016-08-23 9:06 GMT+12:00 xinyu liu :
>
> > Hi, All,
> >
> > There have been quite a lot of new features added to master since 0.10
> > release to warrant a new major release. At LinkedIn, we've done
> functional
> > and performance testing against master in the past weeks, and deployed
> jobs
> > with the latest build in production. We will continue to test for
> stability
> > in the next few weeks.
> >
> > Here are the JIRAs of main features that will be included in this release
> > (sorted in chronological order):
> >
> > SAMZA-876: Add AvroDataFileHdfsWriter
> > SAMZA-849: Split-deployment of samza framework
> > SAMZA-905: Host Affinity - Continuous offset
> > SAMZA-906: Host Affinity - Minimize task reassignment when container
> count
> > changes
> > SAMZA-924: Disk Quotas: Add disk space monitoring
> > SAMZA-956: Disk Quotas: Add throttler and disk quota enforcement
> > SAMZA-680: Inverting JobCoordinator and SamzaAppMaster logic
> > SAMZA-863: Multithreading support in Samza
> >
> > And the JIRAs that I think we should try and get done before 0.11.0 is
> > released:
> >
> > SAMZA-980: Automatically restart job containers on change in upstream
> > partitioning
> > SAMZA-988: Update failure testing job configs with job coordinator
> > properties
> > SAMZA-997: Add documentation about disk space monitoring
> > SAMZA-977: User doc for samza multithreading
> > SAMZA-998: Documentation updates for refactored JC
> > SAMZA-1000: Hello-samza latest is broken after Samza 10.1 release
> >
> >
> > Here's what I propose:
> >
> > 1. Cut a 0.11.0 release branch.
> > 2. Work on getting as many of the pending JIRAs done as possible.
> > 3. Target a release vote on the week of Sep 5.
> >
> > Thoughts?
> >
> > Xinyu
> >
>


Re: Debug Samza consumer lag issue

2016-08-24 Thread David Yu
Great. It all makes sense now.

With the SSD fix, we also upgraded to 0.10.1. So we should see pretty
consistent process-ns (which we do).

Based on what you have described, the following should be true in 0.10.1:
event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
commit-ns (if necessary)

Is this correct?
Thanks,
David

On Wed, Aug 24, 2016 at 11:27 AM Jacob Maes  wrote:

> A couple other notes.
>
> Prior to Samza 10.1, the choose-ns was part of process-ns. So when
> choose-ns and process-ns are both high (around 10,000,000 == 10ms, which is
> the default poll timeout), that usually means the task is caught up. In
> Samza 10.1 the same is true if ONLY choose-ns is high. process-ns is always
> the time spent in the process() method.
>
> Based on the above, the metric numbers you provided after the SSD fix all
> look reasonable. They're all sub-millisecond and since choose-ns and
> process-ns are low, it seems that the container is chewing through messages
> at a good rate.
>
> So I would conclude that the SSD fix was probably the right one and it just
> took the job a while to catch up to the backlog of messages. Once it caught
> up, the choose-ns and process-ns increased, which is normal when the
> processor is waiting for new messages.
>
> -Jake
>
> On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes  wrote:
>
> > Hey David,
> >
> > Answering the most recent question first, since it's also the easiest.
> :-)
> >
> > Is choose-ns the total number of ms used to choose a message from the
> input
> >> stream? What are some gating factors (e.g. serialization?) for this
> >> metric?
> >
> > It's the amount of time the event loop spent getting new messages for
> > process(). It includes deserialization time and poll time which we added
> > new metrics for, in Samza 10.1. Typically deserialization time is pretty
> > consistent, so when you see a spike in choose-ns, it's usually because
> the
> > event loop is waiting for new messages. The two most common cases when
> it's
> > waiting are:
> > 1. There are no new messages in the topic partition. This is good because
> > it means the processor is caught up.
> > 2. The consumer is slow and/or the buffer isn't large enough so the
> > BrokerProxy isn't able to keep enough messages buffered to keep the event
> > loop busy. This is uncommon because the buffer is defaulted to 50,000
> > messages, which should be plenty. But if it happens, it's bad. To control
> > this behavior, see the following properties in the config table (
> > http://samza.apache.org/learn/documentation/0.10/jobs/
> > configuration-table.html)
> > systems.system-name.samza.fetch.threshold
> > task.poll.interval.ms
> >
> >
> >
> > On Wed, Aug 24, 2016 at 8:52 AM, David Yu 
> wrote:
> >
> >> More updates:
> >> 1. process-envelopes rate finally stabilized and converged. Consumer lag
> >> is
> >> down to zero.
> >> 2. avg choose-ns across containers dropped over time
> >>  >> -08-24%2010.46.22.png?dl=0>,
> >> which I assume is a good thing.
> >>
> >> My question:
> >> Is choose-ns the total number of ms used to choose a message from the
> >> input
> >> stream? What are some gating factors (e.g. serialization?) for this
> >> metric?
> >>
> >> Thanks,
> >> David
> >>
> >> On Wed, Aug 24, 2016 at 12:34 AM David Yu 
> >> wrote:
> >>
> >> > Some metric updates:
> >> > 1. We started seeing some containers with a higher choose-ns
> >> >  >> -08-24%2000.26.07.png?dl=0>.
> >> > Not sure what would be the cause of this.
> >> > 2. We are seeing very different process-envelopes values across
> >> containers
> >> >  >> -08-24%2000.21.05.png?dl=0>
> >> > .
> >> >
> >> >
> >> >
> >> > On Tue, Aug 23, 2016 at 5:56 PM David Yu 
> >> wrote:
> >> >
> >> >> Hi, Jake,
> >> >>
> >> >> Thanks for your suggestions. Some of my answers inline:
> >> >>
> >> >> 1.
> >> >> On Tue, Aug 23, 2016 at 11:53 AM Jacob Maes 
> >> wrote:
> >> >>
> >> >>> Hey David,
> >> >>>
> >> >>> A few initial thoughts/questions:
> >> >>>
> >> >>>
> >> >>>1. Is this job using RocksDB to store the aggregations? If so, is
> >> it
> >> >>>running on a machine with SSDs? We've seen a few performance
> issues
> >> >>> related
> >> >>>to RocksDB.
> >> >>>   1. Not running on SSD causes slowness on disk
> >> >>
> >> >>  - [David] This definitely pointed me to the right direction in my
> >> >> investigation. We do use RocksDB and just realized that our YARN
> >> cluster is
> >> >> using c3.xlarge EC2 instances and is using a mixture of EBS and local
> >> SSD
> >> >> storage. After digging around, we noticed that some containers were
> >> >> persisting their KV stores in SSD while others were using EBS. We
> just
> 

Re: Debug Samza consumer lag issue

2016-08-24 Thread Jacob Maes
A couple other notes.

Prior to Samza 10.1, the choose-ns was part of process-ns. So when
choose-ns and process-ns are both high (around 10,000,000 == 10ms, which is
the default poll timeout), that usually means the task is caught up. In
Samza 10.1 the same is true if ONLY choose-ns is high. process-ns is always
the time spent in the process() method.

Based on the above, the metric numbers you provided after the SSD fix all
look reasonable. They're all sub-millisecond and since choose-ns and
process-ns are low, it seems that the container is chewing through messages
at a good rate.

So I would conclude that the SSD fix was probably the right one and it just
took the job a while to catch up to the backlog of messages. Once it caught
up, the choose-ns and process-ns increased, which is normal when the
processor is waiting for new messages.

-Jake

On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes  wrote:

> Hey David,
>
> Answering the most recent question first, since it's also the easiest. :-)
>
> Is choose-ns the total number of ms used to choose a message from the input
>> stream? What are some gating factors (e.g. serialization?) for this
>> metric?
>
> It's the amount of time the event loop spent getting new messages for
> process(). It includes deserialization time and poll time which we added
> new metrics for, in Samza 10.1. Typically deserialization time is pretty
> consistent, so when you see a spike in choose-ns, it's usually because the
> event loop is waiting for new messages. The two most common cases when it's
> waiting are:
> 1. There are no new messages in the topic partition. This is good because
> it means the processor is caught up.
> 2. The consumer is slow and/or the buffer isn't large enough so the
> BrokerProxy isn't able to keep enough messages buffered to keep the event
> loop busy. This is uncommon because the buffer is defaulted to 50,000
> messages, which should be plenty. But if it happens, it's bad. To control
> this behavior, see the following properties in the config table (
> http://samza.apache.org/learn/documentation/0.10/jobs/
> configuration-table.html)
> systems.system-name.samza.fetch.threshold
> task.poll.interval.ms
>
>
>
> On Wed, Aug 24, 2016 at 8:52 AM, David Yu  wrote:
>
>> More updates:
>> 1. process-envelopes rate finally stabilized and converged. Consumer lag
>> is
>> down to zero.
>> 2. avg choose-ns across containers dropped over time
>> > -08-24%2010.46.22.png?dl=0>,
>> which I assume is a good thing.
>>
>> My question:
>> Is choose-ns the total number of ms used to choose a message from the
>> input
>> stream? What are some gating factors (e.g. serialization?) for this
>> metric?
>>
>> Thanks,
>> David
>>
>> On Wed, Aug 24, 2016 at 12:34 AM David Yu 
>> wrote:
>>
>> > Some metric updates:
>> > 1. We started seeing some containers with a higher choose-ns
>> > > -08-24%2000.26.07.png?dl=0>.
>> > Not sure what would be the cause of this.
>> > 2. We are seeing very different process-envelopes values across
>> containers
>> > > -08-24%2000.21.05.png?dl=0>
>> > .
>> >
>> >
>> >
>> > On Tue, Aug 23, 2016 at 5:56 PM David Yu 
>> wrote:
>> >
>> >> Hi, Jake,
>> >>
>> >> Thanks for your suggestions. Some of my answers inline:
>> >>
>> >> 1.
>> >> On Tue, Aug 23, 2016 at 11:53 AM Jacob Maes 
>> wrote:
>> >>
>> >>> Hey David,
>> >>>
>> >>> A few initial thoughts/questions:
>> >>>
>> >>>
>> >>>1. Is this job using RocksDB to store the aggregations? If so, is
>> it
>> >>>running on a machine with SSDs? We've seen a few performance issues
>> >>> related
>> >>>to RocksDB.
>> >>>   1. Not running on SSD causes slowness on disk
>> >>
>> >>  - [David] This definitely pointed me to the right direction in my
>> >> investigation. We do use RocksDB and just realized that our YARN
>> cluster is
>> >> using c3.xlarge EC2 instances and is using a mixture of EBS and local
>> SSD
>> >> storage. After digging around, we noticed that some containers were
>> >> persisting their KV stores in SSD while others were using EBS. We just
>> >> updated our YARN config to use SSD only before redeployed our jobs. So
>> far
>> >> everything looks good. Will report back on the performance update.
>> >>
>> >>>   2. Prior to Samza 10.1, samza would excessively flush the store
>> to
>> >>>   disk, causing RocksDB compaction issues (stalls) - SAMZA-957
>> >>>
>> >> - [David] We did notice that the log cleaner thread died on one of our
>> >> brokers. Not sure if this was the same problem you pointed out.
>> Following
>> >> errors are logged:
>> >>
>> >> 2016-08-15 10:00:56,475 ERROR kafka.log.LogCleaner:
>> >> [kafka-log-cleaner-thread-0], Error due to
>> >>
>> >> 

Re: Debug Samza consumer lag issue

2016-08-24 Thread Jacob Maes
Hey David,

Answering the most recent question first, since it's also the easiest. :-)

Is choose-ns the total number of ms used to choose a message from the input
> stream? What are some gating factors (e.g. serialization?) for this metric?

It's the amount of time the event loop spent getting new messages for
process(). It includes deserialization time and poll time which we added
new metrics for, in Samza 10.1. Typically deserialization time is pretty
consistent, so when you see a spike in choose-ns, it's usually because the
event loop is waiting for new messages. The two most common cases when it's
waiting are:
1. There are no new messages in the topic partition. This is good because
it means the processor is caught up.
2. The consumer is slow and/or the buffer isn't large enough so the
BrokerProxy isn't able to keep enough messages buffered to keep the event
loop busy. This is uncommon because the buffer is defaulted to 50,000
messages, which should be plenty. But if it happens, it's bad. To control
this behavior, see the following properties in the config table (
http://samza.apache.org/learn/documentation/0.10/jobs/configuration-table.html
)
systems.system-name.samza.fetch.threshold
task.poll.interval.ms
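
For concreteness, a sketch of those two properties as they might appear in a job 
config. "kafka" stands in for the system name, 50000 is the default mentioned 
above, and the poll interval value is only an example; see the config table for 
the exact semantics:

    systems.kafka.samza.fetch.threshold=50000
    task.poll.interval.ms=50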



On Wed, Aug 24, 2016 at 8:52 AM, David Yu  wrote:

> More updates:
> 1. process-envelopes rate finally stabilized and converged. Consumer lag is
> down to zero.
> 2. avg choose-ns across containers dropped over time
>  202016-08-24%2010.46.22.png?dl=0>,
> which I assume is a good thing.
>
> My question:
> Is choose-ns the total number of ms used to choose a message from the input
> stream? What are some gating factors (e.g. serialization?) for this metric?
>
> Thanks,
> David
>
> On Wed, Aug 24, 2016 at 12:34 AM David Yu  wrote:
>
> > Some metric updates:
> > 1. We started seeing some containers with a higher choose-ns
> >  202016-08-24%2000.26.07.png?dl=0>.
> > Not sure what would be the cause of this.
> > 2. We are seeing very different process-envelopes values across
> containers
> >  202016-08-24%2000.21.05.png?dl=0>
> > .
> >
> >
> >
> > On Tue, Aug 23, 2016 at 5:56 PM David Yu 
> wrote:
> >
> >> Hi, Jake,
> >>
> >> Thanks for your suggestions. Some of my answers inline:
> >>
> >> 1.
> >> On Tue, Aug 23, 2016 at 11:53 AM Jacob Maes 
> wrote:
> >>
> >>> Hey David,
> >>>
> >>> A few initial thoughts/questions:
> >>>
> >>>
> >>>1. Is this job using RocksDB to store the aggregations? If so, is it
> >>>running on a machine with SSDs? We've seen a few performance issues
> >>> related
> >>>to RocksDB.
> >>>   1. Not running on SSD causes slowness on disk
> >>
> >>  - [David] This definitely pointed me to the right direction in my
> >> investigation. We do use RocksDB and just realized that our YARN
> cluster is
> >> using c3.xlarge EC2 instances and is using a mixture of EBS and local
> SSD
> >> storage. After digging around, we noticed that some containers were
> >> persisting their KV stores in SSD while others were using EBS. We just
> >> updated our YARN config to use SSD only before redeployed our jobs. So
> far
> >> everything looks good. Will report back on the performance update.
> >>
> >>>   2. Prior to Samza 10.1, samza would excessively flush the store
> to
> >>>   disk, causing RocksDB compaction issues (stalls) - SAMZA-957
> >>>
> >> - [David] We did notice that the log cleaner thread died on one of our
> >> brokers. Not sure if this was the same problem you pointed out.
> Following
> >> errors are logged:
> >>
> >> 2016-08-15 10:00:56,475 ERROR kafka.log.LogCleaner:
> >> [kafka-log-cleaner-thread-0], Error due to
> >>
> >> java.lang.IllegalArgumentException: requirement failed: 5865800
> messages
> >> in segment session-store-2.0-tickets-changelog-9/
> 09548937.log
> >> but offset map can fit only 5033164. You can increase
> >> log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
> >>
> >> at scala.Predef$.require(Predef.scala:219)
> >>
> >> We had to cleanup the changelog topic and restart the broker to bring
> >> back the cleaner thread.
> >>
> >>>   3. When the RocksDB store is used as a queue, the iterator can
> >>> suffer
> >>>   performance issues due to RocksDBs tombstoning. (
> >>>
> >>> https://github.com/facebook/rocksdb/wiki/Implement-Queue-
> Service-Using-RocksDB
> >>>   )
> >>>
> >> - [David] We use RocksDB to keep track of opening sessions and use
> >> sessionId (a random hash) as the key. In that sense, this does not sound
> >> like a queue. But we do iterate and delete closed sessions during
> windowing
> >> on a minute by minute basis.
> >>
> >>2. Is the "messages-behind-high-watermark" metric non-zero?
> >>>
> >> -[David] Yes.
> >>
> 

Re: Debug Samza consumer lag issue

2016-08-24 Thread David Yu
More updates:
1. process-envelopes rate finally stabilized and converged. Consumer lag is
down to zero.
2. avg choose-ns across containers dropped over time, which I assume is a good
thing.

My question:
Is choose-ns the total number of ms used to choose a message from the input
stream? What are some gating factors (e.g. serialization?) for this metric?

Thanks,
David

On Wed, Aug 24, 2016 at 12:34 AM David Yu  wrote:

> Some metric updates:
> 1. We started seeing some containers with a higher choose-ns
> .
> Not sure what would be the cause of this.
> 2. We are seeing very different process-envelopes values across containers
> 
> .
>
>
>
> On Tue, Aug 23, 2016 at 5:56 PM David Yu  wrote:
>
>> Hi, Jake,
>>
>> Thanks for your suggestions. Some of my answers inline:
>>
>> 1.
>> On Tue, Aug 23, 2016 at 11:53 AM Jacob Maes  wrote:
>>
>>> Hey David,
>>>
>>> A few initial thoughts/questions:
>>>
>>>
>>>1. Is this job using RocksDB to store the aggregations? If so, is it
>>>running on a machine with SSDs? We've seen a few performance issues
>>> related
>>>to RocksDB.
>>>   1. Not running on SSD causes slowness on disk
>>
>>  - [David] This definitely pointed me to the right direction in my
>> investigation. We do use RocksDB and just realized that our YARN cluster is
>> using c3.xlarge EC2 instances and is using a mixture of EBS and local SSD
>> storage. After digging around, we noticed that some containers were
>> persisting their KV stores in SSD while others were using EBS. We just
>> updated our YARN config to use SSD only before redeployed our jobs. So far
>> everything looks good. Will report back on the performance update.
>>
>>>   2. Prior to Samza 10.1, samza would excessively flush the store to
>>>   disk, causing RocksDB compaction issues (stalls) - SAMZA-957
>>>
>> - [David] We did notice that the log cleaner thread died on one of our
>> brokers. Not sure if this was the same problem you pointed out. Following
>> errors are logged:
>>
>> 2016-08-15 10:00:56,475 ERROR kafka.log.LogCleaner:
>> [kafka-log-cleaner-thread-0], Error due to
>>
>> java.lang.IllegalArgumentException: requirement failed: 5865800 messages
>> in segment session-store-2.0-tickets-changelog-9/09548937.log
>> but offset map can fit only 5033164. You can increase
>> log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
>>
>> at scala.Predef$.require(Predef.scala:219)
>>
>> We had to cleanup the changelog topic and restart the broker to bring
>> back the cleaner thread.
>>
>>>   3. When the RocksDB store is used as a queue, the iterator can
>>> suffer
>>>   performance issues due to RocksDBs tombstoning. (
>>>
>>> https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
>>>   )
>>>
>> - [David] We use RocksDB to keep track of opening sessions and use
>> sessionId (a random hash) as the key. In that sense, this does not sound
>> like a queue. But we do iterate and delete closed sessions during windowing
>> on a minute by minute basis.
>>
>>2. Is the "messages-behind-high-watermark" metric non-zero?
>>>
>> -[David] Yes.
>>
>>>3. The SamzaContainerMetrics might be useful too. Particularly
>>>"choose-ns" and "commit-ns"
>>>
>> -[David] We are seeing the following from one of the containers (after
>> the SSD fix mentioned above):
>> choose-ns=61353
>> commit-ns=306328 (what does this metric indicate? Is this in ms?)
>> process-ns=248260
>> window-ns=150717
>>
>>>4. The only time I've personally seen slowness on the producer is if
>>>it's configured for acks="all". What is the producer config from the
>>> log?
>>>
>> - [David] We did not override this. So should be the default value (1?).
>>
>>5. The window time is high, but since it's only called once per minute,
>>>it looks like it only represents 1% of the event loop utilization. So
>>> I
>>>don't think that's a smoking gun.
>>>
>>> -Jake
>>>
>>> On Tue, Aug 23, 2016 at 9:18 AM, David Yu 
>>> wrote:
>>>
>>> > Dear Samza guys,
>>> >
>>> > We are here for some debugging suggestions on our Samza job (0.10.0),
>>> which
>>> > lags behind on consumption after running for a couple of hours,
>>> regardless
>>> > of the number of containers allocated (currently 5).
>>> >
>>> > Briefly, the job aggregates events into sessions (in Avro) during
>>> process()
>>> > and emits snapshots of the open sessions using window() every minute.
>>> This
>>> > graph
>>> > >> > 202016-08-23%2010.33.16.png?dl=0>
>>> > shows
>>> > you where processing started to lag (red is the number of