Re: Review Request 48182: SAMZA-958: Make store/cache thread safe

2016-06-08 Thread Xinyu Liu


> On June 6, 2016, 6:22 p.m., Chris Pettitt wrote:
> > samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala,
> >  line 528
> > <https://reviews.apache.org/r/48182/diff/1-2/?file=1405222#file1405222line528>
> >
> > How about actually capturing the test failure and rethrowing from the 
> > main thread? It will give you a much more helpful error message.

Good suggestion. I added the try/catch and rethrow.
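Here is a minimal Java sketch of the pattern Chris describes. The actual test is Scala and is not reproduced here. `ConcurrentTestRunner` and `runConcurrently` are hypothetical names. Each worker thread catches any `Throwable` and keeps the first one in an `AtomicReference`. After `join()`, the calling thread rethrows it, so the test report shows the original cause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: runs the same body on several threads and rethrows the
// first failure on the calling (test) thread.
final class ConcurrentTestRunner {
  static void runConcurrently(int threadCount, Runnable body) {
    AtomicReference<Throwable> firstFailure = new AtomicReference<>();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) {
      Thread t = new Thread(() -> {
        try {
          body.run();
        } catch (Throwable e) {
          // Keep only the earliest failure; later ones are often side effects.
          firstFailure.compareAndSet(null, e);
        }
      });
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting for test threads", e);
      }
    }
    Throwable failure = firstFailure.get();
    if (failure != null) {
      // Preserve the original stack trace as the cause.
      throw new AssertionError("Concurrent test body failed: " + failure, failure);
    }
  }
}
```

`compareAndSet(null, e)` keeps the first recorded failure, which is usually the root cause.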


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48182/#review136335
-------


On June 9, 2016, 12:33 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48182/
> ---
> 
> (Updated June 9, 2016, 12:33 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the existing stores/caches need to be thread safe in order to be used by
> multithreaded tasks. The following changes are made to ensure the thread
> safety of the stores:
> 
> For CachedStore, use a synchronized lock for each public function.
> For the current InMemoryKeyValueStore, use ConcurrentSkipListMap as the
> underlying map for thread safety.
> For the store Iterator, do not support remove functionality (throw
> UnsupportedOperationException as RocksDb does today).
> 
> 
> Diffs
> -
> 
>   samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala 72f25a354eaa98e8df379d07d9cc8613dfafd13a
>   samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala f0965aec5f3ec2a214dc40c70832c58273623749
>   samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b7f1cdc4dbaeea2413cee2ad60d74528f3950513
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala c28f8db8cb59bd5415e78535877acc1e5bee0f67
>   samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala eee74473726cb2a36d0b75fe5c9df737440980bc
>   samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 23f8a1a6bee8ef38e0640a4e90778e53d982deeb
> 
> Diff: https://reviews.apache.org/r/48182/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local deployment.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48182: SAMZA-958: Make store/cache thread safe

2016-06-08 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48182/
---

(Updated June 9, 2016, 12:33 a.m.)


Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
Infrastructure).


Changes
---

Updated the unit tests based on Chris's and Navina's feedback.


Repository: samza


Description
---

All the existing stores/caches need to be thread safe in order to be used by
multithreaded tasks. The following changes are made to ensure the thread safety
of the stores:

For CachedStore, use a synchronized lock for each public function.
For the current InMemoryKeyValueStore, use ConcurrentSkipListMap as the
underlying map for thread safety.
For the store Iterator, do not support remove functionality (throw
UnsupportedOperationException as RocksDb does today).
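The following Java sketch illustrates the three changes above. It is not the Scala code in the patch. It pairs a `ConcurrentSkipListMap`-backed store, whose range iterator rejects `remove()`, with a cache whose public methods are all `synchronized`. `InMemoryStore` and `CachedStoreSketch` are hypothetical names, and keys and values are simplified to `String`. The cache is write-through for brevity, which may differ from how CachedStore actually buffers writes.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical in-memory store. ConcurrentSkipListMap makes get/put/delete and
// range scans safe to call from multiple threads without an external lock.
final class InMemoryStore {
  private final ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();

  String get(String key) { return map.get(key); }

  void put(String key, String value) { map.put(key, value); }

  void delete(String key) { map.remove(key); }

  // Iterates keys in [from, to). The skip-list iterator is weakly consistent,
  // so concurrent writers never cause ConcurrentModificationException.
  Iterator<Map.Entry<String, String>> range(String from, String to) {
    final Iterator<Map.Entry<String, String>> it =
        map.subMap(from, true, to, false).entrySet().iterator();
    return new Iterator<Map.Entry<String, String>>() {
      @Override public boolean hasNext() { return it.hasNext(); }
      @Override public Map.Entry<String, String> next() { return it.next(); }
      // remove() is deliberately unsupported on store iterators.
      @Override public void remove() {
        throw new UnsupportedOperationException("Iterator doesn't support remove");
      }
    };
  }
}

// Hypothetical write-through LRU cache in front of the store. Every public
// method synchronizes on the same monitor, so cache and store stay consistent.
final class CachedStoreSketch {
  private final InMemoryStore store;
  private final LinkedHashMap<String, String> cache;

  CachedStoreSketch(InMemoryStore store, final int capacity) {
    this.store = store;
    // accessOrder = true gives LRU order; evict once capacity is exceeded.
    this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        return size() > capacity;
      }
    };
  }

  synchronized String get(String key) {
    String value = cache.get(key);
    if (value == null) {
      value = store.get(key);
      if (value != null) cache.put(key, value);
    }
    return value;
  }

  synchronized void put(String key, String value) {
    cache.put(key, value);
    store.put(key, value);
  }

  synchronized void delete(String key) {
    cache.remove(key);
    store.delete(key);
  }
}
```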


Diffs (updated)
-

  samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala 72f25a354eaa98e8df379d07d9cc8613dfafd13a
  samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala f0965aec5f3ec2a214dc40c70832c58273623749
  samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b7f1cdc4dbaeea2413cee2ad60d74528f3950513
  samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala c28f8db8cb59bd5415e78535877acc1e5bee0f67
  samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala eee74473726cb2a36d0b75fe5c9df737440980bc
  samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 23f8a1a6bee8ef38e0640a4e90778e53d982deeb

Diff: https://reviews.apache.org/r/48182/diff/


Testing
---

Unit tests and local deployment.


Thanks,

Xinyu Liu



Re: Review Request 48213: SAMZA-960: Make system producer thread safe

2016-06-08 Thread Xinyu Liu


> On June 8, 2016, 3:36 p.m., Chris Pettitt wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
> >  line 89
> > <https://reviews.apache.org/r/48213/diff/2-3/?file=1406178#file1406178line89>
> >
> > You need to save the value of the producer before this. Something like:
> > 
> > ```
> > val currentProducer = producer
> > if (currentProducer == null) {
> > ...
> > ```
> > 
> > And you need to replace all usages of producer in this method with 
> > currentProducer. This ensures that the producer is not changed out from 
> > under you midway through the method or later in the callback.

Great solution! I modified the code to use the local variable currentProducer. Thanks.
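Here is a minimal Java sketch of the snapshot-into-a-local pattern. KafkaSystemProducer itself is Scala. A hypothetical `SimpleProducer` interface stands in for the real Kafka client. The volatile field is read once into `currentProducer`, and both the method body and the callback use only that local.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the underlying Kafka producer client.
interface SimpleProducer {
  void send(String message, Consumer<Exception> callback);
}

final class ProducerHolder {
  // May be swapped or cleared by another thread (e.g. stop()), hence volatile.
  private volatile SimpleProducer producer;

  ProducerHolder(SimpleProducer producer) { this.producer = producer; }

  void stop() { producer = null; }

  void send(String message) {
    // Read the field exactly once. A concurrent stop() can no longer change
    // which producer this method (or its callback) is talking to.
    final SimpleProducer currentProducer = producer;
    if (currentProducer == null) {
      throw new IllegalStateException("Producer is not running");
    }
    currentProducer.send(message, exception -> {
      if (exception != null) {
        // The callback refers to currentProducer, not whatever the field
        // holds when the callback eventually runs.
        System.err.println("Send via " + currentProducer + " failed: " + exception);
      }
    });
  }
}
```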


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/#review136654
---


On June 8, 2016, 11:53 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48213/
> ---
> 
> (Updated June 8, 2016, 11:53 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the system producers need to be thread safe in order to be used in 
> multithreaded tasks. The following are the changes 
> (ElasticSearchSystemProducer is already thread safe so no change made there):
> 
> In KafkaSystemProducer, remove the buggy retry logic and treat any exception 
> as fatal.
> In HdfsSystemProducer, add a synchronization lock to all public methods.
> 
> 
> Diffs
> -
> 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala 1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 3769e103616dc0f1fd869706cc086e24cd926c48
>   samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java 04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 8e32bba6ced090f0fc8d4e5176fe0788df36981d
> 
> Diff: https://reviews.apache.org/r/48213/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48213: SAMZA-960: Make system producer thread safe

2016-06-08 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/
---

(Updated June 8, 2016, 11:53 p.m.)


Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
Infrastructure).


Repository: samza


Description
---

All the system producers need to be thread safe in order to be used in 
multithreaded tasks. The following are the changes (ElasticSearchSystemProducer 
is already thread safe so no change made there):

In KafkaSystemProducer, remove the buggy retry logic and treat any exception as 
fatal.
In HdfsSystemProducer, add a synchronization lock to all public methods.
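This Java sketch shows one way "treat any exception as fatal" could work with an asynchronous client. It rests on assumptions: `FailFastSender` and its `BiConsumer`-based client are hypothetical, and the real KafkaSystemProducer may surface errors differently. Completion callbacks record the first failure instead of retrying. The next `send()` or `flush()` on any thread then rethrows it.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class FailFastSender {
  // Hypothetical async client: accepts (message, completionCallback).
  private final BiConsumer<String, Consumer<Exception>> client;
  private final AtomicReference<Exception> sendFailure = new AtomicReference<>();

  FailFastSender(BiConsumer<String, Consumer<Exception>> client) {
    this.client = client;
  }

  void send(String message) {
    throwIfFailed();
    client.accept(message, exception -> {
      // No retry: remember the first failure so it surfaces on the next call.
      if (exception != null) {
        sendFailure.compareAndSet(null, exception);
      }
    });
  }

  void flush() {
    // A real flush would also wait for in-flight sends; this sketch only
    // surfaces a failure already reported by a callback.
    throwIfFailed();
  }

  private void throwIfFailed() {
    Exception failure = sendFailure.get();
    if (failure != null) {
      throw new RuntimeException("Unable to send message; failing the task", failure);
    }
  }
}
```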


Diffs (updated)
-

  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala 1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 3769e103616dc0f1fd869706cc086e24cd926c48
  samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java 04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 8e32bba6ced090f0fc8d4e5176fe0788df36981d

Diff: https://reviews.apache.org/r/48213/diff/


Testing
---

Unit tests and local testing.


Thanks,

Xinyu Liu



Review Request 48243: SAMZA-961: Async tasks and multithreading model

2016-06-03 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/
---

Review request for samza.


Repository: samza


Description
---

This is the main part of the change, including the following:

- New API for AsyncStreamTask and callback.
- Multithreaded scheduling in AsyncRunLoop.
- Callback management for async tasks.
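Here is a small Java sketch of the callback model. The `AsyncTask` and `Callback` interfaces are simplified and hypothetical. They are not the AsyncStreamTask/TaskCallback signatures from the patch. `MiniRunLoop` stands in for the scheduling side. Each callback takes effect at most once, so duplicate `complete()` calls are ignored. The loop waits for all callbacks with a timeout and throws `IllegalStateException` on expiry. The patch's file list suggests it uses a dedicated TaskCallbackTimeoutException for that case.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified interfaces (not the signatures from the patch).
interface Callback {
  void complete();
  void failure(Throwable t);
}

interface AsyncTask {
  // Must eventually invoke complete() or failure(), possibly from another thread.
  void processAsync(String message, Callback callback);
}

// Counts down the shared latch only on the first completion, so a task that
// calls complete() twice (or complete() then failure()) is not double-counted.
final class OnceCallback implements Callback {
  private final AtomicBoolean done = new AtomicBoolean(false);
  private final CountDownLatch pending;
  private final AtomicInteger failures;

  OnceCallback(CountDownLatch pending, AtomicInteger failures) {
    this.pending = pending;
    this.failures = failures;
  }

  @Override public void complete() {
    if (done.compareAndSet(false, true)) pending.countDown();
  }

  @Override public void failure(Throwable t) {
    if (done.compareAndSet(false, true)) {
      failures.incrementAndGet();
      pending.countDown();
    }
  }
}

final class MiniRunLoop {
  // Dispatches every message, then waits for all callbacks; returns the
  // number of failed messages.
  static int processAll(AsyncTask task, List<String> messages, long timeoutMs) {
    CountDownLatch pending = new CountDownLatch(messages.size());
    AtomicInteger failures = new AtomicInteger();
    for (String message : messages) {
      task.processAsync(message, new OnceCallback(pending, failures));
    }
    try {
      if (!pending.await(timeoutMs, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("Timed out waiting for task callbacks");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for callbacks", e);
    }
    return failures.get();
  }
}
```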


Diffs
-

  checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2
  samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java PRE-CREATION
  samza-api/src/main/java/org/apache/samza/task/TaskCallback.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 00648e49f8c7a9bbf5634e18ba0f95feb244613e
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3f25eca6e3dffc57360e8bd8c435177c2a9a910a
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cf3c4c0ab08a59760bc899c6f2027755e933b350
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 9e6641c3628290dc05e1eb5537e86bff9d37f92c
  samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala d32a92976e43ca24033b48c91851ee706de7de6b
  samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 8b863887cf584d2d7a9b18181c7b0cd1e9dfec00
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 2efe836fc3b622cbe89e2042a37407f3cf732f58
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java PRE-CREATION
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java PRE-CREATION
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala e280daa9626757cb4d17c0c03eed923277230c3e
  samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 1358fdd8a386f5f81128ef871c72833d8cf11d86
  samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 5457f0e05ae4d615b9c86f48a662c54b13828e78
  samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 09da62e0f9a10f7c3683345a309c6278ff01fb4b

Diff: https://reviews.apache.org/r/48243/diff/


Testing
---

Unit tests and local testing.


Thanks,

Xinyu Liu



Re: Review Request 48213: SAMZA-960: Make system producer thread safe

2016-06-03 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/
---

(Updated June 3, 2016, 10:09 p.m.)


Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
Infrastructure).


Changes
---

Fixes according to Chris's comments.


Repository: samza


Description
---

All the system producers need to be thread safe in order to be used in 
multithreaded tasks. The following are the changes (ElasticSearchSystemProducer 
is already thread safe so no change made there):

In KafkaSystemProducer, remove the buggy retry logic and treat any exception as 
fatal.
In HdfsSystemProducer, add a synchronization lock to all public methods.


Diffs (updated)
-

  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala 1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 3769e103616dc0f1fd869706cc086e24cd926c48
  samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java 04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 8e32bba6ced090f0fc8d4e5176fe0788df36981d

Diff: https://reviews.apache.org/r/48213/diff/


Testing
---

Unit tests and local testing.


Thanks,

Xinyu Liu



Review Request 48213: SAMZA-960: Make system producer thread safe

2016-06-03 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/
---

Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
Infrastructure).


Repository: samza


Description
---

All the system producers need to be thread safe in order to be used in 
multithreaded tasks. The following are the changes (ElasticSearchSystemProducer 
is already thread safe so no change made there):

In KafkaSystemProducer, remove the buggy retry logic and treat any exception as 
fatal.
In HdfsSystemProducer, add a synchronization lock to all public methods.


Diffs
-

  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala 1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 3769e103616dc0f1fd869706cc086e24cd926c48
  samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java 04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 8e32bba6ced090f0fc8d4e5176fe0788df36981d

Diff: https://reviews.apache.org/r/48213/diff/


Testing
---

Unit tests and local testing.


Thanks,

Xinyu Liu



Re: [DISCUSS] Re-thinking the Samza Job Coordinator

2016-03-02 Thread xinyu liu
This is awesome! Samza will be able to run standalone or on different kinds
of clusters, which can serve a wide range of applications. Glad to see this
is happening.

Thanks,
Xinyu

On Tue, Mar 1, 2016 at 8:48 AM, Jagadish Venkatraman  wrote:

> Hi all,
>
> Currently, the only way to run distributed Samza containers is using Yarn.
> There have been requests to run Samza with other resource managers like
> Mesos, to run Samza as an embedded library, and to run in Docker containers
> managed by ECS/Kubernetes.
>
> I created SAMZA-881 for this, proposing a redesign of the Samza
> coordinator.
>
> - The new design views the JobCoordinator as a component that can
>   provide the containerId and JobModel to the SamzaContainer.
> - There are pluggable interfaces defined for Leader Election and for
>   interacting with a Resource Manager like Yarn/Mesos.
> - Primitives for dynamically refreshing the job model.
>
> I've attached a design doc with the Jira ticket, and I'd really appreciate
> your feedback on the ticket.
>
> Thanks,
> Jagadish
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>


Re: Understand Samza default metrics

2016-02-24 Thread Xinyu Liu
Thanks, Shadi. The doc is really useful!

@Milinda: thanks for pointing it out. process-calls includes both
process-envelopes and process-null-envelopes, so process-envelopes is the
right metric for David's example.
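The rate arithmetic discussed in this thread can be sketched in Java as follows. Take two readings of the process-envelopes counter together with their timestamps, for example from two consecutive MetricsSnapshot messages, whose header carries a timestamp. Then divide the counter delta by the elapsed time. `CounterSample` and `Throughput` are hypothetical helpers.

```java
// Hypothetical reading of the process-envelopes counter at a point in time.
final class CounterSample {
  final long timestampMs;
  final long processEnvelopes;

  CounterSample(long timestampMs, long processEnvelopes) {
    this.timestampMs = timestampMs;
    this.processEnvelopes = processEnvelopes;
  }
}

final class Throughput {
  // messages/sec = (later count - earlier count) / elapsed seconds
  static double messagesPerSecond(CounterSample earlier, CounterSample later) {
    long elapsedMs = later.timestampMs - earlier.timestampMs;
    if (elapsedMs <= 0) {
      throw new IllegalArgumentException("Samples must be in increasing time order");
    }
    long delta = later.processEnvelopes - earlier.processEnvelopes;
    if (delta < 0) {
      // Counter went backwards, e.g. the container restarted between samples.
      throw new IllegalArgumentException("Counter reset between samples");
    }
    return delta * 1000.0 / elapsedMs;
  }
}
```

Counters are held in memory by the container, so they will likely reset on restart. That is why a negative delta is rejected rather than reported.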

Thanks,
Xinyu

On Wed, Feb 24, 2016 at 9:52 AM, Abdollahian Noghabi, Shadi <
abdol...@illinois.edu> wrote:

> I have attached the document to SAMZA-702
> <https://issues.apache.org/jira/browse/SAMZA-702>.
>
>
> On Feb 24, 2016, at 9:33 AM, Milinda Pathirage <mpath...@umail.iu.edu> wrote:
>
> Hi Shadi,
>
> The attachment is not in your mail; I think the mailing list dropped it.
> IMHO, we should create a JIRA issue and attach the doc to the issue so that
> we can move it to the Samza docs.
>
> On Wed, Feb 24, 2016 at 12:27 PM, Abdollahian Noghabi, Shadi <abdol...@illinois.edu> wrote:
>
> I have a document with some of the metrics. I had gathered these around
> last summer, so they may be out-of-date. I have attached the document to
> this email. Hope it can help.
>
>
>
>
>
>
> On Feb 24, 2016, at 7:10 AM, Milinda Pathirage <mpath...@umail.iu.edu> wrote:
>
> Hi David and Xinyu,
>
> If you want to get the number of messages processed, "process-envelopes" is
> the correct metric. "process-calls" measures the number of times the
> RunLoop#process method is called, so "process-calls" is updated even when no
> messages are processed (this happens when there are no new messages in the
> input stream). "process-ns" can be used as the average time taken to process
> a message, but this average also includes the time taken to process null
> messages, so I don't trust the accuracy of that metric.
>
> Each metric emitted by Samza contains a header that includes the job name,
> job id, container name and metric timestamp. You can use it to calculate
> messages-per-second values.
>
> If you are using a KV store, KeyValueStoreMetrics contains metrics such as
> bytes read, bytes written, puts and gets for each store.
>
> Thanks
> Milinda
>
> On Tue, Feb 23, 2016 at 8:26 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
> Hi, David,
>
> I didn't find a wiki page that contains the descriptions of all Samza
> metrics. You can find the basic metrics by googling the following classes:
> SamzaContainerMetrics, TaskInstanceMetrics, SystemConsumersMetrics and
> SystemProducersMetrics. For your example, you can use the "process-calls"
> in SamzaContainerMetrics to get the processed message count, and divide the
> delta by time to get the messages processed per sec. In practice, you can
> either use JConsole to connect to the running Samza container or consume
> the MetricsSnapshot topic to get the detailed metrics.
>
> Thanks,
> Xinyu
>
> On Tue, Feb 23, 2016 at 4:51 PM, David Yu <david...@optimizely.com> wrote:
>
> Hi,
>
> Where can I find detailed descriptions of the out-of-the-box metrics
> provided by MetricsSnapshotReporterFactory and JmxReporterFactory?
>
> I'm interested in seeing the basic metrics of my Samza job (e.g.
> messages_processed_per_sec), but it's hard to pinpoint the specific
> metric that shows me that.
>
> Thanks,
> David
>
>
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>
>


Re: Understand Samza default metrics

2016-02-23 Thread xinyu liu
Hi, David,

I didn't find a wiki page that contains the descriptions of all Samza
metrics. You can find the basic metrics by googling the following classes:
SamzaContainerMetrics, TaskInstanceMetrics, SystemConsumersMetrics and
SystemProducersMetrics. For your example, you can use the "process-calls"
in SamzaContainerMetrics to get the processed message count, and divide the
delta by time to get the messages processed per sec. In practice, you can
either use JConsole to connect to the running Samza container or consume
the MetricsSnapshot topic to get the detailed metrics.

Thanks,
Xinyu

On Tue, Feb 23, 2016 at 4:51 PM, David Yu  wrote:

> Hi,
>
> Where can I find detailed descriptions of the out-of-the-box metrics
> provided by MetricsSnapshotReporterFactory and JmxReporterFactory?
>
> I'm interested in seeing the basic metrics of my Samza job (e.g.
> messages_processed_per_sec), but it's hard to pinpoint the specific
> metric that shows me that.
>
> Thanks,
> David
>


Re: Review Request 43350: SAMZA-867 Fix job restart/shutdown in the event of a node outage.

2016-02-23 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/43350/#review120393
---


Fix it, then Ship it!




Looks good to me. A few minor suggestions below.


samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala (line 97)
<https://reviews.apache.org/r/43350/#comment181833>

In Scala this is usually done with Option.getOrElse, so instead of adding
a new method, we can just do

config.getCommandClass.getOrElse(defaultValue).
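Here is a hedged Java sketch of the same idea. The getter returns `Optional`, and each caller supplies its default with `orElse`, which is analogous to Scala's `Option.getOrElse`. `ConfigSketch` is hypothetical, and the key `task.command.class` is used only for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical config wrapper: exposes the raw setting as an Optional instead
// of adding a separate "getXOrDefault" method for every caller.
final class ConfigSketch {
  private final Map<String, String> props;

  ConfigSketch(Map<String, String> props) { this.props = props; }

  Optional<String> getCommandClass() {
    return Optional.ofNullable(props.get("task.command.class"));
  }
}
```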



samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
 (line 114)
<https://reviews.apache.org/r/43350/#comment181837>

Should this throw a SamzaException instead of a NullPointerException?



samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerLaunchException.java
 (line 27)
<https://reviews.apache.org/r/43350/#comment181838>

Is serialVersionUID needed here? If so, please use the autogenerated one.


- Xinyu Liu


On Feb. 23, 2016, 9:43 p.m., Jake Maes wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/43350/
> ---
> 
> (Updated Feb. 23, 2016, 9:43 p.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan 
> (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> * Throw a SamzaContainerLaunchException whenever the container start request
> to the NM fails.
> * Catch the exception in the Allocator and re-request a container on
> ANY-HOST.
> * Release the failed container.
> * Also, refactored some methods in ContainerUtil and the ContainerAllocators
> to simplify the code.
> * Finally, fixed a logic bug in the HostAwareContainerAllocator wherein the
> expiration logic was flipped.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 51e9e99e506c065938b18d081df6fe1ff24f79ec
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java 2e192eecf412525f45846edb62b23d261d35cbda
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 31fcc572d499d4e17cdf1340d7072c1562ccfdb7
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 54db5e5b2b1b109d202e814809adbfd2bc84fb4b
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java 91fae98c074e1648e7168fb8e76d6e1e656816fc
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java 8e1db77db74e1dc01bb0ec64e102befd225123e9
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java bc5b606394351e4039d084914fe5898e6ff148a1
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerLaunchException.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java a3562a1155cbf24989200063b3ebd472b295db37
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java e2b45d7577dea5a4a71af22521c93a7fd75eaefc
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java 269d82479650b3bc2890d250da0391d34104b1eb
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java 88d9f24d16fc3d9842b387cfc22edaf1dfa6fd06
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java 5fcad82fc10f87510e994896ed7380f1e636a18f
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java 8fc0b9898ed4484b9db98da7f622ab61f35cd4b1
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java e7441e512e73b5d09740f252dc52efece640bea4
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java 4426ce6ac8808257dc00df50452df3d7555d6d0b
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java 951e0f9504b87263eba891d367c9806248241c7d
> 
> Diff: https://reviews.apache.org/r/43350/diff/
> 
> 
> Testing
> ---
> 
> Added 2 unit tests and I'm doing some manual testing in parallel with this 
> review. The manual test is to kill the NM for one of the containers of a job 
> that uses Host Affinity. When the RM detects the outage, it should notify the 
> AM which will try to restart the container on the same host. It will get a 
> connection error and at that point should retry on a DIFFERENT host.
> 
> The testing was successful for both scenarios. 
>
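The retry flow in the description above can be sketched roughly as follows in Java. Everything here is hypothetical: `Launcher`, `AllocatorSketch`, the `ANY_HOST` string, and the `containerId@host` request format. The sketch only illustrates the order of operations. It attempts the launch, and on a launch exception it releases the YARN container and queues a new request for the same Samza container on any host.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical exception playing the role of SamzaContainerLaunchException.
class ContainerLaunchException extends Exception {
  ContainerLaunchException(String message) { super(message); }
}

// Hypothetical hook for asking the NodeManager to start a container.
interface Launcher {
  void launch(String yarnContainerId, String host) throws ContainerLaunchException;
}

final class AllocatorSketch {
  static final String ANY_HOST = "ANY_HOST";  // illustrative value only
  final List<String> released = new ArrayList<>();
  final List<String> pendingRequests = new ArrayList<>();  // "samzaContainerId@host"
  private final Launcher launcher;

  AllocatorSketch(Launcher launcher) { this.launcher = launcher; }

  // Try the allocated host; on failure, release the container and re-request
  // the same logical container without a host preference.
  boolean runContainer(String samzaContainerId, String yarnContainerId, String host) {
    try {
      launcher.launch(yarnContainerId, host);
      return true;
    } catch (ContainerLaunchException e) {
      released.add(yarnContainerId);
      pendingRequests.add(samzaContainerId + "@" + ANY_HOST);
      return false;
    }
  }
}
```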

Re: Review Request 43053: allow warning instead of fail in case of invalid num of partitions in the checkpoint partition

2016-02-01 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/43053/#review117242
---


Fix it, then Ship it!




LGTM. One minor comment below.


samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala (line 162)
<https://reviews.apache.org/r/43053/#comment178367>

Nitpick: the variable name "msg1" is not very descriptive. Shall we do
warn(msg + "...") instead?


- Xinyu Liu


On Feb. 1, 2016, 6:25 p.m., Boris Shkolnik wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/43053/
> ---
> 
> (Updated Feb. 1, 2016, 6:25 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> We have validation code that verifies that the checkpoint topic has the
> right number of partitions (1).
> But in some environments, it is difficult to repair or delete the invalid
> topic.
> This config allows bypassing this validation (it will only issue a warning)
> and continuing with a checkpoint topic that has an incorrect number of
> partitions.
> The checkpoints are written into partition 0.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 1a8adae4d30fa198c90e8c177c7f17269c5953cd
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 787de1f62479a098bf251f072fca03bbf92f7c6d
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 7db894091284794b7f5fac164eb55b5d78184a36
>   samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala c6b1fe4bf3c3601502e014d582d86f8ea0850b20
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala f4311d1cda7c66c66544c5a3ac94a17cae62863a
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala af4051b28df5eeaeaee527a24907a8e66441f743
> 
> Diff: https://reviews.apache.org/r/43053/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Boris Shkolnik
> 
>
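Here is a minimal Java sketch of the warn-instead-of-fail behavior described above. `CheckpointTopicValidator` and its boolean flag are hypothetical stand-ins for the new config. Warnings are collected in a list so the behavior is easy to check; real code would log them instead. The message is built as `msg + "..."`, as the review nitpick suggests.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointTopicValidator {
  final List<String> warnings = new ArrayList<>();
  // Hypothetical flag mirroring the new config: true keeps the old strict check.
  private final boolean failOnInvalidPartitionCount;

  CheckpointTopicValidator(boolean failOnInvalidPartitionCount) {
    this.failOnInvalidPartitionCount = failOnInvalidPartitionCount;
  }

  // The checkpoint topic is expected to have exactly one partition.
  void validate(String topic, int partitionCount) {
    if (partitionCount == 1) {
      return;
    }
    String msg = "Checkpoint topic " + topic + " has " + partitionCount
        + " partitions; expected 1.";
    if (failOnInvalidPartitionCount) {
      throw new IllegalStateException(msg);
    }
    warnings.add(msg + " Continuing anyway; checkpoints are written to partition 0.");
  }
}
```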



Review Request 41912: SAMZA-850: Yarn Job Validation Tool

2016-01-04 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/41912/
---

Review request for samza, Boris Shkolnik, Navina Ramesh, and Yi Pan (Data 
Infrastructure).


Repository: samza


Description
---

We've seen a strong need for validating running Samza jobs programmatically.
Consider the following scenarios:
1) running Samza as a service (as in SAMZA-849), which requires automated job
validation when upgrading the framework for a large number of jobs;
2) production monitoring, where automated validation can run periodically and
trigger alerts;
3) integration testing, which can use the validation tool to check the job
status.

So a tool is needed to check job health. At a minimum, it will check:
- job submission
- app running
- container count

It should also be extensible to validate job metrics through a metrics system,
such as JMX. The user will be able to validate against important metrics such
as process_count or message_behind_high_watermark.
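Here is a hypothetical Java sketch of validating against a metric. `MetricsReader` and `MinCounterValidator` are invented for this example and are not the MetricsAccessor/MetricsValidator interfaces added in the patch. The validator fails if any container's counter is below a threshold. A periodic monitor or an integration test could run this kind of check.

```java
import java.util.Map;

// Hypothetical accessor: latest value of a counter, keyed by container name.
interface MetricsReader {
  Map<String, Long> getCounterValues(String metricName);
}

// Hypothetical validator: every container must report at least minValue.
final class MinCounterValidator {
  private final String metricName;
  private final long minValue;

  MinCounterValidator(String metricName, long minValue) {
    this.metricName = metricName;
    this.minValue = minValue;
  }

  void validate(MetricsReader reader) {
    Map<String, Long> values = reader.getCounterValues(metricName);
    if (values.isEmpty()) {
      throw new IllegalStateException("No values reported for " + metricName);
    }
    for (Map.Entry<String, Long> entry : values.entrySet()) {
      if (entry.getValue() < minValue) {
        throw new IllegalStateException(entry.getKey() + ": " + metricName + " = "
            + entry.getValue() + ", expected at least " + minValue);
      }
    }
  }
}
```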


Diffs
-

  checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c
  samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java PRE-CREATION
  samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java PRE-CREATION
  samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 9445a30c9e605a3623873ed09eedfd5140af98f7
  samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java PRE-CREATION
  samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala e9661023a04f39d059d879fea2140cb57af3b546
  samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java PRE-CREATION
  samza-shell/src/main/bash/validate-yarn-job.sh PRE-CREATION
  samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java PRE-CREATION
  samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java PRE-CREATION
  samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java PRE-CREATION

Diff: https://reviews.apache.org/r/41912/diff/


Testing
---

Unit tests added to cover all the new methods/classes.


Thanks,

Xinyu Liu



Re: [VOTE] Samza 0.10.0 Release Candidate 2

2015-12-09 Thread xinyu liu
+1 on my side. I also ran the gradle build and unit tests without failure.

Thanks,
Xinyu

On Wed, Dec 9, 2015 at 9:54 AM, Tao Feng  wrote:

> +1 from my side (non-binding). I downloaded the package and successfully ran
> all the unit tests without failure.
>
> On Tue, Dec 8, 2015 at 3:38 PM, Yi Pan  wrote:
>
> > Hey all,
> >
> >
> > This is a call for a vote on a release of Apache Samza 0.10.0. Thanks to
> > everyone who has contributed to this release. We are very glad to see
> > some new contributors in this release.
> >
> >
> > The release candidate can be downloaded from here:
> >
> >
> > http://home.apache.org/~nickpan47/samza-0.10.0-rc2/
> >
> >
> > The release candidate is signed with pgp key 911402D8, which is
> > included in the repository's KEYS file:
> >
> >
> >
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=blob;f=KEYS;h=66cbd15cddbbd798c3529e9a8b7f052aab0037a7
> >
> >
> > and can also be found on keyservers:
> >
> > http://pgp.mit.edu/pks/lookup?op=get=0x911402D8
> >
> >
> > The git tag is release-0.10.0-rc2 and signed with the same pgp key:
> >
> >
> >
> >
> https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tag;h=4a37a3c4754b94805646522fc6644f2dd998e828
> >
> >
> > Test binaries have been published to Maven's staging repository, and are
> > available here:
> >
> >
> > https://repository.apache.org/content/repositories/orgapachesamza-1011/
> >
> >
> > Note that the binaries were built with JDK7 without incident. This is the
> > first version of Samza that does not support JDK6 any more.
> >
> >
> > 128 issues were resolved for this release:
> >
> >
> >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SAMZA%20AND%20fixVersion%20%3D%200.10.0%20AND%20status%20in%20(Resolved%2C%20Closed)
> >
> >
> > The vote will be open for 72 hours (ending at 4:00 p.m. Friday, 12/11/2015).
> >
> > Please download the release candidate, check the hashes/signature, build
> > it and test it, and then please vote:
> >
> >
> > [ ] +1 approve
> >
> > [ ] +0 no opinion
> >
> > [ ] -1 disapprove (and reason why)
> >
> >
> > +1 from my side for the release.
> >
> >
> > Yi Pan
> >
> > nickpa...@gmail.com
> >
>


Review Request 41071: SAMZA-843: fix heap usage increase caused by container timer change

2015-12-07 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/41071/
---

Review request for samza and Yi Pan (Data Infrastructure).


Repository: samza


Description
---

After the container timer metrics (chooseNs, windowNs, processNs, and
commitNs) were changed from milliseconds to nanoseconds, we noticed a dramatic
increase in heap usage in one of our production jobs. After investigation, we
found that SlidingTimeWindowReservoir.update(duration) is called much more
frequently because the duration is now non-zero after the nanosecond change (in
contrast, it is often zero when measured in milliseconds). Within the 5-minute
window, the storage inside SlidingTimeWindowReservoir grows substantially for a
high-QPS job (for our job with around 10K QPS, heap usage grew from <5M to
100M). This causes long GCs and degrades job performance. The fix makes the
SlidingTimeWindowReservoir collision buffer default to 1 and configurable
through the constructor.
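Below is a minimal Scala sketch of the effect described above. It is not Samza's code. SketchWindowReservoir, SketchTimer and TimerHeapDemo are hypothetical names. The sketch assumes, as the description implies, that the timer skips zero durations. It does not model the collision buffer. It only shows why switching from millisecond to nanosecond durations multiplies the number of samples a sliding window retains.

```scala
import scala.collection.mutable

// Hypothetical sketch, not Samza's SlidingTimeWindowReservoir: keeps every
// sample recorded within the last windowMs milliseconds of the injected clock.
class SketchWindowReservoir(windowMs: Long, clockMs: () => Long) {
  private val samples = mutable.Queue.empty[(Long, Long)] // (timestampMs, value)

  def update(value: Long): Unit = synchronized {
    val now = clockMs()
    samples.enqueue((now, value))
    // Evict samples that have fallen out of the sliding window.
    while (samples.nonEmpty && samples.head._1 <= now - windowMs) samples.dequeue()
  }

  def size: Int = synchronized { samples.size }
}

// Hypothetical timer. Assumption: zero durations are not recorded.
class SketchTimer(reservoir: SketchWindowReservoir) {
  def update(duration: Long): Unit = if (duration > 0) reservoir.update(duration)
}

object TimerHeapDemo {
  def main(args: Array[String]): Unit = {
    var nowMs = 0L
    val msReservoir = new SketchWindowReservoir(300000L, () => nowMs) // 5-minute window
    val nsReservoir = new SketchWindowReservoir(300000L, () => nowMs)
    val msTimer = new SketchTimer(msReservoir)
    val nsTimer = new SketchTimer(nsReservoir)

    // Simulate 10 seconds at 10,000 messages/s, each taking about 50 microseconds.
    for (i <- 0 until 100000) {
      nowMs = i / 10L // 10 messages per millisecond
      val durationNs = 50000L
      msTimer.update(durationNs / 1000000L) // truncates to 0 ms, so it is skipped
      nsTimer.update(durationNs)            // 50,000 ns is non-zero, so it is stored
    }
    println(s"${msReservoir.size} vs ${nsReservoir.size}") // prints "0 vs 100000"
  }
}
```

In this model, every millisecond-resolution duration truncates to 0 and is dropped, while every nanosecond duration is kept. At a sustained 10K QPS, a 5-minute window would therefore hold on the order of 10,000 × 300 = 3,000,000 samples.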


Diffs
-

  
samza-api/src/main/java/org/apache/samza/metrics/SlidingTimeWindowReservoir.java
 df543599771b7cda3be6ea702a85091cf61883d7 
  samza-api/src/main/java/org/apache/samza/metrics/Timer.java 
b49d14758116660ac6b26cc7e2459b293343e47e 
  
samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
 d392b3258b7aca4323d663159c4e545e113277bb 
  samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java 
63c183f9283c58006de23a342e61717e062030c3 
  samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
b4d6f351f97a7c3a29c133b4ba3876ce4b6baca2 

Diff: https://reviews.apache.org/r/41071/diff/


Testing
---


Thanks,

Xinyu Liu



Re: Review Request 39806: SAMZA-798 : Performance and stability issue after combining checkpoint and coordinator stream

2015-11-02 Thread Xinyu Liu


> On Oct. 30, 2015, 6:06 p.m., Xinyu Liu wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala, 
> > line 132
> > <https://reviews.apache.org/r/39806/diff/1/?file=1112956#file1112956line132>
> >
> > I think Scala prefers using the companion object as the factory to 
> > create a new instance (the code before the change). Is there any reason for 
> > this change?
> 
> Navina Ramesh wrote:
> I didn't know that about scala. But what is the advantage behind it? It 
> only seems to obscure the functionality of an instance, imo. Let me know why. 
> I don't mind either way. :)

My understanding is that using the companion object apply() method is the 
"functional" way to create a new object, instead of calling new (see 
http://stackoverflow.com/questions/9737352/what-is-the-apply-function-in-scala).
So it's pretty popular in Scala. It does look cleaner to me when there are 
multiple constructors defined or when creating subclass objects, plus the Scala 
library uses this pattern a lot, e.g. Map(), List(), ...
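Here is a short illustration of the companion-object apply() factory pattern discussed above. Shape, Circle and Square are hypothetical classes chosen for the example. They are not CheckpointTool or any other Samza type.

```scala
// Hypothetical hierarchy used only to illustrate the apply() factory pattern.
sealed trait Shape { def area: Double }
final class Circle(r: Double) extends Shape { def area: Double = math.Pi * r * r }
final class Square(s: Double) extends Shape { def area: Double = s * s }

// Companion object: Shape(...) is sugar for Shape.apply(...), so callers never
// write `new` and never need to know which subclass they get back.
object Shape {
  def apply(kind: String, size: Double): Shape = kind match {
    case "circle" => new Circle(size)
    case "square" => new Square(size)
    case other    => throw new IllegalArgumentException(s"unknown shape: $other")
  }
}
```

Callers write Shape("square", 2.0) without new, and the factory is free to choose the subclass. The same mechanism lets List(1, 2, 3) stand for List.apply(1, 2, 3).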


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/39806/#review104583
---


On Nov. 2, 2015, 5:50 a.m., Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/39806/
> ---
> 
> (Updated Nov. 2, 2015, 5:50 a.m.)
> 
> 
> Review request for samza, Chris Riccomini, Jake Maes, Jagadish Venkatraman, 
> and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-798
> https://issues.apache.org/jira/browse/SAMZA-798
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Adding interfaces for CheckpointManager, CheckpointManagerFactory and moving 
> Checkpoint to api
> 
> 
> Adding KafkaCheckpointLogKey, KafkaCheckpointManager and 
> KafkaCheckpointManagerFactory back from 0.9.1
> 
> 
> Changed SamzaContainer and OffsetManager
> 
> 
> Removed checkpointmanager in JC and modified TaskModel to remove 
> offsetMapping. Container will continue to use offsetmanager for fetching 
> offsets
> 
> 
> Fixed OffsetManager bugs
> 
> 
> Got rid of all compile errors during build with -x test
> 
> 
> Fixing Jackson object mapper for TaskModel
> 
> 
> Commented tests in checkpoint manager and fixed other failing tests
> 
> 
> Refactored KCM and moved generic functions like createTopic & validateTopic 
> to kafkaUtil.scala
> 
> 
> KCM unit tests work
> 
> 
> Got rid of old migration code and its test. Got rid of redundant KCM
> 
> 
> Commented out migration related tests in jobrunner
> 
> 
> Moved migration code from old.checkpoint package
> 
> 
> Fixed 1 migration test
> 
> 
> Fixed checkpoint migration and its unit tests
> 
> 
> Removed migration related tests from TestKafkaCheckpointManager
> 
> 
> Removed some commented lines and fixed a test in TestJobCoordinator
> 
> 
> Deleted CheckpointManager and SetCheckpoint
> 
> 
> Diffs
> -
> 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 
> 4adac09305cbdb07b0d2cd9f8b189df1c290 
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> PRE-CREATION 
>   
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java  
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 0185751c28979e50b1bddc28c90339defd94200b 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
>  21afa8569801150e81b4c14ee21a9077dfa1895f 
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java 
> e00c49d5255c0af6d44e251aed4e8360cd3026c5 
>   
> samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
>  172358a5428c9789e0883fc0e5ad3e5c3398478a 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
> 2e3aeb8fd5a86aa39464adff9e75aca96622ebad 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 1464acc7ec6592a21c3cdf96f34847e094e9e5e3 
>   
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 0b73403018b895879ed2c0538a5cd495813d2eae 
>   samza-core/src/main/scala/org/apach

Re: Review Request 39806: SAMZA-798 : Performance and stability issue after combining checkpoint and coordinator stream

2015-10-30 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/39806/#review104583
---

Ship it!


Some minor suggestions. Overall the change looks good. Thanks for the quick fix!


samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
(line 29)
<https://reviews.apache.org/r/39806/#comment162814>

Just a nitpick: in Java interfaces, all methods are implicitly public, so there is 
no need to add the public keyword. Could you please remove it from all the interface methods?



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
<https://reviews.apache.org/r/39806/#comment162818>

Any reason to remove the factory method?



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
(line 123)
<https://reviews.apache.org/r/39806/#comment162817>

I think Scala prefers using the companion object as the factory to create a 
new instance (the code before the change). Is there any reason for this change?


- Xinyu Liu


On Oct. 30, 2015, 7:09 a.m., Navina Ramesh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/39806/
> ---
> 
> (Updated Oct. 30, 2015, 7:09 a.m.)
> 
> 
> Review request for samza, Chris Riccomini, Jake Maes, Jagadish Venkatraman, 
> and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-798
> https://issues.apache.org/jira/browse/SAMZA-798
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Adding interfaces for CheckpointManager, CheckpointManagerFactory and moving 
> Checkpoint to api
> 
> 
> Adding KafkaCheckpointLogKey, KafkaCheckpointManager and 
> KafkaCheckpointManagerFactory back from 0.9.1
> 
> 
> Changed SamzaContainer and OffsetManager
> 
> 
> Removed checkpointmanager in JC and modified TaskModel to remove 
> offsetMapping. Container will continue to use offsetmanager for fetching 
> offsets
> 
> 
> Fixed OffsetManager bugs
> 
> 
> Got rid of all compile errors during build with -x test
> 
> 
> Fixing Jackson object mapper for TaskModel
> 
> 
> Commented tests in checkpoint manager and fixed other failing tests
> 
> 
> Refactored KCM and moved generic functions like createTopic & validateTopic 
> to kafkaUtil.scala
> 
> 
> KCM unit tests work
> 
> 
> Got rid of old migration code and its test. Got rid of redundant KCM
> 
> 
> Commented out migration related tests in jobrunner
> 
> 
> Moved migration code from old.checkpoint package
> 
> 
> Fixed 1 migration test
> 
> 
> Fixed checkpoint migration and its unit tests
> 
> 
> Removed migration related tests from TestKafkaCheckpointManager
> 
> 
> Removed some commented lines and fixed a test in TestJobCoordinator
> 
> 
> Deleted CheckpointManager and SetCheckpoint
> 
> 
> Diffs
> -
> 
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> PRE-CREATION 
>   
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java  
>   samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 0185751c28979e50b1bddc28c90339defd94200b 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
>  21afa8569801150e81b4c14ee21a9077dfa1895f 
>   samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java 
> e00c49d5255c0af6d44e251aed4e8360cd3026c5 
>   
> samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
>  172358a5428c9789e0883fc0e5ad3e5c3398478a 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
> 2e3aeb8fd5a86aa39464adff9e75aca96622ebad 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 1464acc7ec6592a21c3cdf96f34847e094e9e5e3 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 0b73403018b895879ed2c0538a5cd495813d2eae 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 03299cb7cb93d43165a74206113497462d8119e9 
>   
> samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala 
> 374e27e8233a27132019d429f6fa1f131db3fe15 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
>  dd04d28e54e7afe0cc6d6c2aa508911a14e668bf 
>   
> samza-core/src/test/java/org/apache/samza/serializers/model/TestSamza
