Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-07-11 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/
---

(Updated July 11, 2016, 5:30 p.m.)


Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
Infrastructure).


Changes
---

Change to the new config names: 
- job.container.single.thread.mode
- job.container.thread.pool.size
- task.max.concurrency
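
For illustration, the three keys might be set as in the sketch below. The values are assumptions chosen for the example, not defaults taken from the patch, and AsyncConfigExample is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the patch: builds a config map with the three new keys.
class AsyncConfigExample {
    static Map<String, String> exampleConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        // Example values only; the real defaults are not stated in this thread.
        config.put("job.container.single.thread.mode", "false");
        config.put("job.container.thread.pool.size", "4");
        config.put("task.max.concurrency", "2");
        return config;
    }

    public static void main(String[] args) {
        exampleConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```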


Repository: samza


Description
---

This is the main part of the change, including the following:

- New API for AsyncStreamTask and callback.
- Multithread scheduling in AsyncRunLoop
- Callback management for async tasks
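
A rough sketch of the callback-style task API described in the list above. SketchCallback and SketchAsyncTask are simplified stand-ins invented for this example; the real interfaces are AsyncStreamTask and TaskCallback in samza-api, and their exact signatures are not shown in this thread.

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for a task callback (assumption, not the samza-api interface).
interface SketchCallback {
    void complete();
    void failure(Throwable t);
}

// Simplified stand-in for an async task (assumption, not the samza-api interface).
interface SketchAsyncTask {
    // Returns immediately; the task signals completion through the callback.
    void processAsync(String message, SketchCallback callback);
}

class AsyncTaskSketch {
    // Runs one message through the task and reports "ok" or the failure message.
    static String runOnce(SketchAsyncTask task, String message) {
        CompletableFuture<String> result = new CompletableFuture<>();
        task.processAsync(message, new SketchCallback() {
            @Override public void complete() { result.complete("ok"); }
            @Override public void failure(Throwable t) { result.complete(t.getMessage()); }
        });
        return result.join();
    }

    public static void main(String[] args) {
        // The task does its work on another thread and then invokes the callback.
        SketchAsyncTask task = (msg, cb) ->
            CompletableFuture.runAsync(() -> {
                if (msg.isEmpty()) cb.failure(new IllegalArgumentException("empty message"));
                else cb.complete();
            });
        System.out.println(runOnce(task, "hello"));
        System.out.println(runOnce(task, ""));
    }
}
```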


Diffs (updated)
---

  checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2
  samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java PRE-CREATION
  samza-api/src/main/java/org/apache/samza/task/TaskCallback.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 00648e49f8c7a9bbf5634e18ba0f95feb244613e
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 4d5ca4d3d2dd1542c5d073dfe6c13666ef5f51fc
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3f25eca6e3dffc57360e8bd8c435177c2a9a910a
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cf3c4c0ab08a59760bc899c6f2027755e933b350
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 9e6641c3628290dc05e1eb5537e86bff9d37f92c
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala d32a92976e43ca24033b48c91851ee706de7de6b
  samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 8b863887cf584d2d7a9b18181c7b0cd1e9dfec00
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 2efe836fc3b622cbe89e2042a37407f3cf732f58
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala e280daa9626757cb4d17c0c03eed923277230c3e
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 1358fdd8a386f5f81128ef871c72833d8cf11d86
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 5457f0e05ae4d615b9c86f48a662c54b13828e78
  samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 09da62e0f9a10f7c3683345a309c6278ff01fb4b

Diff: https://reviews.apache.org/r/48243/diff/


Testing
---

Unit tests and local testing.


Thanks,

Xinyu Liu



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-16 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review138013
---


Fix it, then Ship it!




Contingent on perf tests, of course.


samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
88)


I would probably make this an AtomicLong as it is the only thing in the way 
of this being thread-safe and it is not obvious whether seqNum is used in a 
multi-threaded context.
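
A minimal sketch of this suggestion, assuming seqNum is a simple monotonically increasing counter. SeqNumSketch and nextSeqNum are hypothetical names, not code from TaskCallbackManager.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the sequence-number bookkeeping under review.
class SeqNumSketch {
    private final AtomicLong seqNum = new AtomicLong(0);

    // getAndIncrement is atomic, so concurrent callers never receive the same value.
    long nextSeqNum() {
        return seqNum.getAndIncrement();
    }

    public static void main(String[] args) throws InterruptedException {
        SeqNumSketch sketch = new SeqNumSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) sketch.nextSeqNum();
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        // 4 threads x 1000 calls handed out 0..3999, so the next value is 4000.
        System.out.println(sketch.nextSeqNum());
    }
}
```

With a plain long field and seqNum++, the same test could lose increments under contention, which is the risk the comment points at.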



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
94)


Make this final.


- Chris Pettitt





Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-16 Thread Chris Pettitt


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 188
> > 
> >
> > Do we need to handle the case that the SSP is not in the mappings or is 
> > that impossible?
> 
> Xinyu Liu wrote:
> In the current version of Samza, SSPs are assigned to a task at job 
> startup, so the assignment won't change during the life cycle of the run 
> loop. If this changes in the future, we will need to revisit the logic here.

I'd suggest being defensive since it is low cost. For this you would just need 
to do a null check on the worker before using it.
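
Roughly what I have in mind, as a sketch only. WorkerLookupSketch and its fields are hypothetical, and String stands in for the SSP and worker types used in AsyncRunLoop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; String stands in for the SSP and worker types.
class WorkerLookupSketch {
    private final Map<String, String> sspToWorker = new HashMap<>();
    private int delivered = 0;

    WorkerLookupSketch() {
        sspToWorker.put("kafka.topic-a.0", "worker-0");
    }

    // Returns true if a worker owns the SSP and the envelope was handed over.
    boolean dispatch(String ssp) {
        String worker = sspToWorker.get(ssp);
        if (worker == null) {
            // Defensive branch: not expected while SSPs are fixed at startup.
            return false;
        }
        delivered++;
        return true;
    }

    int delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        WorkerLookupSketch lookup = new WorkerLookupSketch();
        System.out.println(lookup.dispatch("kafka.topic-a.0"));
        System.out.println(lookup.dispatch("kafka.unknown.7"));
    }
}
```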


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 403
> > 
> >
> > Same comment as above re. run loop termination. You potentially could 
> > even have an abortRunLoop(Throwable) function that would make this super 
> > clear.
> 
> Xinyu Liu wrote:
> Added the AsyncRunLoop.abort(Throwable).

Cool, looks good!


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137543
---



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-15 Thread Xinyu Liu


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 188
> > 
> >
> > Do we need to handle the case that the SSP is not in the mappings or is 
> > that impossible?

In the current version of Samza, SSPs are assigned to a task at job startup, 
so the assignment won't change during the life cycle of the run loop. If this 
changes in the future, we will need to revisit the logic here.


> On June 15, 2016, 3:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 403
> > 
> >
> > Same comment as above re. run loop termination. You potentially could 
> > even have an abortRunLoop(Throwable) function that would make this super 
> > clear.

Added the AsyncRunLoop.abort(Throwable).


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137543
---



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-15 Thread Xinyu Liu


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java, 
> > line 62
> > 
> >
> > It would be better to throw an exception when there is a mixture of 
> > sync and async tasks here, since we don't plan to support that mixture, 
> > right?

I changed it to count the async tasks, so we can make sure the count is either 
0 or equal to the total number of tasks.
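
The check can be sketched as below. RunLoopChoiceSketch is a hypothetical name, and throwing on a mixture follows Yi's suggestion; how the patch itself reports the error is not shown in this thread.

```java
// Hypothetical helper illustrating the "0 or all" rule for async tasks.
class RunLoopChoiceSketch {
    // Returns true for all-async, false for all-sync, and rejects a mixture.
    static boolean useAsyncRunLoop(int asyncTaskCount, int totalTaskCount) {
        if (asyncTaskCount == 0) {
            return false;
        }
        if (asyncTaskCount == totalTaskCount) {
            return true;
        }
        throw new IllegalArgumentException(
            "Mixing sync and async tasks is not supported: "
                + asyncTaskCount + " of " + totalTaskCount + " tasks are async");
    }

    public static void main(String[] args) {
        System.out.println(useAsyncRunLoop(0, 4));
        System.out.println(useAsyncRunLoop(4, 4));
    }
}
```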


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 145
> > 
> >
> > check the new patch from Jake in SAMZA-951. We are now excluding the 
> > chooseNs from the activeNs to discount the wait on network I/O from 
> > activeNs. We should be consistent in AsyncRunLoop here as well.

chooseNs is now excluded from activeNs.
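
As a sketch of the accounting, assuming the loop tracks per-phase timings. The class and the processNs, windowNs and commitNs fields are assumptions for illustration; only chooseNs and activeNs are named in this thread.

```java
// Hypothetical timing holder; only chooseNs and activeNs are named in the review.
class ActiveTimeSketch {
    long chooseNs;   // time spent choosing the next envelope, may include network I/O wait
    long processNs;  // assumed phase: time spent processing messages
    long windowNs;   // assumed phase: time spent in window calls
    long commitNs;   // assumed phase: time spent committing

    // Active time deliberately leaves out chooseNs so I/O wait does not count as work.
    long activeNs() {
        return processNs + windowNs + commitNs;
    }

    // Total loop time, including the choose phase.
    long totalNs() {
        return chooseNs + activeNs();
    }

    public static void main(String[] args) {
        ActiveTimeSketch t = new ActiveTimeSketch();
        t.chooseNs = 700;
        t.processNs = 200;
        t.windowNs = 50;
        t.commitNs = 50;
        System.out.println(t.activeNs() + " of " + t.totalNs() + " ns active");
    }
}
```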


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 212
> > 
> >
> > nit: defining INFINITE_WAIT_TIME = 0L here would improve readability. 
> > In addition, if envelope == null, we should add some explanation of why 
> > we need to wait pollIntervalMs() here (i.e. I assume that's because 
> > choose() will return null immediately w/o waiting for network I/O)?

I changed the code according to your comment below. An explicit wait time of 0 
is no longer needed, since latch.wait() defaults to a wait time of 0.


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 252
> > 
> >
> > Shouldn't we make this processed flag an AtomicBoolean? It would make 
> > PendingEnvelope thread-safe and the code simpler: return 
> > !processed.getAndSet(true);

I talked to Yi in person and added a comment explaining that this queue is only 
accessed from the run loop thread, so no synchronization is needed.
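
For reference, the AtomicBoolean variant Yi suggested would look roughly like the sketch below. PendingEnvelopeSketch and markProcessed are hypothetical names; per the reply above, the patch keeps a plain field because only the run loop thread touches it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the pending-envelope flag discussed in the review.
class PendingEnvelopeSketch {
    private final AtomicBoolean processed = new AtomicBoolean(false);

    // Returns true only for the first caller; later callers see the flag already set.
    boolean markProcessed() {
        return !processed.getAndSet(true);
    }

    public static void main(String[] args) {
        PendingEnvelopeSketch envelope = new PendingEnvelopeSketch();
        System.out.println(envelope.markProcessed()); // prints "true"
        System.out.println(envelope.markProcessed()); // prints "false"
    }
}
```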


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 310
> > 
> >
> > nit: This method does not return any object.

Fixed.


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 515
> > 
> >
> > nit: maybe it reads better w/ a name "hasPendingOps()"? Then, the logic 
> > in blockIfBusy() is also better understood as:
> > if (worker.state.isReady() && (envelope != null || 
> > worker.state.hasPendingOps())) {
> >   // should continue running since the worker state is ready and there 
> >   // is either a new message or some pending operations for the worker
> > }

Agreed. I renamed the function.
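
The condition from the comment, written out as a self-contained sketch. WorkerStateSketch and shouldKeepRunning are hypothetical; only isReady(), hasPendingOps() and blockIfBusy() are named in the review, and Object stands in for the envelope type.

```java
// Hypothetical worker state holding just enough to evaluate the condition.
class WorkerStateSketch {
    private final boolean ready;
    private final int pendingOps;

    WorkerStateSketch(boolean ready, int pendingOps) {
        this.ready = ready;
        this.pendingOps = pendingOps;
    }

    boolean isReady() {
        return ready;
    }

    boolean hasPendingOps() {
        return pendingOps > 0;
    }

    // Keep running if the worker is ready and there is a new message or pending work.
    static boolean shouldKeepRunning(WorkerStateSketch state, Object envelope) {
        return state.isReady() && (envelope != null || state.hasPendingOps());
    }

    public static void main(String[] args) {
        System.out.println(shouldKeepRunning(new WorkerStateSketch(true, 0), null));
        System.out.println(shouldKeepRunning(new WorkerStateSketch(true, 1), null));
    }
}
```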


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 578
> > 
> >
> > Don't quite get this point. It seems to me that the broadcast stream 
> > message will be called as worker.state.insertEnvelope(envelope) for all 
> > tasks, which will essentially create a PendingEnvelope for each task. 
> > Hence, w/ the current code, it will call consumerMultiplexer.tryUpdate() 
> > from each task???
> 
> Chris Pettitt wrote:
> I had the same comment on the last pass. I don't understand how this is 
> supposed to work. Might be worth discussing this in person with Xinyu and 
> then either fixing the code or the doc as appropriate.

Thanks to both of you for catching this. I broke the code during the task 
state refactoring. The logic should be fixed now.


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137369
---



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-15 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/
---

(Updated June 15, 2016, 11:41 p.m.)


Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
Infrastructure).


Changes
---

Updates based on Yi and Chris's feedback.


Repository: samza


Description
---

This is the main part of the change, including the following:

- New API for AsyncStreamTask and callback.
- Multithread scheduling in AsyncRunLoop
- Callback management for async tasks


Diffs (updated)
---

  checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2
  samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java PRE-CREATION
  samza-api/src/main/java/org/apache/samza/task/TaskCallback.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 00648e49f8c7a9bbf5634e18ba0f95feb244613e
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3f25eca6e3dffc57360e8bd8c435177c2a9a910a
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cf3c4c0ab08a59760bc899c6f2027755e933b350
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 9e6641c3628290dc05e1eb5537e86bff9d37f92c
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala d32a92976e43ca24033b48c91851ee706de7de6b
  samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 8b863887cf584d2d7a9b18181c7b0cd1e9dfec00
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 2efe836fc3b622cbe89e2042a37407f3cf732f58
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala e280daa9626757cb4d17c0c03eed923277230c3e
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 1358fdd8a386f5f81128ef871c72833d8cf11d86
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 5457f0e05ae4d615b9c86f48a662c54b13828e78
  samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 09da62e0f9a10f7c3683345a309c6278ff01fb4b

Diff: https://reviews.apache.org/r/48243/diff/


Testing
---

Unit tests and local testing.


Thanks,

Xinyu Liu



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-15 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137543
---




samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 88 - 91)


Wrap the result in Collections.unmodifiableMap to ensure it cannot be changed 
later.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 92)


Wrap this in an unmodifiable map as well, so it cannot be changed later.
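
A minimal sketch of the wrapping suggested in the two comments above, using java.util.Collections.unmodifiableMap. ImmutableMappingSketch and the map contents are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical example: build the mapping once, then expose a read-only view.
class ImmutableMappingSketch {
    static Map<String, String> buildMapping() {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("ssp-0", "worker-0");
        // Callers get a view that rejects put/remove with UnsupportedOperationException.
        return Collections.unmodifiableMap(mapping);
    }

    public static void main(String[] args) {
        Map<String, String> mapping = buildMapping();
        System.out.println(mapping.get("ssp-0"));
        try {
            mapping.put("ssp-1", "worker-1");
        } catch (UnsupportedOperationException e) {
            System.out.println("mapping is read-only");
        }
    }
}
```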



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 183)


This is not true for all cases (e.g. process). You could probably just 
remove this comment.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 188)


Do we need to handle the case that the SSP is not in the mappings or is 
that impossible?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 368)


It might be worth noting that this will cause the run loop to terminate.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 403)


Same comment as above re. run loop termination. You potentially could even 
have an abortRunLoop(Throwable) function that would make this super clear.
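
Something along these lines, as a sketch only. AbortableLoopSketch, its run method and the step hook are hypothetical and much simpler than AsyncRunLoop; the point is that one clearly named method records the failure and the loop terminates on its next iteration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;

// Hypothetical loop showing a single, explicit abort path.
class AbortableLoopSketch {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Records the first failure; the loop sees it on its next iteration and stops.
    void abort(Throwable t) {
        failure.compareAndSet(null, t);
    }

    // Runs step for each iteration until maxIterations, or throws once abort() was called.
    int run(int maxIterations, IntConsumer step) {
        int iteration = 0;
        while (iteration < maxIterations) {
            Throwable t = failure.get();
            if (t != null) {
                throw new RuntimeException("run loop aborted", t);
            }
            step.accept(iteration);
            iteration++;
        }
        return iteration;
    }

    public static void main(String[] args) {
        AbortableLoopSketch loop = new AbortableLoopSketch();
        try {
            loop.run(5, n -> {
                if (n == 2) loop.abort(new IllegalStateException("callback failed"));
            });
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```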


We should sync up on how to coordinate the disk quota changes and this change.

- Chris Pettitt


On June 9, 2016, 7:49 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> ---
> 
> (Updated June 9, 2016, 7:49 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> ---
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> d32a92976e43ca24033b48c91851ee706de7de6b 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
> PRE-CREATION 
>   

Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-15 Thread Chris Pettitt


> On June 15, 2016, 6:49 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 578
> > 
> >
> > Don't quite get this point. It seems to me that the broadcast stream 
> > message will be called as worker.state.insertEnvelope(envelope) for all 
> > tasks, which will essentially create a PendingEnvelope for each task. 
> > Hence, w/ the current code, it will call consumerMultiplexer.tryUpdate() 
> > from each task???

I had the same comment on the last pass. I don't understand how this is 
supposed to work. Might be worth discussing this in person with Xinyu and then 
either fixing the code or the doc as appropriate.


- Chris


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137369
---


On June 9, 2016, 7:49 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> ---
> 
> (Updated June 9, 2016, 7:49 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> ---
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> d32a92976e43ca24033b48c91851ee706de7de6b 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
> PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> e280daa9626757cb4d17c0c03eed923277230c3e 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 1358fdd8a386f5f81128ef871c72833d8cf11d86 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 5457f0e05ae4d615b9c86f48a662c54b13828e78 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
> 09da62e0f9a10f7c3683345a309c6278ff01fb4b 
> 
> Diff: https://reviews.apache.org/r/48243/diff/
> 
> 
> Testing
> ---
> 
> unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-09 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/
---

(Updated June 9, 2016, 7:49 p.m.)


Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
Infrastructure).


Changes
---

Fixed a bug in AsyncTaskWorker.process. Previously we were generating the 
callback before calling taskInstance.process(). However, 
taskInstance.process() performs an SSP caught-up check and so might not call 
task.process(). Instead of passing in the callback, I now pass in a callback 
factory so that the callback is generated only when task.process() is called.
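
A minimal sketch of the factory approach described above, for illustration 
only. All names here (Callback, CallbackFactory, CountingCallbackFactory, 
TaskInstanceSketch) are hypothetical stand-ins and not the classes in the 
patch; the caught-up check is reduced to a boolean flag.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for illustration; not the actual Samza classes.
interface Callback {
    void complete();
}

interface CallbackFactory {
    Callback createCallback();
}

// Counts callbacks so we can see that skipped messages never create one.
class CountingCallbackFactory implements CallbackFactory {
    final AtomicInteger created = new AtomicInteger();
    final AtomicInteger completed = new AtomicInteger();

    @Override
    public Callback createCallback() {
        created.incrementAndGet();
        return completed::incrementAndGet;
    }
}

class TaskInstanceSketch {
    private final boolean caughtUp; // assumed simplification of the SSP caught-up check

    TaskInstanceSketch(boolean caughtUp) {
        this.caughtUp = caughtUp;
    }

    // Returns true if the task was invoked. The callback is created only on that path.
    boolean process(String message, CallbackFactory factory) {
        if (!caughtUp) {
            return false; // message skipped: no callback is created
        }
        Callback callback = factory.createCallback();
        callback.complete(); // a real async task would complete it later
        return true;
    }
}

class LazyCallbackSketch {
    public static void main(String[] args) {
        CountingCallbackFactory factory = new CountingCallbackFactory();
        new TaskInstanceSketch(false).process("m1", factory); // skipped
        new TaskInstanceSketch(true).process("m2", factory);  // processed
        System.out.println(factory.created.get() + " created, "
            + factory.completed.get() + " completed");
    }
}
```

With a pre-created callback, the skipped message would leave a callback that 
never completes; with the factory, nothing is created for it.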

Other updates are based on Chris's feedback.


Repository: samza


Description
---

This is the main part of the change, including the following:

- New API for AsyncStreamTask and callback.
- Multithread scheduling in AsyncRunLoop
- Callback management for async tasks


Diffs (updated)
-

  checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
  samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
PRE-CREATION 
  samza-api/src/main/java/org/apache/samza/task/TaskCallback.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
PRE-CREATION 
  
samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
 PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
00648e49f8c7a9bbf5634e18ba0f95feb244613e 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
08a4debb06f9925ae741049abb2ee0df97b2243b 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
cf3c4c0ab08a59760bc899c6f2027755e933b350 
  
samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 9e6641c3628290dc05e1eb5537e86bff9d37f92c 
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
d32a92976e43ca24033b48c91851ee706de7de6b 
  
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 
8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
2efe836fc3b622cbe89e2042a37407f3cf732f58 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
PRE-CREATION 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
PRE-CREATION 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
e280daa9626757cb4d17c0c03eed923277230c3e 
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
1358fdd8a386f5f81128ef871c72833d8cf11d86 
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
5457f0e05ae4d615b9c86f48a662c54b13828e78 
  samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
09da62e0f9a10f7c3683345a309c6278ff01fb4b 

Diff: https://reviews.apache.org/r/48243/diff/


Testing
---

unit tests and local testing.


Thanks,

Xinyu Liu



Re: Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-08 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review136357
---



The biggest takeaway on this pass is that the async run loop code would be more 
readable without the two-level-deep anonymous inner classes. Surfacing how 
these classes interact with each other should make this easier to review.


samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java (line 73)


It's been a while and I'm not sure if we discussed: is the goal to 
ultimately just switch over to async task for both cases (async and sync)?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 118)


As far as I can tell, this will only run window and commit in the thread 
pool but otherwise will run process on the current thread.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 123 - 128)


I believe this was copied from the original run loop. It seems to me that 
it would be nice to have the run loop have lifecycle methods (e.g. shutdown) 
and have some other application setup class (the container?) that is 
responsible for tying shutdown of the process to shutting down the run loop. 
This would allow the run loop lifecycle to be decoupled from the process 
lifecycle with no loss of generality.
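
As a rough illustration of this suggestion (not code from the patch), the 
sketch below gives a run loop its own shutdown() method and lets a separate 
setup class register the JVM shutdown hook. RunLoopSketch, ContainerSketch, 
and registerShutdownHook are hypothetical names.

```java
// Hypothetical sketch: a run loop with its own lifecycle, independent of the JVM's.
class RunLoopSketch implements Runnable {
    private volatile boolean shutdownRequested = false;

    @Override
    public void run() {
        while (!shutdownRequested) {
            Thread.yield(); // stand-in for one iteration of process/window/commit
        }
    }

    // Lifecycle method: anyone holding the run loop can stop it.
    void shutdown() {
        shutdownRequested = true;
    }

    boolean isShutdown() {
        return shutdownRequested;
    }
}

// Hypothetical setup class: the only place that ties process exit to the run loop.
class ContainerSketch {
    static Thread registerShutdownHook(RunLoopSketch loop) {
        Thread hook = new Thread(loop::shutdown, "run-loop-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}

class RunLoopLifecycleSketch {
    public static void main(String[] args) throws InterruptedException {
        RunLoopSketch loop = new RunLoopSketch();
        ContainerSketch.registerShutdownHook(loop);

        Thread worker = new Thread(loop, "run-loop");
        worker.start();
        loop.shutdown(); // explicit shutdown, no process exit needed
        worker.join();
        System.out.println("run loop stopped: " + loop.isShutdown());
    }
}
```

Because the hook lives outside the run loop, a test can stop the loop 
directly without involving the process lifecycle.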



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 275 - 276)


Only for commit and window, right?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 437 - 442)


This is a bit confusing to me. I see that we're enqueueing an envelope 
wrapped in a pending envelope per task worker, but I don't see how we would end 
up dequeueing the same pending envelope twice. Could we expand on the 
documentation to make it clearer?

Also, depending on how we end up dequeueing twice we need to ensure that 
the state change (mark processed) is visible. Is that the case? The state is 
mutable and we're not using anything to publish the state change (as far as I 
can see). Either we should ensure this state is visible across threads, e.g. 
with a volatile or CAS, or we should document why it doesn't need to be.
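
For illustration, one way to publish the "mark processed" state change with 
a CAS is sketched below. PendingEnvelopeSketch and its methods are 
hypothetical and do not mirror the class in the patch; the sketch only shows 
the visibility technique.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the pending envelope discussed above.
class PendingEnvelopeSketch {
    private final String payload;
    // AtomicBoolean makes the state change visible across threads.
    private final AtomicBoolean processed = new AtomicBoolean(false);

    PendingEnvelopeSketch(String payload) {
        this.payload = payload;
    }

    // Returns true only for the single caller that wins the CAS.
    boolean markProcessed() {
        return processed.compareAndSet(false, true);
    }

    boolean isProcessed() {
        return processed.get();
    }

    String payload() {
        return payload;
    }
}

class MarkProcessedSketch {
    public static void main(String[] args) throws InterruptedException {
        PendingEnvelopeSketch envelope = new PendingEnvelopeSketch("broadcast-message");
        AtomicInteger winners = new AtomicInteger();

        // Two workers race to mark the same envelope; exactly one succeeds.
        Runnable worker = () -> {
            if (envelope.markProcessed()) {
                winners.incrementAndGet();
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("winners = " + winners.get());
    }
}
```

A volatile boolean would also give visibility, but it would not tell two 
racing threads which of them performed the transition.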



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 518)


This is the second-level-deep non-static inner class. At this point 
reasoning about the interactions between everything is getting pretty 
difficult. Instead, could we make the interactions clearer by making these 
inner classes top-level, package private? I suspect this might involve 
shuffling some things around. For example, maybe pendingEnvelopeQueue really 
belongs in workerState and the worker inserts and removes via the state 
instance.

Having a smaller non-static inner class, e.g. for a callback, is reasonable 
if it is not too complicated, but both of these, as is, are a bit too heavy.
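
A rough sketch of the shape this could take, under the assumption that the 
queue moves into the state class. WorkerStateSketch and TaskWorkerSketch are 
hypothetical names, and envelopes are reduced to strings for brevity.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical top-level, package-private state class that owns the queue.
class WorkerStateSketch {
    private final Queue<String> pendingEnvelopeQueue = new ArrayDeque<>();

    synchronized void insertEnvelope(String envelope) {
        pendingEnvelopeQueue.add(envelope);
    }

    // Returns the next pending envelope, or null if none is queued.
    synchronized String fetchEnvelope() {
        return pendingEnvelopeQueue.poll();
    }
}

// Hypothetical top-level worker: it touches the queue only through the state.
class TaskWorkerSketch {
    private final WorkerStateSketch state;

    TaskWorkerSketch(WorkerStateSketch state) {
        this.state = state;
    }

    // Drains all pending envelopes and returns how many were handled.
    int drain() {
        int handled = 0;
        while (state.fetchEnvelope() != null) {
            handled++; // stand-in for handing the envelope to the task
        }
        return handled;
    }
}

class WorkerStateDemo {
    public static void main(String[] args) {
        WorkerStateSketch state = new WorkerStateSketch();
        state.insertEnvelope("e1");
        state.insertEnvelope("e2");
        System.out.println("handled " + new TaskWorkerSketch(state).drain());
    }
}
```

With both classes at the top level, the worker's only dependency on the 
state is the pair of methods it calls, which is easier to review than 
implicit access to the enclosing instance.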



samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 71 - 75)


Same comment as for the async run loop.


- Chris Pettitt


On June 4, 2016, 1:18 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> ---
> 
> (Updated June 4, 2016, 1:18 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>