Review Request 48811: SAMZA-968 - SequenceFileHdfsFileWriter does not close file properly

2016-06-16 Thread Benjamin Smith

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48811/
---

Review request for samza.


Repository: samza


Description
---

Create the writer with the appropriate API so that the underlying stream is
closed along with the writer, and not just flushed.


Diffs
---

  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala 46ade333af6acf1f2646924854bc2290ff56b5cb

Diff: https://reviews.apache.org/r/48811/diff/


Testing
---

Unit tests pass successfully.
I am running a manual test in our code base to confirm that files now close
properly (we currently see them remain open, with delays of several hours in
propagating writes to HDFS).

Open to suggestions for how to write a unit test for this use case.


Thanks,

Benjamin Smith



Re: Bug in SequenceFileHdfsFileWriter

2016-06-16 Thread Jagadish Venkatraman
Hi Benjamin,

SAMZA-968 is already assigned to you.

Thanks,
Jagadish

On Thu, Jun 16, 2016 at 10:51 AM, Benjamin Smith <
ben.sm...@ranksoftwareinc.com> wrote:

> Sure, looks like a straightforward enough change.
>
>
> I've created: https://issues.apache.org/jira/browse/SAMZA-968
>
>
> I don't see any way to assign it to myself, though?
>
> 
> From: Yi Pan 
> Sent: Thursday, June 16, 2016 1:02:59 PM
> To: dev@samza.apache.org
> Subject: Re: Bug in SequenceFileHdfsFileWriter
>
> Hi, Benjamin,
>
> Thanks a lot for reporting this! It makes sense from reading the posts.
> Could you open a JIRA? Are you interested in assigning it to yourself and
> contributing the fix?
>
> Thanks a lot again!
>
> -Yi
>
> On Thu, Jun 16, 2016 at 9:52 AM, Benjamin Smith <
> ben.sm...@ranksoftwareinc.com> wrote:
>
> >
> > Hello,
> >
> > I am working on a project where we are integrating Samza and Hive. As
> > part of this project, we ran into an issue where sequence files written
> > from Samza were taking a long time (hours) to completely sync with HDFS.
> >
> > After some Googling and digging into the code, it appears that the issue
> > is here:
> >
> > https://github.com/apache/samza/blob/master/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala#L111
> >
> > Writer.stream(dfs.create(path)) implies that the caller of
> > dfs.create(path) is responsible for closing the created stream
> > explicitly. This doesn't happen, and the SequenceFileHdfsWriter call to
> > close will only flush the stream.
> >
> > I believe the correct line should be:
> >
> > Writer.file(path)
> >
> > Or, SequenceFileHdfsWriter should explicitly track and close the stream.
> >
> > Thanks!
> >
> > Ben
> >
> > Reference material:
> >
> > http://stackoverflow.com/questions/27916872/why-the-sequencefile-is-truncated
> >
> > https://apache.googlesource.com/hadoop-common/+/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java#1238
> >
> >
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University


Review Request 48808: Rebase samza-41 with master

2016-06-16 Thread Jagadish Venkatraman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48808/
---

Review request for samza.


Repository: samza


Description
---

Rebase samza-41 with master


Diffs
---

  docs/learn/documentation/versioned/jobs/configuration-table.html c1c822e03ce960c69b07eb44409a9af018823ef6
  samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 23a51b2175506d9e0dbbf7675d3b8061f2aacbe3
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala e9a51083aff4dc316e94144f6242fe702ca73a68
  samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 55a879be9e6792a60ff685fbdc5411c197243ed0
  samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala PRE-CREATION

Diff: https://reviews.apache.org/r/48808/diff/


Testing
---


Thanks,

Jagadish Venkatraman



Re: Bug in SequenceFileHdfsFileWriter

2016-06-16 Thread Benjamin Smith
Sure, looks like a straightforward enough change.


I've created: https://issues.apache.org/jira/browse/SAMZA-968


I don't see any way to assign it to myself, though?


From: Yi Pan 
Sent: Thursday, June 16, 2016 1:02:59 PM
To: dev@samza.apache.org
Subject: Re: Bug in SequenceFileHdfsFileWriter

Hi, Benjamin,

Thanks a lot for reporting this! It makes sense from reading the posts.
Could you open a JIRA? Are you interested in assigning it to yourself and
contributing the fix?

Thanks a lot again!

-Yi



Re: Bug in SequenceFileHdfsFileWriter

2016-06-16 Thread Yi Pan
Hi, Benjamin,

Thanks a lot for reporting this! It makes sense from reading the posts.
Could you open a JIRA? Are you interested in assigning it to yourself and
contributing the fix?

Thanks a lot again!

-Yi



Re: Manually Commit Offsets?

2016-06-16 Thread Yi Pan
Hi, Jeremiah,

You can wait inside StreamTask.process(). That essentially blocks the whole
container, so no more Kafka messages will be delivered to the StreamTask
until process() returns.
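
A minimal sketch of this blocking approach, written in plain Java rather than against the Samza API. `RemoteService`, `BlockingSender`, and the fixed backoff are hypothetical names and choices made for illustration; in a real job the loop would sit inside StreamTask.process().

```java
// Hypothetical stand-in for the remote system the task sends to.
interface RemoteService {
    // Returns false while the remote system is unavailable.
    boolean trySend(String message);
}

// Hypothetical helper holding the logic a process() method could run:
// it does not return until the current message has been delivered.
class BlockingSender {
    private final RemoteService remote;
    private final long backoffMillis;

    BlockingSender(RemoteService remote, long backoffMillis) {
        this.remote = remote;
        this.backoffMillis = backoffMillis;
    }

    // Blocks the calling thread until the send succeeds.
    // Returns the number of attempts that were needed.
    int sendBlocking(String message) {
        int attempts = 1;
        while (!remote.trySend(message)) {
            try {
                Thread.sleep(backoffMillis); // wait before retrying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("interrupted while waiting for remote", e);
            }
            attempts++;
        }
        return attempts;
    }
}
```

Because the method does not return while the remote system is down, the caller makes no progress past the current message, which matches the blocking behaviour described above.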

-Yi

On Wed, Jun 15, 2016 at 12:13 PM, Jeremiah Adams 
wrote:

> Thank you for the info.
>
> Is there any way to 'pause' the job or stop processing Kafka messages from
> inside a StreamTask.process() method? That would work for me too.
>
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com
>
> 
> From: Yi Pan 
> Sent: Wednesday, June 15, 2016 1:02 PM
> To: dev@samza.apache.org
> Subject: Re: Manually Commit Offsets?
>
> Hi, Jeremiah,
>
> CheckpointManager is an interface that lets you implement where to
> read/write the checkpoint offsets for each task. The user-facing API for
> checkpointing in Samza is TaskCoordinator.commit(). Unfortunately, it does
> not allow you to commit at the granularity of a certain offset on a
> specific partition. The design choice we made here is that when the user
> calls commit(), it means the user has successfully processed all messages
> up to the current one. The recommended pattern is to wait until all
> currently pending messages are processed, then call commit().
>
> Please let me know if you have further doubts.
>
> -Yi
>
> On Tue, Jun 14, 2016 at 2:13 PM, Jeremiah Adams wrote:
>
> > Thanks,
> >
> > I see no commit method in TaskContext. Unless I am missing something, it
> > is on TaskCoordinator. TaskCoordinator.commit() also does not appear to
> > give me the ability to set the value of the checkpoint; it just
> > checkpoints after unwrapping the incoming message. I need to treat the
> > messages as if they were not handled at all when the remote system is
> > unavailable.
> >
> >
> > I have been looking at the CheckpointManager to do this but cannot see
> > how to wire it into my StreamTask.
> >
> >
> > Jeremiah Adams
> > Software Engineer
> > www.helixeducation.com
> >
> > 
> > From: Yi Pan 
> > Sent: Tuesday, June 14, 2016 2:28 PM
> > To: dev@samza.apache.org
> > Subject: Re: Manually Commit Offsets?
> >
> > Sorry. Correction:
> >
> >
> > > 2) in your code, call TaskContext.commit() whenever you are ready to
> > > checkpoint.
> > >
> > >
> > *TaskCoordinator.commit()*
> >
> >
> >
> > >
> > > On Tue, Jun 14, 2016 at 10:16 AM, Jeremiah Adams <
> > > jad...@helixeducation.com> wrote:
> > >
> > >> We need to send messages to a remote service. I need to implement a
> > >> circuit breaker to address the scenario in which the remote system is
> > >> unavailable. I need to reset the current offset so that it is
> > >> reprocessed while the remote system is down. These concerns are
> > >> similar to those outlined here:
> > >> https://issues.apache.org/jira/browse/SAMZA-794
> > >>
> > >> It looks like Samza's checkpointing mechanism replaces Kafka's
> > >> auto-commit feature and there is no API for manually manipulating the
> > >> checkpoints?
> > >>
> > >> Can someone point me in the right direction?
> > >>
> > >> Thanks in advance.
> > >>
> > >>
> > >>
> > >> Jeremiah Adams
> > >> Software Engineer
> > >> www.helixeducation.com
> > >> Blog | Twitter<
> > >> https://twitter.com/HelixEducation> | Facebook<
> > >> https://www.facebook.com/HelixEducation> | LinkedIn<
> > >> http://www.linkedin.com/company/3609946>
> > >>
> > >
> > >
> >
>


Bug in SequenceFileHdfsFileWriter

2016-06-16 Thread Benjamin Smith

Hello,

I am working on a project where we are integrating Samza and Hive. As part of 
this project, we ran into an issue where sequence files written from Samza were 
taking a long time (hours) to completely sync with HDFS.

After some Googling and digging into the code, it appears that the issue is 
here:
https://github.com/apache/samza/blob/master/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala#L111

Writer.stream(dfs.create(path)) implies that the caller of dfs.create(path) is 
responsible for closing the created stream explicitly. This doesn't happen, and 
the SequenceFileHdfsWriter call to close will only flush the stream.

I believe the correct line should be:

Writer.file(path)

Or, SequenceFileHdfsWriter should explicitly track and close the stream.

Thanks!

Ben

Reference material:
http://stackoverflow.com/questions/27916872/why-the-sequencefile-is-truncated
https://apache.googlesource.com/hadoop-common/+/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java#1238
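
The ownership issue described in this message can be illustrated without Hadoop. The sketch below is a plain-Java model, not Hadoop's SequenceFile.Writer: `TrackingStream`, `OwnershipSketchWriter`, `fromStream`, and `opening` are hypothetical names. It assumes the behaviour reported above, namely that a writer handed an existing stream only flushes it on close, while a writer that opened the stream itself also closes it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Stands in for an HDFS output stream and records whether close() was called.
class TrackingStream extends ByteArrayOutputStream {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical writer that remembers whether it owns the underlying stream.
class OwnershipSketchWriter implements AutoCloseable {
    private final OutputStream out;
    private final boolean ownsStream;

    private OwnershipSketchWriter(OutputStream out, boolean ownsStream) {
        this.out = out;
        this.ownsStream = ownsStream;
    }

    // Models the "stream" style: the caller created the stream and stays
    // responsible for closing it.
    static OwnershipSketchWriter fromStream(OutputStream callerStream) {
        return new OwnershipSketchWriter(callerStream, false);
    }

    // Models the "file" style: the writer opens the stream itself, so it owns it.
    static OwnershipSketchWriter opening(Supplier<? extends OutputStream> opener) {
        return new OwnershipSketchWriter(opener.get(), true);
    }

    void write(byte[] data) {
        try {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            out.flush();          // always flush buffered data
            if (ownsStream) {
                out.close();      // only close a stream this writer opened
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the `fromStream` case the caller still has to close the stream after closing the writer, which is the step the message says is missing.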



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-16 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review138013
---


Fix it, then Ship it!




Contingent on perf tests, of course.


samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
88)


I would probably make this an AtomicLong as it is the only thing in the way 
of this being thread-safe and it is not obvious whether seqNum is used in a 
multi-threaded context.
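
A minimal sketch of that suggestion. `SeqNumSketch`, `nextSeqNum`, and `distinctAcrossThreads` are hypothetical names, and the actual TaskCallbackManager code is not reproduced here. An AtomicLong lets concurrent callers each obtain a distinct sequence number without explicit locking.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class SeqNumSketch {
    // Atomic counter, safe to increment from several threads.
    private final AtomicLong seqNum = new AtomicLong(0);

    // Returns the current value and advances the counter in one atomic step.
    long nextSeqNum() {
        return seqNum.getAndIncrement();
    }

    // Starts `threads` workers that each draw `perThread` sequence numbers
    // and returns how many distinct values were handed out in total.
    static int distinctAcrossThreads(int threads, int perThread) {
        SeqNumSketch counter = new SeqNumSketch();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    seen.add(counter.nextSeqNum());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }
        return seen.size();
    }
}
```

With a plain `long` and `seqNum++`, two threads could read the same value and hand out a duplicate; `getAndIncrement()` rules that out.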



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
94)


final


- Chris Pettitt


On June 15, 2016, 11:41 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> ---
> 
> (Updated June 15, 2016, 11:41 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> d32a92976e43ca24033b48c91851ee706de7de6b 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
> PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> e280daa9626757cb4d17c0c03eed923277230c3e 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 1358fdd8a386f5f81128ef871c72833d8cf11d86 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 5457f0e05ae4d615b9c86f48a662c54b13828e78 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
> 09da62e0f9a10f7c3683345a309c6278ff01fb4b 
> 
> Diff: https://reviews.apache.org/r/48243/diff/
> 
> 
> Testing
> ---
> 
> unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-16 Thread Chris Pettitt


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 188
> > 
> >
> > Do we need to handle the case that the SSP is not in the mappings or is 
> > that impossible?
> 
> Xinyu Liu wrote:
> In the current version of Samza, SSPs are assigned to a task at job
> startup, so the assignment won't change during the life cycle of the run
> loop. If this is going to change in the future, we need to revisit the logic here.
> 
> Chris Pettitt wrote:
> I'd suggest being defensive since it is low cost. For this you would just 
> need to do a null check on the worker before using it.

Since you're constructing the mapping at construction and not allowing it to be 
modified, it would be sufficient to just throw in an assert here for now.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137543
---



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-16 Thread Chris Pettitt


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 188
> > 
> >
> > Do we need to handle the case that the SSP is not in the mappings or is 
> > that impossible?
> 
> Xinyu Liu wrote:
> In the current version of Samza, SSPs are assigned to a task at job
> startup, so the assignment won't change during the life cycle of the run
> loop. If this is going to change in the future, we need to revisit the logic here.

I'd suggest being defensive since it is low cost. For this you would just need 
to do a null check on the worker before using it.
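
A small sketch of the defensive check suggested here. `WorkerLookupSketch`, the String keys standing in for SSPs, and the choice to throw IllegalStateException are all assumptions for illustration; the reviewed AsyncRunLoop code is not shown.

```java
import java.util.HashMap;
import java.util.Map;

class WorkerLookupSketch {
    // Hypothetical mapping from an SSP (modelled as a String) to its worker.
    private final Map<String, Runnable> sspToWorker = new HashMap<>();

    void register(String ssp, Runnable worker) {
        sspToWorker.put(ssp, worker);
    }

    // Looks up the worker and fails with a clear message when none is mapped,
    // instead of hitting a NullPointerException further down.
    void dispatch(String ssp) {
        Runnable worker = sspToWorker.get(ssp);
        if (worker == null) {
            throw new IllegalStateException("No worker registered for SSP " + ssp);
        }
        worker.run();
    }
}
```

The check costs a single comparison per lookup, which is why it is cheap to keep even if the mapping never changes today.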


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 403
> > 
> >
> > Same comment as above re. run loop termination. You potentially could 
> > even have an abortRunLoop(Throwable) function that would make this super 
> > clear.
> 
> Xinyu Liu wrote:
> Added the AsyncRunLoop.abort(Throwable).

Cool, looks good!
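
One way such an abort hook could look. This is a hypothetical sketch, not the AsyncRunLoop code under review: `AbortableLoopSketch`, its fields, and the choice to rethrow as IllegalStateException are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicReference;

class AbortableLoopSketch {
    // Holds the first failure reported through abort(), if any.
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private volatile boolean stopped = false;

    // Records the cause and asks the loop to terminate.
    void abort(Throwable cause) {
        failure.compareAndSet(null, cause); // keep only the first failure
        stopped = true;
    }

    // Runs `step` until abort() is called, then surfaces the recorded failure.
    // Returns the number of completed iterations if stopped without a cause.
    int run(Runnable step) {
        int iterations = 0;
        while (!stopped) {
            step.run();
            iterations++;
        }
        Throwable cause = failure.get();
        if (cause != null) {
            throw new IllegalStateException("run loop aborted", cause);
        }
        return iterations;
    }
}
```

Routing every termination through one method keeps the reason for stopping in a single place, which is the clarity the review comment asks for.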


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137543
---

