Re: Samza container hang on exception

2016-08-31 Thread 李斯宁
If you cannot see the attachment, please try
http://note.youdao.com/noteshare?id=56b826c24af47a9fdb600490ce788710

On Thu, Sep 1, 2016 at 1:50 AM, Chinmay Soman 
wrote:

> Sorry, I don't see anything in the attachment. Can you please re-attach and
> re-send?
>
> On Wed, Aug 31, 2016 at 3:27 AM, 李斯宁  wrote:
>
> > It seems upgrading does not solve the problem. All tasks hung during today's
> > "rush hour".
> > I attached the log and jstack output.
> >
> > SAMZA-911 is meant to fix this by stopping the process if sending fails too
> > many times, but the process is still there and hanging.
> >
> > On Mon, Aug 22, 2016 at 1:14 PM, 李斯宁  wrote:
> >
> >> Thanks so much, I'll try.
> >>
> >> On Mon, Aug 22, 2016 at 6:26 AM, Yi Pan  wrote:
> >>
> >>> Hi, Sining,
> >>>
> >>> This is a known bug that is fixed in 0.10.1 (SAMZA-911). Please try to
> >>> upgrade to 0.10.1.
> >>>
> >>> Thanks!
> >>>
> >>> -Yi
> >>>
> >>> On Sun, Aug 21, 2016 at 5:55 AM, 李斯宁  wrote:
> >>>
> >>> > I have tried restarting every Kafka server. The container did not
> >>> > recover.
> >>> >
> >>> > The log contains the following:
> >>> >
> >>> > 2016-08-21 20:08:21 [WARN ](o.a.s.s.k.KafkaSystemProducer  :66 ) Retrying send messsage due to RetriableException - org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Turn on debugging to get a full stack trace
> >>> > 2016-08-21 20:08:22 [WARN ](o.a.k.c.p.i.Sender :257) Got error produce response with correlation id 4364 on topic-partition samzaMetrics-5, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION
> >>> > 2016-08-21 20:08:23 [WARN ](o.a.k.c.p.i.Sender :257) Got error produce response with correlation id 4367 on topic-partition samzaMetrics-5, retrying (29 attempts left). Error: NOT_LEADER_FOR_PARTITION
> >>> >
> >>> >
> >>> > jstack shows:
> >>> >
> >>> > "main" #1 prio=5 os_prio=0 tid=0x7f1ba401a000 nid=0x1a621 waiting on condition [0x7f1bab976000]
> >>> > java.lang.Thread.State: TIMED_WAITING (sleeping)
> >>> > at java.lang.Thread.sleep(Native Method)
> >>> > at org.apache.samza.util.ExponentialSleepStrategy$RetryLoopState.sleep(ExponentialSleepStrategy.scala:105)
> >>> > at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:91)
> >>> > at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:91)
> >>> > at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
> >>> > at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
> >>> > at toolbox.analyzer2.realtime.CommonWriter.write(CommonWriter.java:50)
> >>> > at toolbox.analyzer2.realtime.InitTask.lambda$process$0(InitTask.java:110)
> >>> > at toolbox.analyzer2.realtime.InitTask$$Lambda$4/938405008.emit(Unknown Source)
> >>> > at toolbox.analyzer2.util.core.TransToKvProcessor.process(TransToKvProcessor.java:146)
> >>> > at toolbox.analyzer2.realtime.InitTask$2.emit(InitTask.java:119)
> >>> > at toolbox.analyzer2.util.core.JsonExpander.expand(JsonExpander.java:47)
> >>> > at toolbox.analyzer2.realtime.InitTask.process(InitTask.java:128)
> >>> > at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:150)
> >>> > at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> >>> > at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:149)
> >>> > at org.apache.samza.container.RunLoop$$anonfun$process$1$$anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:122)
> >>> > at org.apache.samza.container.RunLoop$$anonfun$process$1$$anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:119)
> >>> > at scala.collection.immutable.List.foreach(List.scala:318)
> >>> > at org.apache.samza.container.RunLoop$$anonfun$process$1.apply$mcVJ$sp(RunLoop.scala:118)
> >>> > at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
> >>> > at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35)
> >>> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:106)
> >>> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:74)
> >>> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
> >>> > at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
> >>> > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
> >>> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >>> >
> >>> > Maybe the partition leader changed during rush hour and metrics writing
> >
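
The jstack output in this thread shows the main thread sleeping inside a retry loop while the producer keeps receiving NOT_LEADER_FOR_PARTITION. The behaviour the poster expected from SAMZA-911 (give up after too many failures so the process exits rather than hangs) can be sketched roughly as below. This is only an illustrative sketch: `BoundedRetry`, `RetriesExhaustedException`, and all parameter names are hypothetical and are not taken from Samza or from the actual SAMZA-911 patch.

```java
import java.util.concurrent.Callable;

final class BoundedRetry {
    /** Signals that every attempt failed, so the caller can exit instead of hanging. */
    static final class RetriesExhaustedException extends RuntimeException {
        RetriesExhaustedException(int attempts, Throwable cause) {
            super("gave up after " + attempts + " attempts", cause);
        }
    }

    /** Delay before retry number `attempt` (0-based): initial * 2^attempt, capped at max. */
    static long backoffMillis(int attempt, long initialMillis, long maxMillis) {
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /** Runs the action up to maxAttempts times, sleeping between failures. */
    static <T> T call(Callable<T> action, int maxAttempts, long initialMillis, long maxMillis) {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt + 1 < maxAttempts) {
                    sleepQuietly(backoffMillis(attempt, initialMillis, maxMillis));
                }
            }
        }
        // Unlike an unbounded retry loop, failure is surfaced to the caller.
        throw new RetriesExhaustedException(maxAttempts, last);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        try {
            BoundedRetry.<String>call(() -> {
                throw new IllegalStateException("not leader");
            }, 3, 10, 40);
        } catch (RetriesExhaustedException e) {
            System.out.println(e.getMessage()); // a container would shut down here
        }
    }
}
```

The point of the sketch is the last line of `call`: once the attempt budget is spent, the exception propagates and the process can terminate and be restarted, instead of sleeping indefinitely as in the stack trace above.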

Re: Review Request 51346: SAMZA-974 - Support finite datasources in Samza that have a notion of End-Of-Stream

2016-08-31 Thread Jagadish Venkatraman


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java,
> >  line 31
> > 
> >
> > How likely are we to collide with this? That's the problem with using a 
> > user-definable token. I see two options:
> > 
> > If null is not supported (and thus not usable by user-defined 
> > implementations) I would use that and mark it as reserved.
> > 
> > Otherwise I would probably do something more to make this unlikely to 
> > collide (call me paranoid). Something like use a NUL byte as the first 
> > character and document that offsets with such an encoding are reserved. I 
> > would also check that this sort of string doesn't make it to user code in 
> > the task.
> 
> Jagadish Venkatraman wrote:
> Returning a null is not possible (because a null offset could mean that 
> we don't have messages at this moment instead of meaning end-of-stream. While 
> we should poll again when a consumer returns null, we should not for the 
> END_OF_STREAM case.) Hence, I was hoping to use a special offset.
> 
> I like your suggestion of using a NUL byte as the first character (and 
> calling that out). I'll update the RB with that.

There seem to be inter-operability issues between strings in Java and strings 
in Scala (especially around handling NUL bytes in the string; Scala appears to 
strip out NUL bytes). Hence, I've used "SAMZA_INTERNAL_END_OF_STREAM" as the 
string. Let me know if there's a better way to handle this.
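
The three cases discussed above (a null offset meaning "nothing available right now", the reserved end-of-stream token, and an ordinary offset) can be sketched as follows. This is a minimal illustration, not the code in the review: `EndOfStreamOffset`, `Action`, `classify`, and `checkUserOffset` are hypothetical names; only the string "SAMZA_INTERNAL_END_OF_STREAM" comes from the message above.

```java
final class EndOfStreamOffset {
    /** Reserved token quoted in the thread; user-defined offsets must not use it. */
    static final String END_OF_STREAM = "SAMZA_INTERNAL_END_OF_STREAM";

    /** What a run loop might do for a given offset (hypothetical enum). */
    enum Action { POLL_AGAIN, DELIVER, STOP_POLLING }

    static Action classify(String offset) {
        if (offset == null) {
            return Action.POLL_AGAIN;      // no message at the moment, keep polling
        }
        if (END_OF_STREAM.equals(offset)) {
            return Action.STOP_POLLING;    // bounded source is exhausted
        }
        return Action.DELIVER;             // ordinary message for user code
    }

    /** Guards against the collision raised in the review comment. */
    static String checkUserOffset(String offset) {
        if (END_OF_STREAM.equals(offset)) {
            throw new IllegalArgumentException("offset is reserved: " + offset);
        }
        return offset;
    }

    public static void main(String[] args) {
        System.out.println(classify(null));
        System.out.println(classify("42"));
        System.out.println(classify(END_OF_STREAM));
    }
}
```

A check like `checkUserOffset` is one way to address the collision concern: a user-defined consumer that produced the reserved string would fail loudly rather than silently end the stream.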


- Jagadish


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
---


On Aug. 30, 2016, 12:32 a.m., Jagadish Venkatraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> ---
> 
> (Updated Aug. 30, 2016, 12:32 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data 
> Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Samza currently works with unbounded data sources (Kafka streams). However, 
> for bounded data sources that are not infinite, such as HDFS files and 
> snapshot files, we need a notion of 'end-of-stream'. 
> 
> This is a step towards realizing a 'finite' Samza job that terminates once 
> data processing is complete (as opposed to an infinite stream job that keeps 
> running).
> 
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state machine of AsyncStreamTask 
> (Invariant: when end-of-stream is reached, there are no buffered messages, 
> no callbacks are in flight, and no window or commit call is in progress)
> - Changes to allow clean shutdowns of the tasks/container/job at 
> end-of-stream.
> 
> Design Doc and Implementation Notes: 
> https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
> 
> 
> Diffs
> -
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a 
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64 
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 1fc645673b7547b642830df5639c0b4fcd11c0d5 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764 
> 
> Diff: https://reviews.apache.org/r/51346/diff/
> 
> 
> Testing
> ---
> 
> Unit tests to test scenarios for in-order processing, out-of-order processing 
> and commit semantics.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>



Re: Review Request 51516: SAMZA-702: Document the significance of all the different metrics emitted by Samza out of the box

2016-08-31 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51516/#review147523
---


Fix it, then Ship it!




Overall, the patch looks pretty good! The only roadblock I see is that with 
SAMZA-680, we introduced "ContainerProcessManagerMetrics" which would replace 
"SamzaAppMasterMetrics". This is kind of problematic because it seems to expose 
2 copies of essentially the same metrics - under different "groups" or "class 
names". I am not sure why we have 2 copies of the same metric? 
@vjagadish : why do we have 2 copies of these metrics? What is the plan going 
forward? 

Otherwise, +1 for this patch. Thanks a lot!


docs/learn/documentation/versioned/container/metrics-table.html (line 538)


With SAMZA-837, I think we removed the usage of this metric, and it only seems 
to be used as a dummy check in a unit test. Can you please mark this as 
deprecated and remove it from the unit test class?



docs/learn/documentation/versioned/container/metrics-table.html (line 795)


Did you mean per-system here?


- Navina Ramesh


On Aug. 30, 2016, 7:36 a.m., Branislav Cogic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51516/
> ---
> 
> (Updated Aug. 30, 2016, 7:36 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-702
> https://issues.apache.org/jira/browse/SAMZA-702
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the metrics documented in a metrics-table.
> 
> A few counters and a timer were removed because they are not used:
> "send-calls" counter and "chooser-update-ns" timer from SamzaContainerMetrics
> "batch-resets" counter from BootstrappingChooserMetrics
> 
> 
> Diffs
> -
> 
>   docs/_layouts/default.html 7beb734ddeaecb7a6369f7d2a5d4e0c67655269c 
>   docs/learn/documentation/versioned/container/metrics-table.html PRE-CREATION 
>   docs/learn/documentation/versioned/container/metrics.md b053b792097400536ea385cb3db720f6f71da017 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 1e7515e8e8eb5ff2f769bea3184ce49308bada9a 
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala 1cd8e0637e2192460a9e9fe078c735444be8eb97 
> 
> Diff: https://reviews.apache.org/r/51516/diff/
> 
> 
> Testing
> ---
> 
> Site ran locally using local-site-test.sh
> 
> 
> Thanks,
> 
> Branislav Cogic
> 
>



Re: Review Request 51516: SAMZA-702: Document the significance of all the different metrics emitted by Samza out of the box

2016-08-31 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51516/#review147518
---




docs/learn/documentation/versioned/container/metrics-table.html (line 367)


I think this metric just represents the number of partitions of a 
particular system that were empty and were provided to the consumer to poll for 
new messages. I don't think the correlation between 
$system-ssp-fetches-per-poll and $system-polls holds. Please correct me if I 
have misunderstood this.



docs/learn/documentation/versioned/container/metrics-table.html (line 375)


nit: number of messages that were chosen (by the MessageChooser) for a 
particular system stream partition



docs/learn/documentation/versioned/container/metrics-table.html (line 379)


nit: Average time spent polling all underlying systems for new messages (in 
nanoseconds)


- Navina Ramesh


On Aug. 30, 2016, 7:36 a.m., Branislav Cogic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51516/
> ---
> 
> (Updated Aug. 30, 2016, 7:36 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-702
> https://issues.apache.org/jira/browse/SAMZA-702
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the metrics documented in a metrics-table.
> 
> A few counters and a timer were removed because they are not used:
> "send-calls" counter and "chooser-update-ns" timer from SamzaContainerMetrics
> "batch-resets" counter from BootstrappingChooserMetrics
> 
> 
> Diffs
> -
> 
>   docs/_layouts/default.html 7beb734ddeaecb7a6369f7d2a5d4e0c67655269c 
>   docs/learn/documentation/versioned/container/metrics-table.html PRE-CREATION 
>   docs/learn/documentation/versioned/container/metrics.md b053b792097400536ea385cb3db720f6f71da017 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 1e7515e8e8eb5ff2f769bea3184ce49308bada9a 
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala 1cd8e0637e2192460a9e9fe078c735444be8eb97 
> 
> Diff: https://reviews.apache.org/r/51516/diff/
> 
> 
> Testing
> ---
> 
> Site ran locally using local-site-test.sh
> 
> 
> Thanks,
> 
> Branislav Cogic
> 
>



Re: Review Request 51516: SAMZA-702: Document the significance of all the different metrics emitted by Samza out of the box

2016-08-31 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51516/#review147506
---



Still going through the list. Thanks for compiling this. This is awesome work 
and super-useful for the Samza community!


docs/learn/documentation/versioned/container/metrics-table.html (line 231)


Can you rephrase the description as "Number of commit method calls at 
TaskInstance level" so as to clearly distinguish it from Container's commit 
method call metrics? I understand it belongs to a different group. It will be 
better to distinguish them clearly.

Or you can add a subtitle common to all metrics under "TaskInstanceMetrics" 
implying that the following apply to each TaskInstance



docs/learn/documentation/versioned/container/metrics-table.html (line 243)


nit: "Number of messages actually processed by a task"



docs/learn/documentation/versioned/container/metrics-table.html (line 247)


I actually don't see any difference between "send-calls" and 
"messages-sent". Both are technically the same. We should consider removing one 
of them, or at least deprecating one of them in the current version.


- Navina Ramesh


On Aug. 30, 2016, 7:36 a.m., Branislav Cogic wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51516/
> ---
> 
> (Updated Aug. 30, 2016, 7:36 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-702
> https://issues.apache.org/jira/browse/SAMZA-702
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the metrics documented in a metrics-table.
> 
> A few counters and a timer were removed because they are not used:
> "send-calls" counter and "chooser-update-ns" timer from SamzaContainerMetrics
> "batch-resets" counter from BootstrappingChooserMetrics
> 
> 
> Diffs
> -
> 
>   docs/_layouts/default.html 7beb734ddeaecb7a6369f7d2a5d4e0c67655269c 
>   docs/learn/documentation/versioned/container/metrics-table.html PRE-CREATION 
>   docs/learn/documentation/versioned/container/metrics.md b053b792097400536ea385cb3db720f6f71da017 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 1e7515e8e8eb5ff2f769bea3184ce49308bada9a 
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala 1cd8e0637e2192460a9e9fe078c735444be8eb97 
> 
> Diff: https://reviews.apache.org/r/51516/diff/
> 
> 
> Testing
> ---
> 
> Site ran locally using local-site-test.sh
> 
> 
> Thanks,
> 
> Branislav Cogic
> 
>



Re: [DISCUSS] End of Stream Feature in Samza

2016-08-31 Thread Jagadish Venkatraman
Thanks for reviewing and the comments. Please find my replies inline.








*"The offset in the partition that the message was received from. If this is
the last message in the SSP, this field is set to END_OF_STREAM. Such a
message is not delivered to the actual StreamTask implementation." Does this
mean the last message is not delivered to the Task? Does the source provide
that info? If that is the case, then it kind of creates contract between any
bounded system consumer and samza. Or did you mean to say that we assume
end-of-stream has been reached when there is no message returned on poll?*


>> It means that the message is not delivered to the "*StreamTask
implementation*", i.e. user code. Yes, we are creating a contract between
SystemConsumer and Samza: "the SystemConsumer implementation will
generate an IncomingMessageEnvelope by invoking buildEndOfStream()"



*"Please do clarify. I think what is missing in this document is how to
"detect" an end-of-stream from the source."*

>>"Detecting" end of stream from the source should be simply parsing the
offset from the message that the consumer returns during poll() and
checking if it is end-of-stream. I'm happy to add this to the document.

2. Does this design preclude the possibility of consuming bounded and
unbounded stream partitions in the task ?

>> Nope, it does not. However, the container would not terminate if you
have a source that has not reached end of stream yet.

3. During checkpoint, let's say some of the partitions have reached EOF. Do
we write a special offset in the checkpoint message that indicates that it
has reached end of stream and don't need to poll anymore?

>> No, we do not. The dance in the AsyncRunLoop's state machine / flow
control in https://reviews.apache.org/r/51346/ guarantees that
end-of-stream SSPs are not polled any more.
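
The answers above (the end-of-stream marker is never delivered to user code, finished SSPs are no longer polled, and the container only terminates once every source has finished) can be sketched as a simple loop. This is an assumption-laden illustration, not the AsyncRunLoop from the review: `EndOfStreamLoop`, its `run` method, the in-memory queues standing in for partitions, and the `maxRounds` cap are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class EndOfStreamLoop {
    static final String END_OF_STREAM = "SAMZA_INTERNAL_END_OF_STREAM";

    /**
     * Polls each partition once per round. Ordinary offsets are appended to
     * `delivered` (standing in for the user task); the end-of-stream marker is
     * swallowed and its partition is dropped from the polling set.
     * Returns the partitions still active after at most maxRounds rounds.
     */
    static Set<String> run(Map<String, Deque<String>> partitions,
                           List<String> delivered, int maxRounds) {
        Set<String> active = new TreeSet<>(partitions.keySet());
        for (int round = 0; round < maxRounds && !active.isEmpty(); round++) {
            Iterator<String> it = active.iterator();
            while (it.hasNext()) {
                String ssp = it.next();
                String offset = partitions.get(ssp).poll();
                if (offset == null) {
                    continue;                 // nothing available now, poll again next round
                }
                if (END_OF_STREAM.equals(offset)) {
                    it.remove();              // never poll this partition again
                    continue;                 // and never hand the marker to user code
                }
                delivered.add(ssp + "@" + offset);
            }
        }
        return active;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> partitions = new HashMap<>();
        partitions.put("bounded-0", new ArrayDeque<>(Arrays.asList("0", "1", END_OF_STREAM)));
        partitions.put("unbounded-0", new ArrayDeque<>(Arrays.asList("0")));
        List<String> delivered = new ArrayList<>();
        Set<String> stillActive = run(partitions, delivered, 5);
        System.out.println("delivered=" + delivered + " stillActive=" + stillActive);
    }
}
```

In the `main` example the partition without an end-of-stream marker stays in the active set, which mirrors the answer to question 2: mixing bounded and unbounded inputs is allowed, but the job then never reaches the "all sources finished" condition.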







On Wed, Aug 31, 2016 at 2:01 PM, Navina Ramesh  wrote:

> Hi Jagadish,
> Thanks for sharing the design with the community. I have a couple of
> questions that were not very clear from the design document.
>
> 1. Under mechanism for indicating the end-of-stream to Samza, you mention
> "The offset in the partition that the message was received from. If
> this is the last message in the SSP, this field is set to END_OF_STREAM.
> Such a message is not delivered to the actual StreamTask implementation."
> Does this mean the last message is not delivered to the Task? How do you
> identify that it is indeed the last message in the SSP? Does the source
> provide that info? If that is the case, then it kind of creates contract
> between any bounded system consumer and samza. Or did you mean to say that
> we assume end-of-stream has been reached when there is no message returned
> on poll?  Please do clarify. I think what is missing in this document is
> how to "detect" an end-of-stream from the source.
>
> 2. Does this design preclude the possibility of consuming bounded and
> unbounded stream partitions in the task ?
>
> 3. During checkpoint, let's say some of the partitions have reached EOF. Do
> we write a special offset in the checkpoint message that indicates that it
> has reached end of stream and don't need to poll anymore?
>
> Thanks!
> Navina
>
> On Tue, Aug 30, 2016 at 4:50 PM, Julian Hyde  wrote:
>
> >
> > > On Aug 30, 2016, at 4:44 PM, Xinyu Liu  wrote:
> > >
> > > It's very exciting that Samza is adding support of bounded input
> streams.
> >
> > +1!
> >
> >
>
>
> --
> Navina R.
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University


Re: Samza Mesos

2016-08-31 Thread Yi Pan
Hi, Sriram,

Yes, that's the correct direction to go.

Cheers!

-Yi

On Wed, Aug 31, 2016 at 12:39 PM, Sriram Ramachandrasekaran <
sri.ram...@gmail.com> wrote:

> Thanks Jagadish.
> So, in essence, I should be looking at samza-11 branch for the final API
> against which I would have to write the Mesos integration pieces?
>
> On Thu, Sep 1, 2016 at 12:56 AM, Jagadish Venkatraman <
> jagadish1...@gmail.com> wrote:
>
> > Hi Sriram,
> >
> > I had started prototyping it (purely to ensure that the Samza API makes
> > sense with Mesos). The exact API on the Samza-11 trunk is slightly
> > different, but hopefully there're similarities -
> > https://github.com/apache/samza/blob/master/samza-core/
> > src/main/java/org/apache/samza/clustermanager/
> ClusterResourceManager.java
> >
> > Find a stub implementation here: (that encapsulates a fair bit of boiler
> > plate from Mesos driver creation etc.)
> > https://github.com/vjagadish/samza-clone/commit/
> > 9e5ed9f1774dadf079ad33913ff7f20ed58bc8dc
> >
> > A version of the prototype with the Old API: here
> >  375/samza-mesos/src/main/
> > scala/org/apache/samza/job/mesos>
> >
> > Some interesting implementation  notes:
> > - Mesos did not (yet) support a preferred host request. However, that
> could
> > be implemented via dynamic reservations
> > .
> > - My discussions with the Mesos community here:
> > https://mail-archives.apache.org/mod_mbox/mesos-user/201602.mbox/%
> > 3ccamd3yjgxmhg4rtw4gbxgf9msmbv6abzbgpql6ejq5gwmot0...@mail.gmail.com%3E
> > - MESOS-4616 has more context.
> >
> > It'd be awesome you can take a stab at Mesos integration - I'm happy to
> > help out in whatever way I can.
> >
> > Thank you,
> > Jagadish
> >
> > On Wed, Aug 31, 2016 at 10:45 AM, Sriram Ramachandrasekaran <
> > sri.ram...@gmail.com> wrote:
> >
> > > Yi,
> > > That's a good amount of history to know. I will take a look at 680 and
> > then
> > > see if I can implement something as well. If there's some stuff that's
> > > already done, would be glad to re-use it too.
> > > Thanks again
> > >
> > > On Wed, Aug 31, 2016 at 10:58 PM, Yi Pan  wrote:
> > >
> > > > Hi, Sriram,
> > > >
> > > > The story behind delaying the integration of SAMZA-375 is that there
> > are
> > > > tons of repeated code in SamzaAppMaster that exist in both samza-yarn
> > and
> > > > Mesos. W/o the change we recently made in SAMZA-680, we are going to
> > copy
> > > > the SamzaAppMaster code for every distributed execution system that
> we
> > > > added support in Samza. Now, w/ the change in SAMZA-680, we have
> > inverted
> > > > the JobCoordinator and the AppMaster logic, which makes it much
> easier
> > to
> > > > have pluggable distributed cluster management system in Samza. As
> > stated
> > > in
> > > > the JIRA, all we need is now a Mesos-specific implementation of
> > > > ClusterResourceManager that can talk to Mesos for container
> > > > request/allocation.
> > > >
> > > > @Jagadish, I remember that you did some proto-type integration w/
> Mesos
> > > > based on SAMZA-680. Would you mind to share some example code for
> that?
> > > >
> > > > Thanks!
> > > >
> > > > -Yi
> > > >
> > > > On Tue, Aug 30, 2016 at 8:35 PM, Sriram Ramachandrasekaran <
> > > > sri.ram...@gmail.com> wrote:
> > > >
> > > > > Folks,
> > > > >
> > > > > We've been using Samza in Production from beginning of this year.
> > It's
> > > > been
> > > > > quite stable for our needs, although, we don't use it heavily yet.
> > One
> > > of
> > > > > the things we would like to know is, where is Samza Mesos
> integration
> > > in
> > > > > the roadmap? I know, SAMZA-375
> > > > >  is specifically
> > > > towards
> > > > > that, but, is there something stopping the community from
> integration
> > > > into
> > > > > mainline?
> > > > >
> > > > > I ask this because, we run our Samza jobs on YARN right now and we
> > use
> > > > > Mesos infra for other workloads. I really don't want to manage 2
> > infra
> > > > > components which are supposed to do exactly the same thing. We've
> > built
> > > > > enough tooling around Mesos infra, so, wouldn't want to move away
> > from
> > > it
> > > > > too.
> > > > >
> > > > > The options we're evaluating are:
> > > > > 1. Move to KStreams and get away from YARN
> > > > > 2. Explore Samza-Mesos integration so that, we can reduce
> "explicit"
> > > > > dependency on Kafka.
> > > > >
> > > > >
> > > > > Some clarity on this would really help us.
> > > > > Sriram
> > > > >
> > > > > --
> > > > > It's just about how deep your longing is!
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > It's just about how deep your longing is!
> > >
> >
> >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
> >
>
>
>
> --
> It's just about how deep your longing is!
>


Re: [DISCUSS] End of Stream Feature in Samza

2016-08-31 Thread Navina Ramesh
Hi Jagadish,
Thanks for sharing the design with the community. I have a couple of
questions that were not very clear from the design document.

1. Under mechanism for indicating the end-of-stream to Samza, you mention
"The offset in the partition that the message was received from. If
this is the last message in the SSP, this field is set to END_OF_STREAM.
Such a message is not delivered to the actual StreamTask implementation."
Does this mean the last message is not delivered to the Task? How do you
identify that it is indeed the last message in the SSP? Does the source
provide that info? If that is the case, then it kind of creates contract
between any bounded system consumer and samza. Or did you mean to say that
we assume end-of-stream has been reached when there is no message returned
on poll?  Please do clarify. I think what is missing in this document is
how to "detect" an end-of-stream from the source.

2. Does this design preclude the possibility of consuming bounded and
unbounded stream partitions in the task ?

3. During checkpoint, let's say some of the partitions have reached EOF. Do
we write a special offset in the checkpoint message that indicates that it
has reached end of stream and don't need to poll anymore?

Thanks!
Navina

On Tue, Aug 30, 2016 at 4:50 PM, Julian Hyde  wrote:

>
> > On Aug 30, 2016, at 4:44 PM, Xinyu Liu  wrote:
> >
> > It's very exciting that Samza is adding support of bounded input streams.
>
> +1!
>
>


-- 
Navina R.


Re: Samza Mesos

2016-08-31 Thread Sriram Ramachandrasekaran
Thanks Jagadish.
So, in essence, I should be looking at the samza-11 branch for the final API
against which I would have to write the Mesos integration pieces?

On Thu, Sep 1, 2016 at 12:56 AM, Jagadish Venkatraman <
jagadish1...@gmail.com> wrote:

> Hi Sriram,
>
> I had started prototyping it (purely to ensure that the Samza API makes
> sense with Mesos). The exact API on the Samza-11 trunk is slightly
> different, but hopefully there're similarities -
> https://github.com/apache/samza/blob/master/samza-core/
> src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
>
> Find a stub implementation here: (that encapsulates a fair bit of boiler
> plate from Mesos driver creation etc.)
> https://github.com/vjagadish/samza-clone/commit/
> 9e5ed9f1774dadf079ad33913ff7f20ed58bc8dc
>
> A version of the prototype with the Old API: here
>  scala/org/apache/samza/job/mesos>
>
> Some interesting implementation  notes:
> - Mesos did not (yet) support a preferred host request. However, that could
> be implemented via dynamic reservations
> .
> - My discussions with the Mesos community here:
> https://mail-archives.apache.org/mod_mbox/mesos-user/201602.mbox/%
> 3ccamd3yjgxmhg4rtw4gbxgf9msmbv6abzbgpql6ejq5gwmot0...@mail.gmail.com%3E
> - MESOS-4616 has more context.
>
> It'd be awesome you can take a stab at Mesos integration - I'm happy to
> help out in whatever way I can.
>
> Thank you,
> Jagadish
>
> On Wed, Aug 31, 2016 at 10:45 AM, Sriram Ramachandrasekaran <
> sri.ram...@gmail.com> wrote:
>
> > Yi,
> > That's a good amount of history to know. I will take a look at 680 and
> then
> > see if I can implement something as well. If there's some stuff that's
> > already done, would be glad to re-use it too.
> > Thanks again
> >
> > On Wed, Aug 31, 2016 at 10:58 PM, Yi Pan  wrote:
> >
> > > Hi, Sriram,
> > >
> > > The story behind delaying the integration of SAMZA-375 is that there
> are
> > > tons of repeated code in SamzaAppMaster that exist in both samza-yarn
> and
> > > Mesos. W/o the change we recently made in SAMZA-680, we are going to
> copy
> > > the SamzaAppMaster code for every distributed execution system that we
> > > added support in Samza. Now, w/ the change in SAMZA-680, we have
> inverted
> > > the JobCoordinator and the AppMaster logic, which makes it much easier
> to
> > > have pluggable distributed cluster management system in Samza. As
> stated
> > in
> > > the JIRA, all we need is now a Mesos-specific implementation of
> > > ClusterResourceManager that can talk to Mesos for container
> > > request/allocation.
> > >
> > > @Jagadish, I remember that you did some proto-type integration w/ Mesos
> > > based on SAMZA-680. Would you mind to share some example code for that?
> > >
> > > Thanks!
> > >
> > > -Yi
> > >
> > > On Tue, Aug 30, 2016 at 8:35 PM, Sriram Ramachandrasekaran <
> > > sri.ram...@gmail.com> wrote:
> > >
> > > > Folks,
> > > >
> > > > We've been using Samza in Production from beginning of this year.
> It's
> > > been
> > > > quite stable for our needs, although, we don't use it heavily yet.
> One
> > of
> > > > the things we would like to know is, where is Samza Mesos integration
> > in
> > > > the roadmap? I know, SAMZA-375
> > > >  is specifically
> > > towards
> > > > that, but, is there something stopping the community from integration
> > > into
> > > > mainline?
> > > >
> > > > I ask this because, we run our Samza jobs on YARN right now and we
> use
> > > > Mesos infra for other workloads. I really don't want to manage 2
> infra
> > > > components which are supposed to do exactly the same thing. We've
> built
> > > > enough tooling around Mesos infra, so, wouldn't want to move away
> from
> > it
> > > > too.
> > > >
> > > > The options we're evaluating are:
> > > > 1. Move to KStreams and get away from YARN
> > > > 2. Explore Samza-Mesos integration so that, we can reduce "explicit"
> > > > dependency on Kafka.
> > > >
> > > >
> > > > Some clarity on this would really help us.
> > > > Sriram
> > > >
> > > > --
> > > > It's just about how deep your longing is!
> > > >
> > >
> >
> >
> >
> > --
> > It's just about how deep your longing is!
> >
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>



-- 
It's just about how deep your longing is!


Re: Samza Mesos

2016-08-31 Thread Jagadish Venkatraman
Hi Sriram,

I had started prototyping it (purely to ensure that the Samza API makes
sense with Mesos). The exact API on the Samza-11 trunk is slightly
different, but hopefully there're similarities -
https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java

Find a stub implementation here (it encapsulates a fair bit of boilerplate
from Mesos driver creation etc.):
https://github.com/vjagadish/samza-clone/commit/9e5ed9f1774dadf079ad33913ff7f20ed58bc8dc

A version of the prototype with the Old API: here


Some interesting implementation  notes:
- Mesos did not (yet) support a preferred host request. However, that could
be implemented via dynamic reservations.
- My discussions with the Mesos community here:
https://mail-archives.apache.org/mod_mbox/mesos-user/201602.mbox/%3ccamd3yjgxmhg4rtw4gbxgf9msmbv6abzbgpql6ejq5gwmot0...@mail.gmail.com%3E
- MESOS-4616 has more context.

It'd be awesome if you could take a stab at Mesos integration - I'm happy to
help out in whatever way I can.

Thank you,
Jagadish

On Wed, Aug 31, 2016 at 10:45 AM, Sriram Ramachandrasekaran <
sri.ram...@gmail.com> wrote:

> Yi,
> That's a good amount of history to know. I will take a look at 680 and then
> see if I can implement something as well. If there's some stuff that's
> already done, would be glad to re-use it too.
> Thanks again
>
> On Wed, Aug 31, 2016 at 10:58 PM, Yi Pan  wrote:
>
> > Hi, Sriram,
> >
> > The story behind delaying the integration of SAMZA-375 is that there are
> > tons of repeated code in SamzaAppMaster that exist in both samza-yarn and
> > Mesos. W/o the change we recently made in SAMZA-680, we are going to copy
> > the SamzaAppMaster code for every distributed execution system that we
> > added support in Samza. Now, w/ the change in SAMZA-680, we have inverted
> > the JobCoordinator and the AppMaster logic, which makes it much easier to
> > have pluggable distributed cluster management system in Samza. As stated
> in
> > the JIRA, all we need is now a Mesos-specific implementation of
> > ClusterResourceManager that can talk to Mesos for container
> > request/allocation.
> >
> > @Jagadish, I remember that you did some proto-type integration w/ Mesos
> > based on SAMZA-680. Would you mind to share some example code for that?
> >
> > Thanks!
> >
> > -Yi
> >
> > On Tue, Aug 30, 2016 at 8:35 PM, Sriram Ramachandrasekaran <
> > sri.ram...@gmail.com> wrote:
> >
> > > Folks,
> > >
> > > We've been using Samza in Production from beginning of this year. It's
> > been
> > > quite stable for our needs, although, we don't use it heavily yet. One
> of
> > > the things we would like to know is, where is Samza Mesos integration
> in
> > > the roadmap? I know, SAMZA-375
> > >  is specifically
> > towards
> > > that, but, is there something stopping the community from integration
> > into
> > > mainline?
> > >
> > > I ask this because, we run our Samza jobs on YARN right now and we use
> > > Mesos infra for other workloads. I really don't want to manage 2 infra
> > > components which are supposed to do exactly the same thing. We've built
> > > enough tooling around Mesos infra, so, wouldn't want to move away from
> it
> > > too.
> > >
> > > The options we're evaluating are:
> > > 1. Move to KStreams and get away from YARN
> > > 2. Explore Samza-Mesos integration so that, we can reduce "explicit"
> > > dependency on Kafka.
> > >
> > >
> > > Some clarity on this would really help us.
> > > Sriram
> > >
> > > --
> > > It's just about how deep your longing is!
> > >
> >
>
>
>
> --
> It's just about how deep your longing is!
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University


Re: Review Request 51563: SAMZA-1010: API won't compile: CheckStyle violation in org.apache.samza.storage.StorageEngineFactory

2016-08-31 Thread Oleg Chubaryov

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51563/
---

(Updated Aug. 31, 2016, 7:19 p.m.)


Review request for samza.


Bugs: SAMZA-1010
https://issues.apache.org/jira/browse/SAMZA-1010


Repository: samza


Description
---

SAMZA-1010: API won't compile: CheckStyle violation in 
org.apache.samza.storage.StorageEngineFactory


Diffs
-

  samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java 
800deeb 

Diff: https://reviews.apache.org/r/51563/diff/


Testing
---


Thanks,

Oleg Chubaryov



Re: Samza container hang on exception

2016-08-31 Thread Chinmay Soman
Sorry, I don't see anything in the attachment. Can you please re-attach and
re-send?

On Wed, Aug 31, 2016 at 3:27 AM, 李斯宁  wrote:

> It seems upgrading does not solve the problem. All task hang in today's
> "rush hour".
> I attached log and jstack.
>
> The SAMZA-911 want to fix by stopping the process if failed too much
> times.  But the process is still there and hanging.
>
> On Mon, Aug 22, 2016 at 1:14 PM, 李斯宁  wrote:
>
>> Thanks so much, I'll try.
>>
>> On Mon, Aug 22, 2016 at 6:26 AM, Yi Pan  wrote:
>>
>>> Hi, Sining,
>>>
>>> This is a known bug that is fixed in 0.10.1 (SAMZA-911). Please try to
>>> upgrade to 0.10.1.
>>>
>>> Thanks!
>>>
>>> -Yi
>>>
>>> On Sun, Aug 21, 2016 at 5:55 AM, 李斯宁  wrote:
>>>
>>> > I have tried restart every kafka server.  The container did not
>>> recover.
>>> >
>>> > log have something below:
>>> >
>>> > 2016-08-21 20:08:21 [WARN ](o.a.s.s.k.KafkaSystemProducer  :66 )
>>> > Retrying send messsage due to RetriableException -
>>> > org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>>> server
>>> > is not the leader for that topic-partition.. Turn on debugging to get a
>>> > full stack trace
>>> > 2016-08-21 20:08:22 [WARN ](o.a.k.c.p.i.Sender :257)
>>> Got
>>> > error produce response with correlation id 4364 on topic-partition
>>> > samzaMetrics-5, retrying (0 attempts left). Error:
>>> NOT_LEADER_FOR_PARTITION
>>> > 2016-08-21 20:08:23 [WARN ](o.a.k.c.p.i.Sender :257)
>>> Got
>>> > error produce response with correlation id 4367 on topic-partition
>>> > samzaMetrics-5, retrying (29 attempts left). Error:
>>> > NOT_LEADER_FOR_PARTITION
>>> >
>>> >
>>> > jstack shows:
>>> >
>>> > "main" #1 prio=5 os_prio=0 tid=0x7f1ba401a000 nid=0x1a621 waiting
>>> on
>>> > condition [0x7f1bab976000]
>>> > java.lang.Thread.State: TIMED_WAITING (sleeping)
>>> > at java.lang.Thread.sleep(Native Method)
>>> > at
>>> > org.apache.samza.util.ExponentialSleepStrategy$RetryLoopState.sleep(
>>> > ExponentialSleepStrategy.scala:105)
>>> > at
>>> > org.apache.samza.util.ExponentialSleepStrategy.run(
>>> > ExponentialSleepStrategy.scala:91)
>>> > at
>>> > org.apache.samza.system.kafka.KafkaSystemProducer.send(
>>> > KafkaSystemProducer.scala:91)
>>> > at org.apache.samza.system.SystemProducers.send(SystemProducers
>>> .scala:87)
>>> > at
>>> > org.apache.samza.task.TaskInstanceCollector.send(
>>> > TaskInstanceCollector.scala:61)
>>> > at toolbox.analyzer2.realtime.CommonWriter.write(CommonWriter.java:50)
>>> > at toolbox.analyzer2.realtime.InitTask.lambda$process$0(InitTas
>>> k.java:110)
>>> > at toolbox.analyzer2.realtime.InitTask$$Lambda$4/938405008.emit
>>> (Unknown
>>> > Source)
>>> > at
>>> > toolbox.analyzer2.util.core.TransToKvProcessor.process(
>>> > TransToKvProcessor.java:146)
>>> > at toolbox.analyzer2.realtime.InitTask$2.emit(InitTask.java:119)
>>> > at toolbox.analyzer2.util.core.JsonExpander.expand(JsonExpander
>>> .java:47)
>>> > at toolbox.analyzer2.realtime.InitTask.process(InitTask.java:128)
>>> > at
>>> > org.apache.samza.container.TaskInstance$$anonfun$process$
>>> > 1.apply$mcV$sp(TaskInstance.scala:150)
>>> > at
>>> > org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(
>>> > TaskInstanceExceptionHandler.scala:54)
>>> > at org.apache.samza.container.TaskInstance.process(TaskInstance
>>> .scala:149)
>>> > at
>>> > org.apache.samza.container.RunLoop$$anonfun$process$1$$
>>> > anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:122)
>>> > at
>>> > org.apache.samza.container.RunLoop$$anonfun$process$1$$
>>> > anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:119)
>>> > at scala.collection.immutable.List.foreach(List.scala:318)
>>> > at
>>> > org.apache.samza.container.RunLoop$$anonfun$process$1.
>>> > apply$mcVJ$sp(RunLoop.scala:118)
>>> > at
>>> > org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(
>>> > TimerUtils.scala:51)
>>> > at
>>> > org.apache.samza.container.RunLoop.updateTimerAndGetDuration(
>>> > RunLoop.scala:35)
>>> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:106)
>>> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:74)
>>> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer
>>> .scala:553)
>>>
>>> > at
>>> > org.apache.samza.container.SamzaContainer$.safeMain(
>>> > SamzaContainer.scala:92)
>>> > at org.apache.samza.container.SamzaContainer$.main(
>>> > SamzaContainer.scala:66)
>>> > at org.apache.samza.container.SamzaContainer.main(SamzaContaine
>>> r.scala)
>>> >
>>> > May be partition leader has changed in rush hour and metrics writing
>>> method
>>> > do not recognize that and retry again and again?
>>> >
>>> > Any response is appreciated :)
>>> >
>>> > On Sun, Aug 21, 2016 at 8:00 PM, 李斯宁  wrote:
>>> >
>>> > > at the last of the container's log, prints these:
>>> > >
>>> > > 2016-08-21 19:57:01 [WARN ](o.a.s.s.k.KafkaSystemProducer  :66 )
>>> > Retrying send messsage due to RetriableException -
>>> org.apache.kafka.common.
>>> > errors.NotLeaderF

Re: Samza Mesos

2016-08-31 Thread Sriram Ramachandrasekaran
Yi,
That's a good amount of history to know. I will take a look at 680 and then
see if I can implement something as well. If there's some stuff that's
already done, would be glad to re-use it too.
Thanks again

On Wed, Aug 31, 2016 at 10:58 PM, Yi Pan  wrote:

> Hi, Sriram,
>
> The story behind delaying the integration of SAMZA-375 is that there are
> tons of repeated code in SamzaAppMaster that exist in both samza-yarn and
> Mesos. W/o the change we recently made in SAMZA-680, we are going to copy
> the SamzaAppMaster code for every distributed execution system that we
> added support in Samza. Now, w/ the change in SAMZA-680, we have inverted
> the JobCoordinator and the AppMaster logic, which makes it much easier to
> have pluggable distributed cluster management system in Samza. As stated in
> the JIRA, all we need is now a Mesos-specific implementation of
> ClusterResourceManager that can talk to Mesos for container
> request/allocation.
>
> @Jagadish, I remember that you did some proto-type integration w/ Mesos
> based on SAMZA-680. Would you mind to share some example code for that?
>
> Thanks!
>
> -Yi
>
> On Tue, Aug 30, 2016 at 8:35 PM, Sriram Ramachandrasekaran <
> sri.ram...@gmail.com> wrote:
>
> > Folks,
> >
> > We've been using Samza in Production from beginning of this year. It's
> been
> > quite stable for our needs, although, we don't use it heavily yet. One of
> > the things we would like to know is, where is Samza Mesos integration in
> > the roadmap? I know, SAMZA-375
> >  is specifically
> towards
> > that, but, is there something stopping the community from integration
> into
> > mainline?
> >
> > I ask this because, we run our Samza jobs on YARN right now and we use
> > Mesos infra for other workloads. I really don't want to manage 2 infra
> > components which are supposed to do exactly the same thing. We've built
> > enough tooling around Mesos infra, so, wouldn't want to move away from it
> > too.
> >
> > The options we're evaluating are:
> > 1. Move to KStreams and get away from YARN
> > 2. Explore Samza-Mesos integration so that, we can reduce "explicit"
> > dependency on Kafka.
> >
> >
> > Some clarity on this would really help us.
> > Sriram
> >
> > --
> > It's just about how deep your longing is!
> >
>



-- 
It's just about how deep your longing is!


Re: Samza Mesos

2016-08-31 Thread Yi Pan
Hi, Sriram,

The story behind delaying the integration of SAMZA-375 is that a ton of
SamzaAppMaster code would be repeated in both samza-yarn and Mesos. W/o the
change we recently made in SAMZA-680, we would have had to copy the
SamzaAppMaster code for every distributed execution system that Samza
supports. Now, w/ the change in SAMZA-680, we have inverted the
JobCoordinator and the AppMaster logic, which makes it much easier to have a
pluggable distributed cluster management system in Samza. As stated in the
JIRA, all we need now is a Mesos-specific implementation of
ClusterResourceManager that can talk to Mesos for container
request/allocation.
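
To make the pluggable idea concrete, here is a minimal sketch. All names in
it (ResourceManagerSketch, AllocationListener, InMemoryResourceManager and
their methods) are hypothetical and invented for illustration; they are not
the actual ClusterResourceManager signatures in Samza. The in-memory class
only stands in for where a Mesos-specific implementation would go.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified contract that the job coordinator would code against. */
interface ResourceManagerSketch {
    /** Ask the cluster manager for `count` containers of `memoryMb` each. */
    void requestContainers(int count, int memoryMb);

    /** Stop accepting further requests. */
    void stop();
}

/** Hypothetical callback through which allocations are reported back. */
interface AllocationListener {
    void onAllocated(String containerId, String host);
}

/** Stand-in backend: "allocates" every request immediately on localhost. */
final class InMemoryResourceManager implements ResourceManagerSketch {
    private final AllocationListener listener;
    private int nextId = 0;
    private boolean stopped = false;

    InMemoryResourceManager(AllocationListener listener) {
        this.listener = listener;
    }

    @Override
    public void requestContainers(int count, int memoryMb) {
        if (stopped) {
            throw new IllegalStateException("resource manager is stopped");
        }
        for (int i = 0; i < count; i++) {
            // A real backend (YARN, Mesos, ...) would negotiate with the
            // cluster here and call back asynchronously.
            listener.onAllocated("container-" + nextId, "localhost");
            nextId++;
        }
    }

    @Override
    public void stop() {
        stopped = true;
    }
}

final class ResourceManagerDemo {
    public static void main(String[] args) {
        List<String> allocated = new ArrayList<>();
        ResourceManagerSketch manager =
            new InMemoryResourceManager((id, host) -> allocated.add(id + "@" + host));
        manager.requestContainers(2, 1024);
        manager.stop();
        System.out.println(allocated);
    }
}
```

The point of the shape is that the coordinator only sees the interface, so
swapping YARN for Mesos means swapping one implementation class.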

@Jagadish, I remember that you did some prototype integration w/ Mesos
based on SAMZA-680. Would you mind sharing some example code for that?

Thanks!

-Yi

On Tue, Aug 30, 2016 at 8:35 PM, Sriram Ramachandrasekaran <
sri.ram...@gmail.com> wrote:

> Folks,
>
> We've been using Samza in Production from beginning of this year. It's been
> quite stable for our needs, although, we don't use it heavily yet. One of
> the things we would like to know is, where is Samza Mesos integration in
> the roadmap? I know, SAMZA-375
>  is specifically towards
> that, but, is there something stopping the community from integration into
> mainline?
>
> I ask this because, we run our Samza jobs on YARN right now and we use
> Mesos infra for other workloads. I really don't want to manage 2 infra
> components which are supposed to do exactly the same thing. We've built
> enough tooling around Mesos infra, so, wouldn't want to move away from it
> too.
>
> The options we're evaluating are:
> 1. Move to KStreams and get away from YARN
> 2. Explore Samza-Mesos integration so that, we can reduce "explicit"
> dependency on Kafka.
>
>
> Some clarity on this would really help us.
> Sriram
>
> --
> It's just about how deep your longing is!
>


Re: Samza container hang on exception

2016-08-31 Thread 李斯宁
It seems upgrading does not solve the problem. All tasks hung in today's
"rush hour".
I attached the log and jstack.

SAMZA-911 is meant to fix this by stopping the process if it fails too many
times. But the process is still there and hanging.
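
The behaviour I expected (give up after too many failures instead of
retrying forever) can be sketched roughly as below. BoundedRetry and its
parameters are hypothetical names made up for illustration; this is not the
Samza or Kafka implementation, only a sketch of a bounded retry with
exponential backoff.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper, not part of Samza or Kafka. */
final class BoundedRetry {
    private BoundedRetry() {}

    /**
     * Runs `action` up to `maxAttempts` times, sleeping between failed
     * attempts with a delay that doubles up to `maxDelayMs`. Throws
     * IllegalStateException once the attempts are exhausted, so the caller
     * can shut the process down instead of hanging.
     */
    static <T> T call(Callable<T> action, int maxAttempts,
                      long initialDelayMs, long maxDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                throw e; // do not swallow interruption
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // no sleep after the final failure
                }
                Thread.sleep(delayMs);
                delayMs = Math.min(delayMs * 2, maxDelayMs);
            }
        }
        throw new IllegalStateException(
            "giving up after " + maxAttempts + " attempts", last);
    }
}
```

With something like this around the send, a permanent
NOT_LEADER_FOR_PARTITION condition would surface as an exception after the
last attempt rather than as a thread sleeping in a retry loop.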

On Mon, Aug 22, 2016 at 1:14 PM, 李斯宁  wrote:

> Thanks so much, I'll try.
>
> On Mon, Aug 22, 2016 at 6:26 AM, Yi Pan  wrote:
>
>> Hi, Sining,
>>
>> This is a known bug that is fixed in 0.10.1 (SAMZA-911). Please try to
>> upgrade to 0.10.1.
>>
>> Thanks!
>>
>> -Yi
>>
>> On Sun, Aug 21, 2016 at 5:55 AM, 李斯宁  wrote:
>>
>> > I have tried restart every kafka server.  The container did not recover.
>> >
>> > log have something below:
>> >
>> > 2016-08-21 20:08:21 [WARN ](o.a.s.s.k.KafkaSystemProducer  :66 )
>> > Retrying send messsage due to RetriableException -
>> > org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>> server
>> > is not the leader for that topic-partition.. Turn on debugging to get a
>> > full stack trace
>> > 2016-08-21 20:08:22 [WARN ](o.a.k.c.p.i.Sender :257) Got
>> > error produce response with correlation id 4364 on topic-partition
>> > samzaMetrics-5, retrying (0 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> > 2016-08-21 20:08:23 [WARN ](o.a.k.c.p.i.Sender :257) Got
>> > error produce response with correlation id 4367 on topic-partition
>> > samzaMetrics-5, retrying (29 attempts left). Error:
>> > NOT_LEADER_FOR_PARTITION
>> >
>> >
>> > jstack shows:
>> >
>> > "main" #1 prio=5 os_prio=0 tid=0x7f1ba401a000 nid=0x1a621 waiting on
>> > condition [0x7f1bab976000]
>> > java.lang.Thread.State: TIMED_WAITING (sleeping)
>> > at java.lang.Thread.sleep(Native Method)
>> > at
>> > org.apache.samza.util.ExponentialSleepStrategy$RetryLoopState.sleep(
>> > ExponentialSleepStrategy.scala:105)
>> > at
>> > org.apache.samza.util.ExponentialSleepStrategy.run(
>> > ExponentialSleepStrategy.scala:91)
>> > at
>> > org.apache.samza.system.kafka.KafkaSystemProducer.send(
>> > KafkaSystemProducer.scala:91)
>> > at org.apache.samza.system.SystemProducers.send(SystemProducers
>> .scala:87)
>> > at
>> > org.apache.samza.task.TaskInstanceCollector.send(
>> > TaskInstanceCollector.scala:61)
>> > at toolbox.analyzer2.realtime.CommonWriter.write(CommonWriter.java:50)
>> > at toolbox.analyzer2.realtime.InitTask.lambda$process$0(InitTas
>> k.java:110)
>> > at toolbox.analyzer2.realtime.InitTask$$Lambda$4/938405008.emit(Unknown
>> > Source)
>> > at
>> > toolbox.analyzer2.util.core.TransToKvProcessor.process(
>> > TransToKvProcessor.java:146)
>> > at toolbox.analyzer2.realtime.InitTask$2.emit(InitTask.java:119)
>> > at toolbox.analyzer2.util.core.JsonExpander.expand(JsonExpander
>> .java:47)
>> > at toolbox.analyzer2.realtime.InitTask.process(InitTask.java:128)
>> > at
>> > org.apache.samza.container.TaskInstance$$anonfun$process$
>> > 1.apply$mcV$sp(TaskInstance.scala:150)
>> > at
>> > org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(
>> > TaskInstanceExceptionHandler.scala:54)
>> > at org.apache.samza.container.TaskInstance.process(TaskInstance
>> .scala:149)
>> > at
>> > org.apache.samza.container.RunLoop$$anonfun$process$1$$
>> > anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:122)
>> > at
>> > org.apache.samza.container.RunLoop$$anonfun$process$1$$
>> > anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:119)
>> > at scala.collection.immutable.List.foreach(List.scala:318)
>> > at
>> > org.apache.samza.container.RunLoop$$anonfun$process$1.
>> > apply$mcVJ$sp(RunLoop.scala:118)
>> > at
>> > org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(
>> > TimerUtils.scala:51)
>> > at
>> > org.apache.samza.container.RunLoop.updateTimerAndGetDuration(
>> > RunLoop.scala:35)
>> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:106)
>> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:74)
>> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer
>> .scala:553)
>> > at
>> > org.apache.samza.container.SamzaContainer$.safeMain(
>> > SamzaContainer.scala:92)
>> > at org.apache.samza.container.SamzaContainer$.main(
>> > SamzaContainer.scala:66)
>> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >
>> > May be partition leader has changed in rush hour and metrics writing
>> method
>> > do not recognize that and retry again and again?
>> >
>> > Any response is appreciated :)
>> >
>> > On Sun, Aug 21, 2016 at 8:00 PM, 李斯宁  wrote:
>> >
>> > > at the last of the container's log, prints these:
>> > >
>> > > 2016-08-21 19:57:01 [WARN ](o.a.s.s.k.KafkaSystemProducer  :66 )
>> > Retrying send messsage due to RetriableException -
>> org.apache.kafka.common.
>> > errors.NotLeaderForPartitionException: This server is not the leader
>> for
>> > that topic-partition.. Turn on debugging to get a full stack trace
>> > > 2016-08-21 19:57:11 [WARN ](o.a.s.s.k.KafkaSystemProducer  :66 )
>> > Retrying send messsage due to RetriableException -
>> org.apache.kafka.common.

Re: Review Request 51142: SAMZA-967: HDFS System Consumer

2016-08-31 Thread Navina Ramesh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review147414
---



Still reviewing. Will continue tomorrow. Thanks!


gradle/dependency-versions.gradle (line 39)


Why is this dependency introduced? Is it possible to get rid of this 
dependency?



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
(line 64)


nit: Unused variable



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 53)


Can you add some javadocs for this config?
Ideally, we want all configs to be wrapped in an HdfsConfig object that 
provides appropriate config accessors and default values.
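
As a rough illustration of the kind of wrapper meant here, a minimal sketch 
follows. The class name HdfsConfigSketch, the config keys, and the "avro" 
default are all assumptions made up for this example and are not taken from 
the patch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical config wrapper; keys and defaults are illustrative only. */
final class HdfsConfigSketch {
    // Assumed key patterns; %s is replaced by the system name.
    static final String STAGING_DIR_KEY = "systems.%s.stagingDirectory";
    static final String READER_TYPE_KEY = "systems.%s.consumer.reader";
    // Assumed default value.
    static final String DEFAULT_READER_TYPE = "avro";

    private final Map<String, String> config;

    HdfsConfigSketch(Map<String, String> config) {
        this.config = new HashMap<>(config); // defensive copy
    }

    /** Returns the configured reader type, or the default if unset. */
    String getReaderType(String systemName) {
        return config.getOrDefault(
            String.format(READER_TYPE_KEY, systemName), DEFAULT_READER_TYPE);
    }

    /** Returns the staging directory; it has no default, so it must be set. */
    String getStagingDirectory(String systemName) {
        String key = String.format(STAGING_DIR_KEY, systemName);
        String value = config.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing required config: " + key);
        }
        return value;
    }
}
```

Callers then ask the wrapper for typed values instead of repeating key 
strings and default handling in each class.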



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
(line 58)


I understand what staging directory is because I recently read the design 
doc. Can you please add some javadoc about what this staging directory is used 
for?



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
 (line 25)


Please add some javadocs for interfaces



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
 (line 135)


Very nice and useful javadoc!



samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
 (line 53)


If it is not implemented, I suggest removing it and simply adding a comment 
that another potential reader type could be "PLAIN" text.


- Navina Ramesh


On Aug. 29, 2016, 5:27 p.m., Hai Lu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51142/
> ---
> 
> (Updated Aug. 29, 2016, 5:27 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Yi Pan (Data Infrastructure), and 
> Navina Ramesh.
> 
> 
> Bugs: SAMZA-967
> https://issues.apache.org/jira/browse/SAMZA-967
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Add HDFS System Consumer: 
> 
> 1. System admin, partitioner
> 2. System consumer with metrics
> 
> 
> Diffs
> -
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   gradle/dependency-versions.gradle 47c71bfde027835682889407261d4798b629d214 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
> PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
> PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptionUtil.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java
>  PRE-CREATION 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 
> 92eb4472533db67dca01f075cb460581b4bdac0d 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
>  ef3c20a097ddf2feecaf8b0ad4587ea4bf6570b7 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptionUtil.java
>  PRE-CREATION 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java
>  PRE-CREATION 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
>  PRE-CREATION 
>   
> samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java
>  PRE-CREATION 
>   samza-hdfs/src/test/resources/partitioner/testfile01 PRE-CREATION 
>   samza-hdfs/src/test/resources/partitioner/testfile02 PRE-CREATION 
>   
> samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
>  261310d03de204718621f601117f016da14841df 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala 
> 4e328a5f8c2b496a71e36c106339b7af263c96c7