[jira] [Reopened] (SAMZA-2005) Validate Sql

2019-05-16 Thread Weiqing Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SAMZA-2005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weiqing Yang reopened SAMZA-2005:
-

> Validate Sql
> 
>
> Key: SAMZA-2005
> URL: https://issues.apache.org/jira/browse/SAMZA-2005
> Project: Samza
>  Issue Type: Sub-task
>Reporter: Weiqing Yang
>Priority: Major
>
> We can implement Samza SQL validation: syntax, resource presence, schema 
> presence, etc. (including unit tests with the Samza SQL test framework).
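The three validation phases named above (syntax, resource presence, schema presence) can be sketched as a standalone pipeline. This is an illustrative sketch only: `SimpleSqlValidator`, its regex-based syntax check, and the in-memory registries are hypothetical stand-ins, not the actual Samza SQL framework API, which would use a real SQL parser.

```java
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the three validation phases: syntax, resource
// presence, schema presence. A real implementation would use a SQL parser
// rather than a regex.
public class SimpleSqlValidator {
    // Very rough syntax check: SELECT ... FROM <source> [WHERE ...]
    private static final Pattern SELECT_FROM =
        Pattern.compile("(?i)^\\s*select\\s+.+\\s+from\\s+(\\S+).*$");

    private final Set<String> knownResources;       // streams/tables that exist
    private final Map<String, Set<String>> schemas; // resource -> column names

    public SimpleSqlValidator(Set<String> knownResources,
                              Map<String, Set<String>> schemas) {
        this.knownResources = knownResources;
        this.schemas = schemas;
    }

    public void validate(String sql) {
        Matcher m = SELECT_FROM.matcher(sql);
        if (!m.matches()) {
            throw new IllegalArgumentException("syntax error: " + sql);
        }
        String source = m.group(1);
        if (!knownResources.contains(source)) {
            throw new IllegalArgumentException("unknown resource: " + source);
        }
        if (!schemas.containsKey(source)) {
            throw new IllegalArgumentException("no schema for: " + source);
        }
    }
}
```

Each phase fails fast with a descriptive error, which is what makes validation useful in a shell: the user sees which check (syntax vs. resource vs. schema) rejected the statement.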



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (SAMZA-1902) Support different data systems

2019-05-16 Thread Weiqing Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SAMZA-1902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weiqing Yang resolved SAMZA-1902.
-
Resolution: Fixed

> Support different data systems
> --
>
> Key: SAMZA-1902
> URL: https://issues.apache.org/jira/browse/SAMZA-1902
> Project: Samza
>  Issue Type: Sub-task
>Reporter: Weiqing Yang
>Priority: Major
>
> Currently the Shell can only talk to the Kafka system, but we need a 
> general way to connect to different systems.
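One generic approach is to hide each backing system behind a common interface and look implementations up by name. This is a hypothetical sketch of that shape, not the actual Samza abstraction; `DataSystemConnector` and `ShellSystems` are illustrative names.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a common interface the shell could code against,
// with per-system implementations registered by name (kafka, etc.).
interface DataSystemConnector {
    List<String> listStreams();
}

class ShellSystems {
    private final Map<String, DataSystemConnector> registry;

    ShellSystems(Map<String, DataSystemConnector> registry) {
        this.registry = registry;
    }

    DataSystemConnector get(String systemName) {
        DataSystemConnector c = registry.get(systemName);
        if (c == null) {
            throw new IllegalArgumentException("unknown system: " + systemName);
        }
        return c;
    }
}
```

With this shape, adding a new data system means registering one more connector; the shell itself never hard-codes Kafka.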





[jira] [Resolved] (SAMZA-2027) Config support in through Set statement

2019-05-16 Thread Weiqing Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SAMZA-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weiqing Yang resolved SAMZA-2027.
-
Resolution: Fixed

> Config support in through Set statement
> ---
>
> Key: SAMZA-2027
> URL: https://issues.apache.org/jira/browse/SAMZA-2027
> Project: Samza
>  Issue Type: Sub-task
>  Components: sql
>Reporter: Weiqing Yang
>Priority: Major
>
> Supports environment variables for SqlExecutors. Users can now set environment 
> variables for the shell itself and for any SqlExecutor, either by executing the 
> 'SET' command in the shell or by using a configuration file. Users can also 
> choose any SqlExecutor they want in the same way.
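The shell-side handling of a `SET` statement can be sketched as a small key/value store that the executors later read from. This is a hedged illustration: `ShellEnvironment` and the `SET key=value` grammar shown here are assumptions for the sketch, not the real shell's parser.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of shell-side handling of a SET statement, e.g.
// "SET samza.sql.output=compact". Class and key names are illustrative.
class ShellEnvironment {
    private final Map<String, String> vars = new HashMap<>();

    // Parses "SET key=value" (case-insensitive keyword) and stores the variable.
    void execute(String statement) {
        String s = statement.trim();
        if (!s.regionMatches(true, 0, "SET ", 0, 4)) {
            throw new IllegalArgumentException("not a SET statement: " + statement);
        }
        String[] kv = s.substring(4).split("=", 2);
        if (kv.length != 2) {
            throw new IllegalArgumentException("expected SET key=value");
        }
        vars.put(kv[0].trim(), kv[1].trim());
    }

    String get(String key) {
        return vars.get(key);
    }
}
```

A configuration file would simply replay the same key/value pairs into this store at shell startup, which is why both mechanisms can share one code path.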





[jira] [Resolved] (SAMZA-2005) Validate Sql

2019-05-16 Thread Weiqing Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SAMZA-2005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weiqing Yang resolved SAMZA-2005.
-
Resolution: Fixed

> Validate Sql
> 
>
> Key: SAMZA-2005
> URL: https://issues.apache.org/jira/browse/SAMZA-2005
> Project: Samza
>  Issue Type: Sub-task
>Reporter: Weiqing Yang
>Priority: Major
>
> We can implement Samza SQL validation: syntax, resource presence, schema 
> presence, etc. (including unit tests with the Samza SQL test framework).





[jira] [Reopened] (SAMZA-1901) Samza SQL Shell

2019-05-16 Thread Weiqing Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SAMZA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weiqing Yang reopened SAMZA-1901:
-

> Samza SQL Shell
> ---
>
> Key: SAMZA-1901
> URL: https://issues.apache.org/jira/browse/SAMZA-1901
> Project: Samza
>  Issue Type: New Feature
>  Components: sql
>Reporter: Weiqing Yang
>Priority: Major
> Fix For: 0.15.0
>
>
> This Jira implements the first version of the Samza SQL shell. 
> Please refer to the attached document for more details about the shell, 
> including the tech choices, features, design decisions, and how to build, run, 
> and debug the shell.





[jira] [Resolved] (SAMZA-1901) Samza SQL Shell

2019-05-16 Thread Weiqing Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SAMZA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weiqing Yang resolved SAMZA-1901.
-
Resolution: Fixed

> Samza SQL Shell
> ---
>
> Key: SAMZA-1901
> URL: https://issues.apache.org/jira/browse/SAMZA-1901
> Project: Samza
>  Issue Type: New Feature
>  Components: sql
>Reporter: Weiqing Yang
>Priority: Major
> Fix For: 0.15.0
>
>
> This Jira implements the first version of the Samza SQL shell. 
> Please refer to the attached document for more details about the shell, 
> including the tech choices, features, design decisions, and how to build, run, 
> and debug the shell.





[samza] branch 1.2.0 created (now f80659d)

2019-05-16 Thread boryas
This is an automated email from the ASF dual-hosted git repository.

boryas pushed a change to branch 1.2.0
in repository https://gitbox.apache.org/repos/asf/samza.git.


  at f80659d  replaced verstion to be 1.2.0

This branch includes the following new commits:

 new f80659d  replaced verstion to be 1.2.0

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[samza] 01/01: replaced verstion to be 1.2.0

2019-05-16 Thread boryas
This is an automated email from the ASF dual-hosted git repository.

boryas pushed a commit to branch 1.2.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit f80659d999e9e6b7b3377abeacccdd863e42b19b
Author: Boris S 
AuthorDate: Thu May 16 17:34:18 2019 -0700

replaced verstion to be 1.2.0
---
 gradle.properties  | 2 +-
 samza-test/src/main/config/join/README | 8 
 samza-test/src/main/python/configs/tests.json  | 2 +-
 samza-test/src/main/python/stream_processor.py | 2 +-
 4 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/gradle.properties b/gradle.properties
index 20b4b9f..11dd29f 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 group=org.apache.samza
-version=1.1.1-SNAPSHOT
+version=1.2.0
 scalaSuffix=2.11
 
 gradleVersion=4.8
diff --git a/samza-test/src/main/config/join/README 
b/samza-test/src/main/config/join/README
index 37bb354..4130553 100644
--- a/samza-test/src/main/config/join/README
+++ b/samza-test/src/main/config/join/README
@@ -44,17 +44,17 @@ Deploy Zookeeper, YARN and Kafka:
 > cd $HELLO_SAMZA_SRC
 > for i in zookeeper kafka yarn; do ./bin/grid install $i; ./bin/grid start 
 > $i; done
 
-Update the "yarn.package.path" to 
$DEPLOY_DIR/samza-test_2.11-1.1.1-SNAPSHOT.tgz
+Update the "yarn.package.path" to $DEPLOY_DIR/samza-test_2.11-1.2.0.tgz
 > cd $SAMZA_SRC
 > vi samza-test/src/main/config/join/common.properties
-yarn.package.path=file:///path/to/samza-hello-samza/deploy/samza-test_2.11-1.1.1-SNAPSHOT.tgz
+yarn.package.path=file:///path/to/samza-hello-samza/deploy/samza-test_2.11-1.2.0.tgz
 
 Then release and extract the test tarball:
 > cd $SAMZA_SRC
 > ./gradlew releaseTestJobs
-> cp samza-test/build/distributions/samza-test_2.11-1.1.1-SNAPSHOT.tgz 
$DEPLOY_DIR
+> cp samza-test/build/distributions/samza-test_2.11-1.2.0.tgz $DEPLOY_DIR
 > mkdir $DEPLOY_DIR/samza
-> tar -xvf $DEPLOY_DIR/samza-test_2.11-1.1.1-SNAPSHOT.tgz -C $DEPLOY_DIR/samza
+> tar -xvf $DEPLOY_DIR/samza-test_2.11-1.2.0.tgz -C $DEPLOY_DIR/samza
 
 Finally, create the kafka topics and start the samza jobs:
 > ./bin/setup-int-test.sh $DEPLOY_DIR
diff --git a/samza-test/src/main/python/configs/tests.json 
b/samza-test/src/main/python/configs/tests.json
index 0b0ab92..e537367 100644
--- a/samza-test/src/main/python/configs/tests.json
+++ b/samza-test/src/main/python/configs/tests.json
@@ -1,5 +1,5 @@
 {
-  "samza_executable": "samza-test_2.11-1.1.1-SNAPSHOT.tgz",
+  "samza_executable": "samza-test_2.11-1.2.0.tgz",
   "samza_install_path": "deploy/smoke_tests",
   "samza_config_factory": 
"org.apache.samza.config.factories.PropertiesConfigFactory"
 }
diff --git a/samza-test/src/main/python/stream_processor.py 
b/samza-test/src/main/python/stream_processor.py
index dacd59c..2fef894 100644
--- a/samza-test/src/main/python/stream_processor.py
+++ b/samza-test/src/main/python/stream_processor.py
@@ -43,7 +43,7 @@ class StreamProcessor:
 logger.info('Running processor start command: 
{0}'.format(self.processor_start_command))
 self.deployment_config = {
 'install_path': 
os.path.join(runtime.get_active_config('remote_install_path'), 
'deploy/{0}'.format(self.processor_id)),
-'executable': 'samza-test_2.11-1.0.1-SNAPSHOT.tgz',
+'executable': 'samza-test_2.11-1.2.0.tgz',
 'post_install_cmds': [],
 'start_command': self.processor_start_command,
 'stop_command': '',



[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284931281
 
 

 ##
 File path: docs/learn/documentation/versioned/api/table-api.md
 ##
 @@ -296,6 +299,29 @@ Couchbase is supported as remote table. See
 
[`CouchbaseTableReadFunction`](https://github.com/apache/samza/blob/master/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableReadFunction.java)
 and 
 
[`CouchbaseTableWriteFunction`](https://github.com/apache/samza/blob/master/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java).
 
+### Batching
+
+Remote Table has built-in client-side batching support in both of its sync and 
async executions.
+This is useful when a remote data store supports batch processing and is not 
sophisticated enough
+to handle heavy inbound requests. 
+
+ Configuration
+
+Batching can be enabled with 
[`RemoteTableDescriptor`](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java)
+by providing a 
[`BatchProvider`](https://github.com/apache/samza/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java)
+The user can choose:
+ 
+1. A 
[`CompactBatchProvider`](https://github.com/apache/samza/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java)
 which provides a batch such that
+   the operations are compacted the the key. For update operations, the latter 
update will override the value of the previous one when they have the same key. 
For query operations,
 
 Review comment:
   Remove "the"
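The key-compaction behavior being reviewed here (within a batch, a later update for the same key replaces the earlier one, per-key) can be illustrated with an insertion-ordered map. This is a standalone sketch of the idea, not the `CompactBatchProvider` implementation itself; `CompactingBatch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: compact a batch of update operations by key. A later update for the
// same key overwrites the earlier one; first-insertion order of keys is kept.
class CompactingBatch<K, V> {
    private final Map<K, V> updates = new LinkedHashMap<>();

    void addUpdate(K key, V value) {
        updates.put(key, value); // same key => later value wins
    }

    // Returns the compacted operations and resets the batch.
    Map<K, V> drain() {
        Map<K, V> out = new LinkedHashMap<>(updates);
        updates.clear();
        return out;
    }
}
```

So N updates to the same hot key cost the remote store a single write per batch, which is the point of the compact provider.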


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284935113
 
 

 ##
 File path: 
samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
 ##
 @@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
+
+  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
+
+  static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> {
+private final String serializedProfiles;
+private transient Map<Integer, Profile> profileMap;
+
+private InMemoryReadFunction(String profiles) {
+  this.serializedProfiles = profiles;
+}
+
+private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+  in.defaultReadObject();
+  Profile[] profiles = 
Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
+  this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> 
p.getMemberId(), Function.identity()));
+}
+
+@Override
+public CompletableFuture<Profile> getAsync(Integer key) {
+  return CompletableFuture.completedFuture(profileMap.get(key));
+}
+
+@Override
+public boolean isRetriable(Throwable exception) {
+  return false;
+}
+
+static InMemoryReadFunction getInMemoryReadFunction(String 
serializedProfiles) {
+  return new InMemoryReadFunction(serializedProfiles);
+}
+  }
+
+  static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> {
+private transient List<EnrichedPageView> records;
+private String testName;
+
+public InMemoryWriteFunction(String testName) {
+  this.testName = testName;
+}
+
+// Verify serializable functionality
+private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+  in.defaultReadObject();
+
+  // Write to the global list for verification
+  records = writtenRecords.get(testName);
+}
+
+@Override
+public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView 
record) {
+  records.add(record);
+  return CompletableFuture.completedFuture(null);
+}
+
+@Override
+public CompletableFuture<Void> deleteAsync(Integer key) {
+  records.remove(key);
+  return CompletableFuture.completedFuture(null);
+}
+
+@Override
+public boolean isRetriable(Throwable exception) {
+  return false;
+}
+  }
+
+
+  static class MyReadFunction implements TableReadFunction {
+@Override
+public CompletableFuture getAsync(Object key) {
+  return null;
+}
+
+@Override
+public boolean isRetriable(Throwable exception) {
+  return false;
+}
+  }
+
+ 

[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284933428
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.table.batch.BatchPolicy;
+
+
+/**
+ * Place a sequence of operations into a {@link Batch}. When a batch is not 
allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new 
{@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final ReentrantLock lock = new ReentrantLock();
+  private final BatchPolicy<K, V> batchPolicy;
+  private final BatchHandler<K, V> batchHandler;
+  private Batch<K, V> batch;
+  private ScheduledFuture<?> scheduledFuture;
+
+  /**
+   * @param batchHandler Defines how each batch will be processed.
+   * @param batchPolicy The batch configurations to be used.
+   * @param scheduledExecutorService A scheduled executor service to set 
timers for the managed batches.
+   */
+  public BatchProcessor(BatchHandler<K, V> batchHandler, BatchPolicy<K, V> 
batchPolicy,
+  ScheduledExecutorService scheduledExecutorService) {
+Preconditions.checkNotNull(batchHandler);
+Preconditions.checkNotNull(batchPolicy);
+Preconditions.checkNotNull(scheduledExecutorService);
+
+this.batchHandler = batchHandler;
+this.scheduledExecutorService = scheduledExecutorService;
+this.batchPolicy = batchPolicy;
+batch = BatchFactory.getBatch(batchHandler, batchPolicy);
+setBatchTimer(batch);
+  }
+
+  private CompletableFuture<V> addOperation(Operation<K, V> operation) {
+if (batch.isEmpty()) {
+  setBatchTimer(batch);
+} else if (batch.isClosed()) {
+  processBatch();
+}
+batch.addOperation(operation);
+final CompletableFuture<V> res = batch.getCompletableFuture();
+
+if (batch.isClosed()) {
 
 Review comment:
   makes sense!
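The control flow in the quoted `addOperation` (arm a timer when a batch starts, flush when it closes, flush early when it fills) can be sketched independently of the Samza types. This is an illustrative sketch under stated assumptions: `MiniBatcher`, `maxBatchSize`, and the flush callback are inventions for the example, not the `BatchProcessor` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch: collect operations into a batch and flush either when the batch
// reaches maxBatchSize or when a per-batch timer fires, whichever is first.
class MiniBatcher<T> {
    private final int maxBatchSize;
    private final long maxDelayMs;
    private final Consumer<List<T>> flusher;
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor();
    private List<T> batch = new ArrayList<>();
    private ScheduledFuture<?> pending;

    MiniBatcher(int maxBatchSize, long maxDelayMs, Consumer<List<T>> flusher) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMs = maxDelayMs;
        this.flusher = flusher;
    }

    synchronized void add(T op) {
        if (batch.isEmpty()) {
            // New batch: arm its timer, mirroring setBatchTimer(batch).
            pending = timer.schedule(this::flush, maxDelayMs, TimeUnit.MILLISECONDS);
        }
        batch.add(op);
        if (batch.size() >= maxBatchSize) {
            flush(); // size policy closed the batch
        }
    }

    synchronized void flush() {
        if (batch.isEmpty()) {
            return; // timer fired after a size-triggered flush; nothing to do
        }
        if (pending != null) {
            pending.cancel(false);
        }
        List<T> toSend = batch;
        batch = new ArrayList<>();
        flusher.accept(toSend);
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

The single lock around both paths is the essential part: the timer thread and the caller thread race to flush the same batch, so both must observe a consistent batch reference.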




[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284933206
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.table.batch.BatchPolicy;
+
+
+/**
+ * Place a sequence of operations into a {@link Batch}. When a batch is not 
allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new 
{@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;
 
 Review comment:
   Perfect!




[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284932743
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/AsyncBatchingTable.java
 ##
 @@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+
+/**
+ * A wrapper of an {@link AsyncReadWriteTable} that supports batch operations.
+ *
+ * This batching table does not guarantee any ordering of different operation 
types within the batch.
+ * For instance, if query (Q) and update (U) operations arrive in the sequence 
Q1, U1, Q2, U2,
+ * it does not mean the remote data store will receive the messages in the 
same order. Instead,
+ * the operations will be grouped by type and sent via micro batches. For this 
sequence, Q1 and Q2 will
+ * be grouped to micro batch B1; U1 and U2 will be grouped to micro batch B2, 
the implementation class
+ * can decide the order of the micro batches.
+ *
+ * Synchronized table operations (get/put/delete) should be used with caution 
for the batching feature.
+ * If the table is used by a single thread, there will be at most one 
operation in the batch, and the
+ * batch will be performed when the TTL of the batch window expires. Batching 
does not make sense in this scenario.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class AsyncBatchingTable<K, V> implements ReadWriteTable<K, V> {
 
 Review comment:
  For delegation tables, we only need to implement the async methods, so it 
only needs to implement AsyncReadWriteTable.
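The per-type regrouping described in the quoted javadoc (Q1, U1, Q2, U2 becoming a query micro-batch and an update micro-batch) can be shown with a small standalone sketch; the `Op` record and `OpType` names are assumptions for illustration, not Samza's operation classes.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Sketch: group a mixed sequence of operations by type, as the batching table
// does before sending micro-batches. Q1,U1,Q2,U2 becomes [Q1,Q2] and [U1,U2];
// order is preserved within each type but not across types.
class MicroBatches {
    enum OpType { QUERY, UPDATE }

    record Op(OpType type, String name) {}

    static Map<OpType, List<Op>> group(List<Op> ops) {
        Map<OpType, List<Op>> grouped = new EnumMap<>(OpType.class);
        for (Op op : ops) {
            grouped.computeIfAbsent(op.type(), t -> new ArrayList<>()).add(op);
        }
        return grouped;
    }
}
```

This makes the javadoc's caveat concrete: callers that depend on a query observing an earlier update to the same key cannot rely on batch-mode ordering across operation types.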




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284936300
 
 

 ##
 File path: 
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 ##
 @@ -1011,7 +1011,7 @@ public void testAgreeingOnSameRunIdForBatch() throws 
InterruptedException {
 LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) 
appRunner1;
 LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) 
appRunner2;
 
-assertEquals("RunId of the two processors does not match", 
localApplicationRunner2.getRunid(), localApplicationRunner1.getRunid());
+assertEquals("RunId of the two processors does not match", 
localApplicationRunner2.getRunId().orElse(null), 
localApplicationRunner1.getRunId().orElse(null));
 
 Review comment:
   You don't have to unwrap the optional for assertEquals. 
   I think it should work out of the box with object comparison.
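The suggestion works because `Optional.equals` is value-based: two Optionals are equal when both are empty or both hold equal values, so `assertEquals` compares them directly without `orElse(null)`. A minimal sketch (the helper name is illustrative):

```java
import java.util.Optional;

// Optional.equals compares the contained values, so an equality assertion on
// two Optionals works without unwrapping them first.
class OptionalEqualityDemo {
    static boolean sameRunId(Optional<String> a, Optional<String> b) {
        return a.equals(b); // true when both empty, or both present and equal
    }
}
```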




[GitHub] [samza] weisong44 merged pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table

2019-05-16 Thread GitBox
weisong44 merged pull request #1034: SAMZA-2185: Add ability to expose remote 
store specific features in remote table
URL: https://github.com/apache/samza/pull/1034
 
 
   




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284921485
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.table.batch.BatchPolicy;
+
+
+/**
+ * Place a sequence of operations into a {@link Batch}. When a batch is not 
allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new 
{@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;
 
 Review comment:
  Per our discussion, each java.util.Timer has its own thread, so we will leave 
it as is.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284921292
 
 

 ##
 File path: 
samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
 ##
 @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.batch.BatchPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import static java.lang.Thread.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+TestBatchProcessor.TestCreate.class,
+TestBatchProcessor.TestUpdatesAndLookup.class,
+TestBatchProcessor.TestBatchTriggered.class
+  })
+public class TestBatchProcessor {
 
 Review comment:
   Added.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284921201
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/DeleteOperation.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Delete operation.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class DeleteOperation<K, V> implements Operation<K, V> {
 
 Review comment:
   Per our discussion, this requires modifying the putAsyncAll/deleteAsyncAll APIs,
   so we will not fix it at this time and will postpone this optimization.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284921235
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/BatchReadWriteTable.java
 ##
 @@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.batch.BatchPolicy;
+
+/**
+ * A wrapper of a {@link AsyncReadWriteTable} that supports batch operations.
+ *
+ * This batching table does not guarantee any ordering of different operation types within the batch.
+ * For instance, if query (Q) and update (U) operations arrive in the sequence Q1, U1, Q2, U2,
+ * it does not mean that the remote data store will receive the messages in the same order. Instead,
+ * the operations will be grouped by type and sent via micro batches. For this sequence, Q1 and Q2 will
+ * be grouped into micro batch B1, and U1 and U2 into micro batch B2; the implementation class
+ * can decide the order of the micro batches.
+ *
+ * Synchronous table operations (get/put/delete) should be used with caution with the batching feature.
+ * If the table is used by a single thread, there will be at most one operation in the batch, and the
+ * batch will be performed only when the TTL of the batch window expires. Batching does not make sense
+ * in this scenario.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class BatchReadWriteTable<K, V> implements ReadWriteTable<K, V> {
 
 Review comment:
   Fixed.
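The grouping semantics described in the class javadoc can be illustrated with a tiny, self-contained sketch. Plain strings stand in for the query/update operations; none of the PR's types are used, so this is an illustration of the idea, not the PR's implementation:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MicroBatchGrouping {
  public static void main(String[] args) {
    // Interleaved arrival order from the javadoc example: Q1, U1, Q2, U2.
    List<String> arrival = List.of("Q1", "U1", "Q2", "U2");

    // Group by operation type; arrival order is preserved within each group,
    // but the order in which the two micro batches are sent is left to the
    // implementation class.
    Map<Boolean, List<String>> microBatches = arrival.stream()
        .collect(Collectors.partitioningBy(op -> op.startsWith("Q")));

    System.out.println(microBatches.get(true));   // queries: [Q1, Q2]
    System.out.println(microBatches.get(false));  // updates: [U1, U2]
  }
}
```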




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284920904
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/BatchReadWriteTable.java
 ##
 @@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.batch.BatchPolicy;
+
+/**
+ * A wrapper of a {@link AsyncReadWriteTable} that supports batch operations.
+ *
+ * This batching table does not guarantee any ordering of different operation types within the batch.
+ * For instance, if query (Q) and update (U) operations arrive in the sequence Q1, U1, Q2, U2,
+ * it does not mean that the remote data store will receive the messages in the same order. Instead,
+ * the operations will be grouped by type and sent via micro batches. For this sequence, Q1 and Q2 will
+ * be grouped into micro batch B1, and U1 and U2 into micro batch B2; the implementation class
+ * can decide the order of the micro batches.
+ *
+ * Synchronous table operations (get/put/delete) should be used with caution with the batching feature.
+ * If the table is used by a single thread, there will be at most one operation in the batch, and the
+ * batch will be performed only when the TTL of the batch window expires. Batching does not make sense
+ * in this scenario.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class BatchReadWriteTable<K, V> implements ReadWriteTable<K, V> {
+  private final AsyncReadWriteTable<K, V> table;
+  private final BatchProcessor<K, V> batchProcessor;
+
+  /**
+   * @param table The target table that serves the batch operations.
+   * @param batchPolicy Batch configs.
+   * @param batchTimerExecutorService The scheduled executor service used to set batch timers.
+   */
+  public BatchReadWriteTable(AsyncReadWriteTable<K, V> table, BatchPolicy batchPolicy,
+      ScheduledExecutorService batchTimerExecutorService) {
+    Preconditions.checkNotNull(table);
+    Preconditions.checkNotNull(batchPolicy);
+    Preconditions.checkNotNull(batchTimerExecutorService);
+
+    this.table = table;
+    batchProcessor = new BatchProcessor<>(new TableBatchHandler<>(table), batchPolicy, batchTimerExecutorService);
+  }
+
+  @Override
+  public V get(K key) {
+    Preconditions.checkNotNull(key);
+
+    try {
+      return getAsync(key).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Preconditions.checkNotNull(keys);
+
+    try {
+      return getAllAsync(keys).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public void put(K key, V value) {
+    Preconditions.checkNotNull(key);
+
+    try {
+      putAsync(key, value).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    Preconditions.checkNotNull(entries);
+
+    try {
+      putAllAsync(entries).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public void delete(K key) {
+    Preconditions.checkNotNull(key);
+
+    try {
+      deleteAsync(key).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    Preconditions.checkNotNull(keys);
+
+    try {
+      deleteAllAsync(keys).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    Preconditions.checkNotNull(key);
+
+    try {
+  

[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284920777
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/table/batching/Batch.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.table.batch.BatchPolicy;
+
+/**
+ * Manages a sequence of {@link Operation}s, which will be performed as a batch.
+ * A batch can be configured with a {@code batchMaxSize} and/or {@code batchMaxDelay}.
+ * When the number of operations in the batch exceeds the {@code batchMaxSize}
+ * or the time window exceeds the {@code batchMaxDelay}, the batch will be performed.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+abstract class Batch<K, V> {
+  protected final int batchMaxSize;
+  protected final int batchMaxDelay;
+  protected final BatchHandler<K, V> batchHandler;
 
 Review comment:
   Fixed.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284920864
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/table/batching/Batch.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.table.batch.BatchPolicy;
+
+/**
+ * Manages a sequence of {@link Operation}s, which will be performed as a batch.
+ * A batch can be configured with a {@code batchMaxSize} and/or {@code batchMaxDelay}.
+ * When the number of operations in the batch exceeds the {@code batchMaxSize}
+ * or the time window exceeds the {@code batchMaxDelay}, the batch will be performed.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+abstract class Batch<K, V> {
+  protected final int batchMaxSize;
+  protected final int batchMaxDelay;
+  protected final BatchHandler<K, V> batchHandler;
+  protected final CompletableFuture<Void> completableFuture;
+  protected boolean closed = false;
+
+  /**
+   * @param batchHandler Defines how the batch will be performed.
+   * @param batchPolicy Defines the batch configurations.
+   */
+  Batch(BatchHandler<K, V> batchHandler, BatchPolicy batchPolicy) {
+    this.batchHandler = batchHandler;
+    // The max number of {@link Operation}s that the batch can hold.
+    this.batchMaxSize = batchPolicy.getBatchMaxSize();
+    // The max time that the batch can last before being performed.
+    this.batchMaxDelay = batchPolicy.getBatchMaxDelay();
+    completableFuture = new CompletableFuture<>();
+  }
+
+  /**
+   * Add an operation to the batch.
+   *
+   * @param operation The operation to be added.
+   */
+  abstract void addOperation(Operation<K, V> operation);
+
+  /**
+   * Close the batch so that it will not accept more operations.
+   */
+  void close() {
+    closed = true;
+  }
+
+  /**
+   * @return Whether the batch has been closed and will no longer accept operations.
+   */
+  boolean isClosed() {
+    return closed;
+  }
+
+  abstract Collection<Operation<K, V>> getQueryOperations();
+  abstract Collection<Operation<K, V>> getUpdateOperations();
+
+  private List<Operation<K, V>> getPutOperations() {
+    return getUpdateOperations().stream().filter(op -> op instanceof PutOperation)
+        .collect(Collectors.toList());
+  }
+
+  private List<Operation<K, V>> getDeleteOperations() {
+    return getUpdateOperations().stream().filter(op -> op instanceof DeleteOperation)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Perform batch operations.
+   */
+  CompletableFuture<Void> process() {
+    return CompletableFuture.allOf(
+        batchHandler.handleBatchPut(getPutOperations()),
+        batchHandler.handleBatchDelete(getDeleteOperations()),
+        batchHandler.handleBatchGet(getQueryOperations()))
+        .whenComplete((val, throwable) -> {
+          if (throwable != null) {
+            completableFuture.completeExceptionally(throwable);
+            throw new SamzaException("Batch failed", throwable);
 
 Review comment:
   Fixed by removing the throw here




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284920709
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/BatchHandler.java
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * Defines how batch operations will be handled.
+ *
+ * @param <K> The key type of the operations.
+ * @param <V> The value type of the operations.
+ */
+public interface BatchHandler<K, V> {
 
 Review comment:
   The BatchProcessor creates a timer for each batch; when the batch timer 
fires, it needs to know how to deal with the batch. 
   This BatchHandler interface is used to tell the processor how to handle the 
batch.
   For table batching, we have a TableBatchHandler.
   
   For other, non-table use cases, we can implement a new BatchHandler 
and pass it to the BatchProcessor.
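A non-table handler along those lines could look like the following minimal sketch. The Operation and BatchHandler interfaces below are reduced stand-ins for the PR's types (assuming BatchHandler exposes the three handleBatch* methods used by Batch.process() in the diff), and LoggingBatchHandler is hypothetical, not part of the PR:

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Stand-ins for the PR's types, reduced to what this sketch needs.
interface Operation<K, V> {
  K getKey();
  V getValue();
}

interface BatchHandler<K, V> {
  CompletableFuture<Void> handleBatchGet(Collection<Operation<K, V>> gets);
  CompletableFuture<Void> handleBatchPut(Collection<Operation<K, V>> puts);
  CompletableFuture<Void> handleBatchDelete(Collection<Operation<K, V>> deletes);
}

// Hypothetical non-table handler: logs each micro batch and completes
// immediately; a real handler would forward the batch to a backing store.
class LoggingBatchHandler<K, V> implements BatchHandler<K, V> {
  @Override
  public CompletableFuture<Void> handleBatchGet(Collection<Operation<K, V>> gets) {
    System.out.println("GET batch of size " + gets.size());
    return CompletableFuture.completedFuture(null);
  }

  @Override
  public CompletableFuture<Void> handleBatchPut(Collection<Operation<K, V>> puts) {
    System.out.println("PUT batch of size " + puts.size());
    return CompletableFuture.completedFuture(null);
  }

  @Override
  public CompletableFuture<Void> handleBatchDelete(Collection<Operation<K, V>> deletes) {
    System.out.println("DELETE batch of size " + deletes.size());
    return CompletableFuture.completedFuture(null);
  }
}
```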
   




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284920111
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.table.batch.BatchPolicy;
+
+
+/**
+ * Place a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final ReentrantLock lock = new ReentrantLock();
+  private final BatchPolicy batchPolicy;
+  private final BatchHandler<K, V> batchHandler;
+  private Batch<K, V> batch;
+  private ScheduledFuture<?> scheduledFuture;
+
+  /**
+   * @param batchHandler Defines how each batch will be processed.
+   * @param batchPolicy The batch configurations to be used.
+   * @param scheduledExecutorService A scheduled executor service to set timers for the managed batches.
+   */
+  public BatchProcessor(BatchHandler<K, V> batchHandler, BatchPolicy batchPolicy,
+      ScheduledExecutorService scheduledExecutorService) {
+    Preconditions.checkNotNull(batchHandler);
+    Preconditions.checkNotNull(batchPolicy);
+    Preconditions.checkNotNull(scheduledExecutorService);
+
+    this.batchHandler = batchHandler;
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.batchPolicy = batchPolicy;
+    batch = BatchFactory.getBatch(batchHandler, batchPolicy);
+    setBatchTimer(batch);
+  }
+
+  private CompletableFuture<Void> addOperation(Operation<K, V> operation) {
+    if (batch.isEmpty()) {
+      setBatchTimer(batch);
+    } else if (batch.isClosed()) {
+      processBatch();
+    }
+    batch.addOperation(operation);
+    final CompletableFuture<Void> res = batch.getCompletableFuture();
 
 Review comment:
   Yes. processQueryOperation will call this function, addOperation(), ignore 
the returned value, and use the CompletableFuture from the QueryOperation.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284919684
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.table.batch.BatchPolicy;
+
+
+/**
+ * Place a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final ReentrantLock lock = new ReentrantLock();
+  private final BatchPolicy batchPolicy;
+  private final BatchHandler<K, V> batchHandler;
+  private Batch<K, V> batch;
+  private ScheduledFuture<?> scheduledFuture;
+
+  /**
+   * @param batchHandler Defines how each batch will be processed.
+   * @param batchPolicy The batch configurations to be used.
+   * @param scheduledExecutorService A scheduled executor service to set timers for the managed batches.
+   */
+  public BatchProcessor(BatchHandler<K, V> batchHandler, BatchPolicy batchPolicy,
+      ScheduledExecutorService scheduledExecutorService) {
+    Preconditions.checkNotNull(batchHandler);
+    Preconditions.checkNotNull(batchPolicy);
+    Preconditions.checkNotNull(scheduledExecutorService);
+
+    this.batchHandler = batchHandler;
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.batchPolicy = batchPolicy;
+    batch = BatchFactory.getBatch(batchHandler, batchPolicy);
+    setBatchTimer(batch);
+  }
+
+  private CompletableFuture<Void> addOperation(Operation<K, V> operation) {
+    if (batch.isEmpty()) {
+      setBatchTimer(batch);
+    } else if (batch.isClosed()) {
+      processBatch();
+    }
+    batch.addOperation(operation);
+    final CompletableFuture<Void> res = batch.getCompletableFuture();
+
+    if (batch.isClosed()) {
 
 Review comment:
   Added this check to send the batch to the remote store immediately after the 
previous addOperation.
   Otherwise, the processor would need to wait until the next addOperation to 
trigger the already-closed batch.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284918299
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/batching/PutOperation.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Put operation.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class PutOperation<K, V> implements Operation<K, V> {
+  private final K key;
+  private final V val;
+
+  public PutOperation(K key, V val) {
+    Preconditions.checkNotNull(key);
+
+    this.key = key;
+    this.val = val;
+  }
+
+  /**
+   * @return The key to be put to the table.
+   */
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  /**
+   * @return The value to be put to the table.
+   */
+  @Override
+  public V getValue() {
+    return val;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    final PutOperation<K, V> otherOp = (PutOperation<K, V>) other;
+
+    // If the value for this object is null, it should be null for the other object as well.
+    if ((getValue() == null) ^ (otherOp.getValue() == null)) {
 
 Review comment:
   ^ is used as an exclusive OR.
   To skip the body of the if(), both conditions must evaluate to the same 
value (both true or both false);
   otherwise, the method returns false from the body of the if() statement. 
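A standalone demonstration of that XOR null-check (the exactlyOneNull helper below is hypothetical, written only for this illustration; it mirrors the guard in PutOperation.equals):

```java
public class XorNullCheckDemo {
  // True exactly when one of the two references is null and the other is not,
  // i.e. the objects cannot be equal and equals() should return false.
  static boolean exactlyOneNull(Object a, Object b) {
    return (a == null) ^ (b == null);
  }

  public static void main(String[] args) {
    System.out.println(exactlyOneNull(null, "x"));   // true  -> equals() returns false
    System.out.println(exactlyOneNull(null, null));  // false -> both null, keep comparing
    System.out.println(exactlyOneNull("x", "y"));    // false -> both non-null, keep comparing
  }
}
```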




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284918688
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
 ##
 @@ -115,10 +117,22 @@ public ReadWriteTable getTable() {
 }));
 }
 
+    BatchPolicy batchPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.BATCH_POLICY);
 
 Review comment:
   Good catch, thanks.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284917559
 
 

 ##
 File path: 
samza-api/src/main/java/org/apache/samza/table/batch/BatchPolicy.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batch;
 
 Review comment:
   Fixed.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284917528
 
 

 ##
 File path: 
samza-api/src/main/java/org/apache/samza/table/batch/BatchPolicy.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batch;
+
+import java.io.Serializable;
+import org.apache.samza.table.remote.TablePart;
+
+
+public class BatchPolicy implements TablePart, Serializable {
+  public enum BatchType {
+COMPACT_BATCH,
+COMPLETE_BATCH
+  }
+
+  private int batchMaxSize = 100;
+  private int batchMaxDelay = 100;
+  private BatchType batchType = BatchType.COMPACT_BATCH;
+
+  public BatchPolicy withBatchMaxSize(int batchMaxSize) {
+this.batchMaxSize = batchMaxSize;
+return this;
+  }
+
+  public BatchPolicy withBatchMaxDelay(int batchMaxDelay) {
 
 Review comment:
   Fixed
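For context, the fluent with-setters in the BatchPolicy snippet above follow the common builder-style pattern of mutating a field and returning `this` so calls can be chained. A minimal, self-contained sketch of that pattern (the `BatchPolicy` stand-in here mirrors the diff and is not the final API; a later review comment notes the class was replaced by a BatchProvider):

```java
// Minimal sketch of the fluent-setter pattern used by the BatchPolicy snippet
// in the diff above. This is an illustrative stand-in, not Samza's final API.
public class BatchPolicyDemo {
  enum BatchType { COMPACT_BATCH, COMPLETE_BATCH }

  static class BatchPolicy {
    int batchMaxSize = 100;   // defaults mirror the diff
    int batchMaxDelay = 100;
    BatchType batchType = BatchType.COMPACT_BATCH;

    // each setter mutates a field and returns this, enabling chaining
    BatchPolicy withBatchMaxSize(int batchMaxSize) {
      this.batchMaxSize = batchMaxSize;
      return this;
    }

    BatchPolicy withBatchMaxDelay(int batchMaxDelay) {
      this.batchMaxDelay = batchMaxDelay;
      return this;
    }
  }

  public static void main(String[] args) {
    BatchPolicy policy = new BatchPolicy()
        .withBatchMaxSize(500)
        .withBatchMaxDelay(50);
    System.out.println(policy.batchMaxSize + " " + policy.batchMaxDelay); // prints "500 50"
  }
}
```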




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284917428
 
 

 ##
 File path: 
samza-api/src/main/java/org/apache/samza/table/batch/BatchPolicy.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batch;
+
+import java.io.Serializable;
+import org.apache.samza.table.remote.TablePart;
+
+
+public class BatchPolicy implements TablePart, Serializable {
 
 Review comment:
   In the new change, this class was deleted; BatchProvider is used instead for configuration.




[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284917500
 
 

 ##
 File path: 
samza-api/src/main/java/org/apache/samza/table/batch/BatchPolicy.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batch;
+
+import java.io.Serializable;
+import org.apache.samza.table.remote.TablePart;
+
+
+public class BatchPolicy implements TablePart, Serializable {
+  public enum BatchType {
 
 Review comment:
   Good point.




[GitHub] [samza] weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table

2019-05-16 Thread GitBox
weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability 
to expose remote store specific features in remote table
URL: https://github.com/apache/samza/pull/1034#discussion_r284916421
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
 ##
 @@ -46,45 +48,66 @@ public AsyncRemoteTable(TableReadFunction readFn, 
TableWriteFunction
   }
 
   @Override
-  public CompletableFuture<V> getAsync(K key) {
-    return readFn.getAsync(key);
+  public CompletableFuture<V> getAsync(K key, Object ... args) {
 
 Review comment:
   The check is performed in RemoteTable; delegating tables don't need to do it again.




[GitHub] [samza] srinipunuru opened a new pull request #1036: Adding a SQL test case with switch case

2019-05-16 Thread GitBox
srinipunuru opened a new pull request #1036: Adding a SQL test case with switch 
case
URL: https://github.com/apache/samza/pull/1036
 
 
   Adding a Samza SQL test case with a switch case statement.




[GitHub] [samza] shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284894176
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -173,71 +207,148 @@ public void deleteStartpoint(SystemStreamPartition ssp, 
TaskName taskName) {
 Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
 Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
 
-metadataStore.delete(toStoreKey(ssp, taskName));
+readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
   }
 
   /**
-   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this 
method re-maps the Startpoints from
-   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all 
tasks provided by the {@link JobModel}
+   * The Startpoints that are written to with {@link 
#writeStartpoint(SystemStreamPartition, Startpoint)} and with
+   * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are 
moved from a "read-write" namespace
+   * to a "fan out" namespace.
* This method is not atomic or thread-safe. The intent is for the Samza 
Processor's coordinator to use this
* method to assign the Startpoints to the appropriate tasks.
-   * @param jobModel The {@link JobModel} is used to determine which {@link 
TaskName} each {@link SystemStreamPartition} maps to.
-   * @return The list of {@link SystemStreamPartition}s that were fanned out 
to SystemStreamPartition+TaskName.
+   * @param taskToSSPs Determines which {@link TaskName} each {@link 
SystemStreamPartition} maps to.
+   * @return The set of active {@link TaskName}s that were fanned out to.
*/
-  public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) {
+  public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {
 Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
-Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
-
-HashSet sspsToDelete = new HashSet<>();
-
-// Inspect the job model for TaskName-to-SSPs mapping and re-map 
startpoints from SSP-only keys to SSP+TaskName keys.
-for (ContainerModel containerModel: jobModel.getContainers().values()) {
-  for (TaskModel taskModel : containerModel.getTasks().values()) {
-TaskName taskName = taskModel.getTaskName();
-for (SystemStreamPartition ssp : 
taskModel.getSystemStreamPartitions()) {
-  Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key
-  if (startpoint == null) {
-LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName);
-continue;
-  }
-
-  LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined 
by the job model.", ssp);
-  Startpoint startpointForTask = readStartpoint(ssp, taskName);
-  if (startpointForTask == null || 
startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) {
-writeStartpoint(ssp, taskName, startpoint);
-sspsToDelete.add(ssp); // Mark for deletion
-LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, 
taskName);
-  } else {
-LOG.info("Startpoint for SSP: {} and task: {} already exists and 
will not be overwritten.", ssp, taskName);
-  }
+    Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs cannot be null or empty");
 
+    // construct fan out with the existing readWriteStore entries and mark the entries for deletion after fan out
+    Instant now = Instant.now();
+    HashMultimap<SystemStreamPartition, TaskName> deleteKeys = HashMultimap.create();
+    HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>();
+    for (TaskName taskName : taskToSSPs.keySet()) {
+      Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
+      if (CollectionUtils.isEmpty(ssps)) {
+        LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName());
+        continue;
+      }
+      for (SystemStreamPartition ssp : ssps) {
+        Optional<Startpoint> startpoint = readStartpoint(ssp); // Read SSP-only key
+        startpoint.ifPresent(sp -> deleteKeys.put(ssp, null));
+
+        Optional<Startpoint> startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key
+        startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName));
+
+        Optional<Startpoint> startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask);
+        if (!startpointWithPrecedence.isPresent()) {
+          continue;
 }
+
+fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now));
+fanOuts.get(taskName).getFanOuts().put(ssp, 
startpointWithPrecedence.get());
   }
 }
 
-// Delete SSP-only keys
-sspsToDelete.forEach(ssp -> {
-deleteStartpoint(ssp);
-LOG.info("All Startpoints for SSP: {} have been grouped to the 
appropriate tasks and the SSP was deleted.");
-  });
+
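The loop above resolves, per SSP, between a Startpoint keyed by SSP only and one keyed by SSP+TaskName before fanning out. A minimal sketch of that resolution step (illustrative only: `Startpoint` is a stand-in type, and the assumption that the more specific SSP+TaskName entry wins is taken from the diff's `resolveStartpointPrecendence` usage, not confirmed by this thread):

```java
import java.util.Optional;

// Hedged sketch of the precedence resolution referenced in the fanOut loop
// above. Startpoint here is a stand-in record, not Samza's actual class.
public class PrecedenceDemo {
  record Startpoint(String source) { }

  // Assumption: the task-specific (SSP+TaskName) Startpoint, when present,
  // takes precedence over the SSP-only one.
  static Optional<Startpoint> resolvePrecedence(Optional<Startpoint> sspOnly,
                                                Optional<Startpoint> sspAndTask) {
    return sspAndTask.isPresent() ? sspAndTask : sspOnly;
  }

  public static void main(String[] args) {
    // task-specific entry wins
    System.out.println(resolvePrecedence(
        Optional.of(new Startpoint("ssp-only")),
        Optional.of(new Startpoint("ssp+task"))).get().source()); // prints "ssp+task"
    // falls back to the SSP-only entry
    System.out.println(resolvePrecedence(
        Optional.of(new Startpoint("ssp-only")),
        Optional.empty()).get().source()); // prints "ssp-only"
    // neither present: nothing to fan out for this SSP
    System.out.println(resolvePrecedence(Optional.empty(), Optional.empty()).isPresent()); // prints "false"
  }
}
```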

[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284887289
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -20,87 +20,115 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashSet;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The StartpointManager reads and writes {@link Startpoint} to the {@link 
MetadataStore} defined by
- * the configuration task.startpoint.metadata.store.factory.
- *
- * Startpoints are keyed in the MetadataStore by two different formats:
- * 1) Only by {@link SystemStreamPartition}
- * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ * The StartpointManager reads and writes {@link Startpoint} to the provided 
{@link MetadataStore}
  *
  * The intention for the StartpointManager is to maintain a strong contract 
between the caller
  * and how Startpoints are stored in the underlying MetadataStore.
+ *
+ * Startpoints are written in the MetadataStore using keys of two different 
formats:
+ * 1) {@link SystemStreamPartition} only
+ * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ *
+ * Startpoints are then fanned out to a fan out namespace in the MetadataStore 
by the
+ * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the 
standalone
+ * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
+ * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set 
the starting offsets per task and per
+ * {@link SystemStreamPartition}. The fan outs are deleted once the offsets 
are committed to the checkpoint.
+ *
+ * The read, write and delete methods are intended for external callers.
+ * The fan out methods are intended to be used within a job coordinator.
  */
 public class StartpointManager {
-  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
-  public static final String NAMESPACE = "samza-startpoint-v1";
+  public static final Integer VERSION = 1;
+  public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
 
   static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
 
-  private final MetadataStore metadataStore;
-  private final StartpointSerde startpointSerde = new StartpointSerde();
-
-  private boolean stopped = false;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
+  private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
 
-  /**
-   *  Constructs a {@link StartpointManager} instance by instantiating a new 
metadata store connection.
-   *  This is primarily used for testing.
-   */
-  @VisibleForTesting
-  StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, 
MetricsRegistry metricsRegistry) {
-Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory 
cannot be null");
-Preconditions.checkNotNull(config, "Config cannot be null");
-Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be 
null");
+  private final MetadataStore metadataStore;
+  private final NamespaceAwareCoordinatorStreamStore fanOutStore;
+  private final NamespaceAwareCoordinatorStreamStore readWriteStore;
+  private final ObjectMapper objectMapper = 
StartpointObjectMapper.getObjectMapper();
 
-this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, 
config, metricsRegistry);
-LOG.info("StartpointManager created with metadata store: {}", 
metadataStore.getClass().getCanonicalName());
-this.metadataStore.init();
-  }
+  private boolean stopped = true;
 
   /**
*  Builds the 

[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284874627
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -173,71 +207,148 @@ public void deleteStartpoint(SystemStreamPartition ssp, 
TaskName taskName) {
 Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
 Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
 
-metadataStore.delete(toStoreKey(ssp, taskName));
+readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
   }
 
   /**
-   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this 
method re-maps the Startpoints from
-   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all 
tasks provided by the {@link JobModel}
+   * The Startpoints that are written to with {@link 
#writeStartpoint(SystemStreamPartition, Startpoint)} and with
+   * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are 
moved from a "read-write" namespace
+   * to a "fan out" namespace.
* This method is not atomic or thread-safe. The intent is for the Samza 
Processor's coordinator to use this
* method to assign the Startpoints to the appropriate tasks.
-   * @param jobModel The {@link JobModel} is used to determine which {@link 
TaskName} each {@link SystemStreamPartition} maps to.
-   * @return The list of {@link SystemStreamPartition}s that were fanned out 
to SystemStreamPartition+TaskName.
+   * @param taskToSSPs Determines which {@link TaskName} each {@link 
SystemStreamPartition} maps to.
+   * @return The set of active {@link TaskName}s that were fanned out to.
*/
-  public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) {
+  public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {
 Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
-Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
-
-HashSet sspsToDelete = new HashSet<>();
-
-// Inspect the job model for TaskName-to-SSPs mapping and re-map 
startpoints from SSP-only keys to SSP+TaskName keys.
-for (ContainerModel containerModel: jobModel.getContainers().values()) {
-  for (TaskModel taskModel : containerModel.getTasks().values()) {
-TaskName taskName = taskModel.getTaskName();
-for (SystemStreamPartition ssp : 
taskModel.getSystemStreamPartitions()) {
-  Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key
-  if (startpoint == null) {
-LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName);
-continue;
-  }
-
-  LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined 
by the job model.", ssp);
-  Startpoint startpointForTask = readStartpoint(ssp, taskName);
-  if (startpointForTask == null || 
startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) {
-writeStartpoint(ssp, taskName, startpoint);
-sspsToDelete.add(ssp); // Mark for deletion
-LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, 
taskName);
-  } else {
-LOG.info("Startpoint for SSP: {} and task: {} already exists and 
will not be overwritten.", ssp, taskName);
-  }
+    Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs cannot be null or empty");
 
+    // construct fan out with the existing readWriteStore entries and mark the entries for deletion after fan out
+    Instant now = Instant.now();
+    HashMultimap<SystemStreamPartition, TaskName> deleteKeys = HashMultimap.create();
+    HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>();
+    for (TaskName taskName : taskToSSPs.keySet()) {
+      Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
+      if (CollectionUtils.isEmpty(ssps)) {
+        LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName());
+        continue;
+      }
+      for (SystemStreamPartition ssp : ssps) {
+        Optional<Startpoint> startpoint = readStartpoint(ssp); // Read SSP-only key
+        startpoint.ifPresent(sp -> deleteKeys.put(ssp, null));
+
+        Optional<Startpoint> startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key
+        startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName));
+
+        Optional<Startpoint> startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask);
+        if (!startpointWithPrecedence.isPresent()) {
+          continue;
 }
+
+fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now));
+fanOuts.get(taskName).getFanOuts().put(ssp, 
startpointWithPrecedence.get());
   }
 }
 
-// Delete SSP-only keys
-sspsToDelete.forEach(ssp -> {
-deleteStartpoint(ssp);
-LOG.info("All Startpoints for SSP: {} have been grouped to the 
appropriate tasks and the SSP was deleted.");
-  });
+

[GitHub] [samza] shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284853798
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -173,71 +207,148 @@ public void deleteStartpoint(SystemStreamPartition ssp, 
TaskName taskName) {
 Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
 Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
 
-metadataStore.delete(toStoreKey(ssp, taskName));
+readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
   }
 
   /**
-   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this 
method re-maps the Startpoints from
-   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all 
tasks provided by the {@link JobModel}
+   * The Startpoints that are written to with {@link 
#writeStartpoint(SystemStreamPartition, Startpoint)} and with
+   * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are 
moved from a "read-write" namespace
+   * to a "fan out" namespace.
* This method is not atomic or thread-safe. The intent is for the Samza 
Processor's coordinator to use this
* method to assign the Startpoints to the appropriate tasks.
-   * @param jobModel The {@link JobModel} is used to determine which {@link 
TaskName} each {@link SystemStreamPartition} maps to.
-   * @return The list of {@link SystemStreamPartition}s that were fanned out 
to SystemStreamPartition+TaskName.
+   * @param taskToSSPs Determines which {@link TaskName} each {@link 
SystemStreamPartition} maps to.
+   * @return The set of active {@link TaskName}s that were fanned out to.
*/
-  public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) {
+  public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {
 Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
-Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
-
-HashSet sspsToDelete = new HashSet<>();
-
-// Inspect the job model for TaskName-to-SSPs mapping and re-map 
startpoints from SSP-only keys to SSP+TaskName keys.
-for (ContainerModel containerModel: jobModel.getContainers().values()) {
-  for (TaskModel taskModel : containerModel.getTasks().values()) {
-TaskName taskName = taskModel.getTaskName();
-for (SystemStreamPartition ssp : 
taskModel.getSystemStreamPartitions()) {
-  Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key
-  if (startpoint == null) {
-LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName);
-continue;
-  }
-
-  LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined 
by the job model.", ssp);
-  Startpoint startpointForTask = readStartpoint(ssp, taskName);
-  if (startpointForTask == null || 
startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) {
-writeStartpoint(ssp, taskName, startpoint);
-sspsToDelete.add(ssp); // Mark for deletion
-LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, 
taskName);
-  } else {
-LOG.info("Startpoint for SSP: {} and task: {} already exists and 
will not be overwritten.", ssp, taskName);
-  }
+    Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs cannot be null or empty");
 
+    // construct fan out with the existing readWriteStore entries and mark the entries for deletion after fan out
+    Instant now = Instant.now();
+    HashMultimap<SystemStreamPartition, TaskName> deleteKeys = HashMultimap.create();
+    HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>();
+    for (TaskName taskName : taskToSSPs.keySet()) {
+      Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
+      if (CollectionUtils.isEmpty(ssps)) {
+        LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName());
+        continue;
+      }
+      for (SystemStreamPartition ssp : ssps) {
+        Optional<Startpoint> startpoint = readStartpoint(ssp); // Read SSP-only key
+        startpoint.ifPresent(sp -> deleteKeys.put(ssp, null));
+
+        Optional<Startpoint> startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key
+        startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName));
+
+        Optional<Startpoint> startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask);
+        if (!startpointWithPrecedence.isPresent()) {
+          continue;
 }
+
+fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now));
+fanOuts.get(taskName).getFanOuts().put(ssp, 
startpointWithPrecedence.get());
   }
 }
 
-// Delete SSP-only keys
-sspsToDelete.forEach(ssp -> {
-deleteStartpoint(ssp);
-LOG.info("All Startpoints for SSP: {} have been grouped to the 
appropriate tasks and the SSP was deleted.");
-  });
+

[GitHub] [samza] weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table

2019-05-16 Thread GitBox
weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability 
to expose remote store specific features in remote table
URL: https://github.com/apache/samza/pull/1034#discussion_r284841742
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
 ##
 @@ -241,64 +249,76 @@ public void putAll(List> entries) {
 
 CompletableFuture deleteFuture = deleteKeys.isEmpty()
 ? CompletableFuture.completedFuture(null)
-: deleteAllAsync(deleteKeys);
+: deleteAllAsync(deleteKeys, args);
 
 // Return the combined future
 return CompletableFuture.allOf(
 deleteFuture,
-instrument(() -> asyncTable.putAllAsync(putRecords), 
metrics.numPutAlls, metrics.putAllNs))
+instrument(() -> asyncTable.putAllAsync(putRecords, args), 
metrics.numPutAlls, metrics.putAllNs))
 .exceptionally(e -> {
 String strKeys = records.stream().map(r -> 
r.getKey().toString()).collect(Collectors.joining(","));
 throw new SamzaException(String.format("Failed to put records with 
keys=" + strKeys), e);
   });
   }
 
   @Override
-  public void delete(K key) {
+  public void delete(K key, Object ... args) {
 try {
-  deleteAsync(key).get();
+  deleteAsync(key, args).get();
 } catch (Exception e) {
   throw new SamzaException(e);
 }
   }
 
   @Override
-  public CompletableFuture<Void> deleteAsync(K key) {
+  public CompletableFuture<Void> deleteAsync(K key, Object ... args) {
 Preconditions.checkNotNull(writeFn, "null write function");
 Preconditions.checkNotNull(key, "null key");
-return instrument(() -> asyncTable.deleteAsync(key), metrics.numDeletes, 
metrics.deleteNs)
+return instrument(() -> asyncTable.deleteAsync(key, args), 
metrics.numDeletes, metrics.deleteNs)
 .exceptionally(e -> {
 throw new SamzaException(String.format("Failed to delete the 
record for " + key), (Throwable) e);
   });
   }
 
   @Override
-  public void deleteAll(List<K> keys) {
+  public void deleteAll(List<K> keys, Object ... args) {
 try {
-  deleteAllAsync(keys).get();
+  deleteAllAsync(keys, args).get();
 } catch (Exception e) {
   throw new SamzaException(e);
 }
   }
 
   @Override
-  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys, Object ... args) {
 Preconditions.checkNotNull(writeFn, "null write function");
 Preconditions.checkNotNull(keys, "null keys");
 if (keys.isEmpty()) {
   return CompletableFuture.completedFuture(null);
 }
 
-return instrument(() -> asyncTable.deleteAllAsync(keys), 
metrics.numDeleteAlls, metrics.deleteAllNs)
+return instrument(() -> asyncTable.deleteAllAsync(keys, args), 
metrics.numDeleteAlls, metrics.deleteAllNs)
 .exceptionally(e -> {
-throw new SamzaException(String.format("Failed to delete records 
for " + keys), (Throwable) e);
+throw new SamzaException(String.format("Failed to delete records 
for " + keys), e);
+  });
+  }
+
+  @Override
+  public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
+    return (CompletableFuture<T>) instrument(() -> asyncTable.writeAsync(opId, args), metrics.numWrites, metrics.writeNs)
+.exceptionally(e -> {
+throw new SamzaException(String.format("Failed to write, opId=%d", 
opId), e);
   });
   }
 
   @Override
   public void init(Context context) {
 super.init(context);
 asyncTable.init(context);
+readFn.init(context, this);
 
 Review comment:
   I need to pass this here so that the table functions get a reference to the remote table, with the rate limiter, etc. hooked in during invocation. I removed the init part from AsyncRemoteTable and added a comment to indicate the move.
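The RemoteTable diff above repeats one pattern for every operation: run the async call through an `instrument()` helper, then use `exceptionally()` to rewrap any failure in a SamzaException. A minimal, self-contained sketch of that wrap-and-rethrow pattern (`instrument` and `getWrapped` here are illustrative stand-ins; the real `instrument` also records metrics and latency, and the real code throws SamzaException rather than RuntimeException):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Sketch of the instrument-then-rewrap pattern from the RemoteTable diff.
// Names are illustrative, not Samza's actual API.
public class AsyncWrapDemo {

  // Stand-in for RemoteTable.instrument(): the real version also bumps a
  // counter metric and records call latency around the supplied operation.
  static <T> CompletableFuture<T> instrument(Supplier<CompletableFuture<T>> op) {
    return op.get();
  }

  // Wrap an async call so any failure surfaces as a single domain exception.
  static <T> CompletableFuture<T> getWrapped(Supplier<CompletableFuture<T>> op, String what) {
    return instrument(op).exceptionally(e -> {
      throw new RuntimeException("Failed to " + what, e);
    });
  }

  public static void main(String[] args) {
    // success path: the value passes through untouched
    System.out.println(getWrapped(() -> CompletableFuture.completedFuture("v"), "get").join()); // prints "v"

    // failure path: join() throws a CompletionException whose cause is our rewrapped exception
    try {
      getWrapped(() -> CompletableFuture.failedFuture(new IllegalStateException("boom")), "get").join();
    } catch (Exception e) {
      System.out.println(e.getCause().getMessage()); // prints "Failed to get"
    }
  }
}
```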




[GitHub] [samza] shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284837838
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -20,87 +20,115 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashSet;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The StartpointManager reads and writes {@link Startpoint} to the {@link 
MetadataStore} defined by
- * the configuration task.startpoint.metadata.store.factory.
- *
- * Startpoints are keyed in the MetadataStore by two different formats:
- * 1) Only by {@link SystemStreamPartition}
- * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ * The StartpointManager reads and writes {@link Startpoint} to the provided 
{@link MetadataStore}
  *
  * The intention for the StartpointManager is to maintain a strong contract 
between the caller
  * and how Startpoints are stored in the underlying MetadataStore.
+ *
+ * Startpoints are written in the MetadataStore using keys of two different 
formats:
+ * 1) {@link SystemStreamPartition} only
+ * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ *
+ * Startpoints are then fanned out to a fan out namespace in the MetadataStore 
by the
+ * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the 
standalone
+ * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
+ * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set 
the starting offsets per task and per
+ * {@link SystemStreamPartition}. The fan outs are deleted once the offsets 
are committed to the checkpoint.
+ *
+ * The read, write and delete methods are intended for external callers.
+ * The fan out methods are intended to be used within a job coordinator.
  */
 public class StartpointManager {
-  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
-  public static final String NAMESPACE = "samza-startpoint-v1";
+  public static final Integer VERSION = 1;
+  public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
 
   static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
 
-  private final MetadataStore metadataStore;
-  private final StartpointSerde startpointSerde = new StartpointSerde();
-
-  private boolean stopped = false;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
+  private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
 
-  /**
-   *  Constructs a {@link StartpointManager} instance by instantiating a new 
metadata store connection.
-   *  This is primarily used for testing.
-   */
-  @VisibleForTesting
-  StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, 
MetricsRegistry metricsRegistry) {
-Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory 
cannot be null");
-Preconditions.checkNotNull(config, "Config cannot be null");
-Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be 
null");
+  private final MetadataStore metadataStore;
+  private final NamespaceAwareCoordinatorStreamStore fanOutStore;
+  private final NamespaceAwareCoordinatorStreamStore readWriteStore;
+  private final ObjectMapper objectMapper = 
StartpointObjectMapper.getObjectMapper();
 
-this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, 
config, metricsRegistry);
-LOG.info("StartpointManager created with metadata store: {}", 
metadataStore.getClass().getCanonicalName());
-this.metadataStore.init();
-  }
+  private boolean stopped = true;
 
   /**
*  Builds the 

[GitHub] [samza] dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability 
to expose remote store specific features in remote table
URL: https://github.com/apache/samza/pull/1034#discussion_r284818562
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
 ##
 @@ -46,45 +48,66 @@ public AsyncRemoteTable(TableReadFunction readFn, 
TableWriteFunction
   }
 
   @Override
-  public CompletableFuture<V> getAsync(K key) {
-return readFn.getAsync(key);
+  public CompletableFuture<V> getAsync(K key, Object ... args) {
 
 Review comment:
  checkNotNull for args, maybe key as well?
  Should the "args.length" check be deferred to the readFn by passing the args 
to the readFn?
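The shape the reviewer suggests can be sketched as follows (a hypothetical stub, not Samza's actual API): null-check `key` and `args` up front, then forward `args` untouched so any `args.length` validation lives in the read function itself.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class GetAsyncDemo {
  // Stand-in for a TableReadFunction: a real one would interpret args;
  // here we just echo how many were forwarded.
  static CompletableFuture<String> readFnGetAsync(String key, Object... args) {
    return CompletableFuture.completedFuture(key + ":" + args.length);
  }

  public static CompletableFuture<String> getAsync(String key, Object... args) {
    Objects.requireNonNull(key, "null key");
    Objects.requireNonNull(args, "null args");
    return readFnGetAsync(key, args); // defer args.length checks to the readFn
  }

  public static void main(String[] args) {
    assert getAsync("k", 1, 2).join().equals("k:2");
  }
}
```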




[GitHub] [samza] dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability 
to expose remote store specific features in remote table
URL: https://github.com/apache/samza/pull/1034#discussion_r284828100
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
 ##
 @@ -241,64 +249,76 @@ public void putAll(List<Entry<K, V>> entries) {
 
 CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
 ? CompletableFuture.completedFuture(null)
-: deleteAllAsync(deleteKeys);
+: deleteAllAsync(deleteKeys, args);
 
 // Return the combined future
 return CompletableFuture.allOf(
 deleteFuture,
-instrument(() -> asyncTable.putAllAsync(putRecords), 
metrics.numPutAlls, metrics.putAllNs))
+instrument(() -> asyncTable.putAllAsync(putRecords, args), 
metrics.numPutAlls, metrics.putAllNs))
 .exceptionally(e -> {
 String strKeys = records.stream().map(r -> 
r.getKey().toString()).collect(Collectors.joining(","));
 throw new SamzaException(String.format("Failed to put records with 
keys=" + strKeys), e);
   });
   }
 
   @Override
-  public void delete(K key) {
+  public void delete(K key, Object ... args) {
 try {
-  deleteAsync(key).get();
+  deleteAsync(key, args).get();
 } catch (Exception e) {
   throw new SamzaException(e);
 }
   }
 
   @Override
-  public CompletableFuture<Void> deleteAsync(K key) {
+  public CompletableFuture<Void> deleteAsync(K key, Object ... args) {
 Preconditions.checkNotNull(writeFn, "null write function");
 Preconditions.checkNotNull(key, "null key");
-return instrument(() -> asyncTable.deleteAsync(key), metrics.numDeletes, 
metrics.deleteNs)
+return instrument(() -> asyncTable.deleteAsync(key, args), 
metrics.numDeletes, metrics.deleteNs)
 .exceptionally(e -> {
 throw new SamzaException(String.format("Failed to delete the 
record for " + key), (Throwable) e);
   });
   }
 
   @Override
-  public void deleteAll(List<K> keys) {
+  public void deleteAll(List<K> keys, Object ... args) {
 try {
-  deleteAllAsync(keys).get();
+  deleteAllAsync(keys, args).get();
 } catch (Exception e) {
   throw new SamzaException(e);
 }
   }
 
   @Override
-  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys, Object ... args) {
 Preconditions.checkNotNull(writeFn, "null write function");
 Preconditions.checkNotNull(keys, "null keys");
 if (keys.isEmpty()) {
   return CompletableFuture.completedFuture(null);
 }
 
-return instrument(() -> asyncTable.deleteAllAsync(keys), 
metrics.numDeleteAlls, metrics.deleteAllNs)
+return instrument(() -> asyncTable.deleteAllAsync(keys, args), 
metrics.numDeleteAlls, metrics.deleteAllNs)
 .exceptionally(e -> {
-throw new SamzaException(String.format("Failed to delete records 
for " + keys), (Throwable) e);
+throw new SamzaException(String.format("Failed to delete records 
for " + keys), e);
+  });
+  }
+
+  @Override
+  public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
+return (CompletableFuture<T>) instrument(() -> asyncTable.writeAsync(opId, args), metrics.numWrites, metrics.writeNs)
+.exceptionally(e -> {
+throw new SamzaException(String.format("Failed to write, opId=%d", 
opId), e);
   });
   }
 
   @Override
   public void init(Context context) {
 super.init(context);
 asyncTable.init(context);
+readFn.init(context, this);
 
 Review comment:
  asyncTable already initializes readFn/writeFn, so why is this still needed?




[GitHub] [samza] dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table

2019-05-16 Thread GitBox
dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability 
to expose remote store specific features in remote table
URL: https://github.com/apache/samza/pull/1034#discussion_r284809446
 
 

 ##
 File path: 
samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
 ##
 @@ -63,9 +63,20 @@
  * Get the number of credits required for the {@code key} and {@code 
value} pair.
  * @param key table key
  * @param value table record
+ * @param args additional arguments
  * @return number of credits
  */
-int getCredits(K key, V value);
+int getCredits(K key, V value, Object ... args);
+
+/**
+ * Get the number of credits required for the {@code opId} and associated 
{@code args}.
+ * @param opId operation Id
+ * @param args additional arguments
+ * @return number of credits
+ */
+default int getCredit(int opId, Object ... args) {
 
 Review comment:
  rename it to "getCredits" to keep the name consistent?
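The consistently named pair could look like the following trimmed-down sketch (hypothetical, not the actual `TableRateLimiter.CreditFunction`): both the key/value overload and the opId overload are spelled `getCredits`, with the opId variant defaulting to one credit per operation.

```java
public class CreditFunctionDemo {
  interface CreditFunction<K, V> {
    // credits for a key/value pair
    int getCredits(K key, V value, Object... args);

    // overload, consistently named, defaulting to one credit per op
    default int getCredits(int opId, Object... args) {
      return 1;
    }
  }

  public static int defaultOpCredits() {
    CreditFunction<String, String> fn = (k, v, args) -> 2;
    return fn.getCredits(42);  // resolves to the default opId overload
  }

  public static void main(String[] args) {
    assert defaultOpCredits() == 1;
  }
}
```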




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284819953
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Coordination Primitive to maintain the list of processors in the quorum
+ *
+ * Guarantees:
 
 Review comment:
   I feel the documentation needs to address two aspects of the contract.
   1. What are the responsibilities/expectations of the implementors of the 
interface?
   2. What are the guarantees provided to the users of the interface? 
   
  It will be nice to have this separation since, right now, it is unclear who 
is targeted in these sections. 
  Also, I suppose what you mean by 1 and 2 under guarantees is referred to as 
`Linearizability`. 




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284829002
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -77,20 +93,81 @@
*/
   public LocalApplicationRunner(SamzaApplication app, Config config) {
 this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-this.planner = new LocalJobPlanner(appDesc);
+isAppModeBatch = new ApplicationConfig(config).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+coordinationUtils = getCoordinationUtils(config);
   }
 
   /**
* Constructor only used in unit test to allow injection of {@link 
LocalJobPlanner}
*/
   @VisibleForTesting
-  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) {
+  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) {
 this.appDesc = appDesc;
-this.planner = planner;
+isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() 
== ApplicationConfig.ApplicationMode.BATCH;
+this.coordinationUtils = coordinationUtils;
+  }
+
+  private Optional getCoordinationUtils(Config config) {
+if (!isAppModeBatch) {
+  return Optional.empty();
+}
+JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+CoordinationUtils coordinationUtils = 
jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX,
 PROCESSOR_ID, config);
+return Optional.ofNullable(coordinationUtils);
+  }
+
+  /**
+   * @return LocalJobPlanner created
+   */
+  @VisibleForTesting
+  LocalJobPlanner getPlanner() {
+boolean isAppModeBatch = new 
ApplicationConfig(appDesc.getConfig()).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+if (!isAppModeBatch) {
+  return new LocalJobPlanner(appDesc, PROCESSOR_ID);
+}
+CoordinationUtils coordinationUtils = null;
+String runId = null;
+if (this.coordinationUtils.isPresent()) {
+  coordinationUtils = this.coordinationUtils.get();
+}
+if (this.runId.isPresent()) {
+  runId = this.runId.get();
+}
+return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, 
runId);
+  }
+
+
+  private void initializeRunId() {
 
 Review comment:
  It is hard to read through this, especially around when the runId isn't 
initialized. We should at least separate the scenarios into categories:
  1. We don't initialize runId because we explicitly chose not to, e.g. stream 
mode.
  2. We can't initialize the runId because we don't have everything we need to 
construct the runIdGenerator. 
  3. We didn't initialize runId because we failed to generate it.




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284806205
 
 

 ##
 File path: 
samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
 ##
 @@ -45,11 +44,19 @@ public Latch getLatch(int size, String latchId) throws 
UnsupportedOperationExcep
 return null;
   }
 
+  // To support DistributedLock in Azure, even MetadataStore needs to be 
implemented.
 
 Review comment:
  nit: can we change this to javadocs instead of comments?
  also, link `MetadataStore` to the interface so that it is clear what is being 
referred to here.




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284829361
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -77,20 +93,81 @@
*/
   public LocalApplicationRunner(SamzaApplication app, Config config) {
 this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-this.planner = new LocalJobPlanner(appDesc);
+isAppModeBatch = new ApplicationConfig(config).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+coordinationUtils = getCoordinationUtils(config);
   }
 
   /**
* Constructor only used in unit test to allow injection of {@link 
LocalJobPlanner}
*/
   @VisibleForTesting
-  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) {
+  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) {
 this.appDesc = appDesc;
-this.planner = planner;
+isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() 
== ApplicationConfig.ApplicationMode.BATCH;
+this.coordinationUtils = coordinationUtils;
+  }
+
+  private Optional getCoordinationUtils(Config config) {
+if (!isAppModeBatch) {
+  return Optional.empty();
+}
+JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+CoordinationUtils coordinationUtils = 
jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX,
 PROCESSOR_ID, config);
+return Optional.ofNullable(coordinationUtils);
+  }
+
+  /**
+   * @return LocalJobPlanner created
+   */
+  @VisibleForTesting
+  LocalJobPlanner getPlanner() {
+boolean isAppModeBatch = new 
ApplicationConfig(appDesc.getConfig()).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+if (!isAppModeBatch) {
+  return new LocalJobPlanner(appDesc, PROCESSOR_ID);
+}
+CoordinationUtils coordinationUtils = null;
+String runId = null;
+if (this.coordinationUtils.isPresent()) {
+  coordinationUtils = this.coordinationUtils.get();
+}
+if (this.runId.isPresent()) {
+  runId = this.runId.get();
+}
+return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, 
runId);
+  }
+
+
+  private void initializeRunId() {
+try {
+  MetadataStore metadataStore = getMetadataStore();
+  if (coordinationUtils.isPresent() && metadataStore != null) {
+runIdGenerator = Optional.of(new 
RunIdGenerator(coordinationUtils.get(), metadataStore));
+  }
+  if (!coordinationUtils.isPresent() || !isAppModeBatch || 
!runIdGenerator.isPresent()) {
+LOG.warn("coordination utils or run id generator could not be created 
successfully!");
+return;
+  }
+  runId = runIdGenerator.get().getRunId();
 
 Review comment:
  The runIdGenerator check can be removed and this simplifies to `runId = 
runIdGenerator.map(RunIdGenerator::getRunId);` (note `ifPresent` returns void, 
so `map` is the call that fits here).
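The simplification can be sketched with a stub (`RunIdGenerator` below is hypothetical, standing in for Samza's class): `Optional.map` collapses the presence check and the `getRunId()` call into one expression, and an empty `Optional` stays empty.

```java
import java.util.Optional;

public class RunIdMapDemo {
  // Hypothetical stand-in for Samza's RunIdGenerator.
  static class RunIdGenerator {
    String getRunId() { return "run-123"; }
  }

  public static Optional<String> resolveRunId(Optional<RunIdGenerator> gen) {
    // map applies getRunId only when a generator is present
    return gen.map(RunIdGenerator::getRunId);
  }

  public static void main(String[] args) {
    assert resolveRunId(Optional.of(new RunIdGenerator())).get().equals("run-123");
    assert !resolveRunId(Optional.empty()).isPresent();
  }
}
```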




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284817518
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Coordination Primitive to maintain the list of processors in the quorum
+ *
+ * Guarantees:
+ * 1. after registerProcessor, getNumberOfProcessors should return count 
inclusive of at least the current processor
+ * 2. after unregisterProcessor, getNumberOfProcessors should return count 
exclusive of at least the current processor
+ *
+ * Non-guarantees:
+ * 1. thread safe
+ * 2. concurrent access of the list
 
 Review comment:
   what list are we referring to? can you clarify the comment a bit more?
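The counting contract quoted in the guarantees can be illustrated with a minimal in-memory sketch (hypothetical, not a Samza implementation): after `registerProcessor` the count includes the caller, and after `unregisterProcessor` it no longer does.

```java
import java.util.HashSet;
import java.util.Set;

public class MembershipDemo {
  // the "list" of currently registered processors
  private final Set<String> processors = new HashSet<>();

  public synchronized String registerProcessor(String processorId) {
    processors.add(processorId);
    return processorId;
  }

  public synchronized void unregisterProcessor(String processorId) {
    processors.remove(processorId);
  }

  public synchronized int getNumberOfProcessors() {
    return processors.size();
  }

  public static void main(String[] args) {
    MembershipDemo m = new MembershipDemo();
    m.registerProcessor("p1");
    assert m.getNumberOfProcessors() >= 1;  // inclusive of the current processor
    m.unregisterProcessor("p1");
    assert m.getNumberOfProcessors() == 0;  // exclusive after unregister
  }
}
```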




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284826566
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -77,20 +93,81 @@
*/
   public LocalApplicationRunner(SamzaApplication app, Config config) {
 this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-this.planner = new LocalJobPlanner(appDesc);
+isAppModeBatch = new ApplicationConfig(config).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+coordinationUtils = getCoordinationUtils(config);
   }
 
   /**
* Constructor only used in unit test to allow injection of {@link 
LocalJobPlanner}
*/
   @VisibleForTesting
-  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) {
+  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) {
 this.appDesc = appDesc;
-this.planner = planner;
+isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() 
== ApplicationConfig.ApplicationMode.BATCH;
+this.coordinationUtils = coordinationUtils;
+  }
+
+  private Optional getCoordinationUtils(Config config) {
+if (!isAppModeBatch) {
+  return Optional.empty();
+}
+JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+CoordinationUtils coordinationUtils = 
jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX,
 PROCESSOR_ID, config);
+return Optional.ofNullable(coordinationUtils);
+  }
+
+  /**
+   * @return LocalJobPlanner created
+   */
+  @VisibleForTesting
+  LocalJobPlanner getPlanner() {
+boolean isAppModeBatch = new 
ApplicationConfig(appDesc.getConfig()).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+if (!isAppModeBatch) {
+  return new LocalJobPlanner(appDesc, PROCESSOR_ID);
+}
+CoordinationUtils coordinationUtils = null;
+String runId = null;
+if (this.coordinationUtils.isPresent()) {
+  coordinationUtils = this.coordinationUtils.get();
+}
+if (this.runId.isPresent()) {
+  runId = this.runId.get();
+}
+return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, 
runId);
+  }
+
+
+  private void initializeRunId() {
+try {
+  MetadataStore metadataStore = getMetadataStore();
+  if (coordinationUtils.isPresent() && metadataStore != null) {
+runIdGenerator = Optional.of(new 
RunIdGenerator(coordinationUtils.get(), metadataStore));
+  }
+  if (!coordinationUtils.isPresent() || !isAppModeBatch || 
!runIdGenerator.isPresent()) {
+LOG.warn("coordination utils or run id generator could not be created 
successfully!");
+return;
+  }
+  runId = runIdGenerator.get().getRunId();
+} catch (Exception e) {
+  LOG.warn("Failed to generate run id. Will continue execution without a 
run id. Caused by {}", e);
+}
+  }
+
+  public String getRunid() {
+if (runId.isPresent()) {
 
 Review comment:
  can be simplified to `return runId.orElse(null);`. Independently, why are we 
returning null here? Why not simply return the `Optional`?
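The two alternatives the reviewer contrasts can be sketched side by side (a hypothetical stub, not the actual `LocalApplicationRunner`): a nullable getter via `orElse(null)` versus exposing the `Optional` directly so callers handle absence themselves.

```java
import java.util.Optional;

public class GetRunIdDemo {
  private final Optional<String> runId;

  GetRunIdDemo(Optional<String> runId) { this.runId = runId; }

  // current shape: collapses the Optional to a nullable String
  public String getRunIdOrNull() { return runId.orElse(null); }

  // alternative the reviewer suggests: callers decide what absence means
  public Optional<String> getRunId() { return runId; }

  public static void main(String[] args) {
    GetRunIdDemo absent = new GetRunIdDemo(Optional.empty());
    assert absent.getRunIdOrNull() == null;
    assert !absent.getRunId().isPresent();
  }
}
```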




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284829974
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -77,20 +93,81 @@
   */
  public LocalApplicationRunner(SamzaApplication app, Config config) {
    this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-    this.planner = new LocalJobPlanner(appDesc);
+    isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    coordinationUtils = getCoordinationUtils(config);
  }
 
  /**
   * Constructor only used in unit test to allow injection of {@link LocalJobPlanner}
   */
  @VisibleForTesting
-  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) {
+  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional<CoordinationUtils> coordinationUtils) {
    this.appDesc = appDesc;
-    this.planner = planner;
+    isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    this.coordinationUtils = coordinationUtils;
+  }
+
+  private Optional<CoordinationUtils> getCoordinationUtils(Config config) {
+    if (!isAppModeBatch) {
+      return Optional.empty();
+    }
+    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+    CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config);
+    return Optional.ofNullable(coordinationUtils);
+  }
+
+  /**
+   * @return LocalJobPlanner created
+   */
+  @VisibleForTesting
+  LocalJobPlanner getPlanner() {
+    boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    if (!isAppModeBatch) {
+      return new LocalJobPlanner(appDesc, PROCESSOR_ID);
+    }
+    CoordinationUtils coordinationUtils = null;
+    String runId = null;
+    if (this.coordinationUtils.isPresent()) {
+      coordinationUtils = this.coordinationUtils.get();
+    }
+    if (this.runId.isPresent()) {
+      runId = this.runId.get();
+    }
+    return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, runId);
+  }
+
+
+  private void initializeRunId() {
+    try {
+      MetadataStore metadataStore = getMetadataStore();
+      if (coordinationUtils.isPresent() && metadataStore != null) {
+        runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore));
+      }
+      if (!coordinationUtils.isPresent() || !isAppModeBatch || !runIdGenerator.isPresent()) {
+        LOG.warn("coordination utils or run id generator could not be created successfully!");
+        return;
+      }
+      runId = runIdGenerator.get().getRunId();
+    } catch (Exception e) {
+      LOG.warn("Failed to generate run id. Will continue execution without a run id. Caused by {}", e);
+    }
+  }
+
+  public String getRunid() {
+    if (runId.isPresent()) {
+      return runId.get();
+    }
+    return null;
   }
 
   @Override
   public void run(ExternalContext externalContext) {
+if (isAppModeBatch) {
 
 Review comment:
   Can we remove this check, since we already do it inside `initializeRunId()`?




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284830168
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
 ##
 @@ -52,11 +53,12 @@ public Latch getLatch(int size, String latchId) {
   }
 
   @Override
-  public DistributedLockWithState getLockWithState(String lockId) {
+  public DistributedLock getLock(String lockId) {
 return new ZkDistributedLock(processorIdStr, zkUtils, lockId);
   }
 
   public void close() {
+System.out.println("Manasa: closing coordination utils");
 
 Review comment:
   nit: remove




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284820619
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.base.Preconditions;
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generates RunId for Standalone use case
+ * If there is only one processor in the quorum (registered with ClusterMembership) then create new runid and add to store
+ * Else read runid from the store
+ *
+ * Steps to generate:
+ * 1. acquire lock
+ * 2. add self to quorum (register itself with ClusterMembership)
+ * 3. get number of processors in quorum
+ * 4. if quorum size is 1 (only self) then create new runid and write to store
+ * 5. if quorum size is greater than 1 then read runid from store
+ * 6. unlock
+ */
+public class RunIdGenerator {
+  private static final Logger LOG = LoggerFactory.getLogger(RunIdGenerator.class);
+
+  private final CoordinationUtils coordinationUtils;
+  private final MetadataStore metadataStore;
+  private final ClusterMembership clusterMembership;
+  private String processorId = null;
+  private boolean closed = false;
 
 Review comment:
   should be volatile?
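The visibility concern behind this comment can be sketched without any Samza types. The class below is a minimal illustration, assuming a close flag written by one thread and read by another; `AtomicBoolean` is used as an alternative to a bare `volatile` field because it also gives one-shot semantics.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch: a plain boolean "closed" flag shared across threads needs
// volatile (or an AtomicBoolean) so one thread's write is visible to others.
public class IdempotentCloser {
    // AtomicBoolean provides both visibility and atomic check-and-set.
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public void close() {
        // compareAndSet succeeds for exactly one caller, so repeated or
        // concurrent close() calls stay idempotent.
        if (closed.compareAndSet(false, true)) {
            System.out.println("released resources");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        IdempotentCloser closer = new IdempotentCloser();
        Thread t1 = new Thread(closer::close);
        Thread t2 = new Thread(closer::close);
        t1.start(); t2.start();
        t1.join(); t2.join();
        closer.close(); // third call is a no-op; the message prints only once
    }
}
```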




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284824438
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##
 @@ -77,20 +93,81 @@
   */
  public LocalApplicationRunner(SamzaApplication app, Config config) {
    this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-    this.planner = new LocalJobPlanner(appDesc);
+    isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    coordinationUtils = getCoordinationUtils(config);
  }

  /**
   * Constructor only used in unit test to allow injection of {@link LocalJobPlanner}
   */
  @VisibleForTesting
-  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) {
+  LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional<CoordinationUtils> coordinationUtils) {
    this.appDesc = appDesc;
-    this.planner = planner;
+    isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    this.coordinationUtils = coordinationUtils;
+  }
+
+  private Optional<CoordinationUtils> getCoordinationUtils(Config config) {
+    if (!isAppModeBatch) {
+      return Optional.empty();
+    }
+    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+    CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config);
+    return Optional.ofNullable(coordinationUtils);
+  }
+
+  /**
+   * @return LocalJobPlanner created
+   */
+  @VisibleForTesting
+  LocalJobPlanner getPlanner() {
+    boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    if (!isAppModeBatch) {
+      return new LocalJobPlanner(appDesc, PROCESSOR_ID);
+    }
+    CoordinationUtils coordinationUtils = null;
 
 Review comment:
   Can be simplified to `coordinationUtils = this.coordinationUtils.orElse(null);` and `runId = this.runId.orElse(null);`.




[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.

2019-05-16 Thread GitBox
mynameborat commented on a change in pull request #938: SAMZA-1531: Support 
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284821188
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.base.Preconditions;
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generates RunId for Standalone use case
+ * If there is only one processor in the quorum (registered with ClusterMembership) then create new runid and add to store
+ * Else read runid from the store
+ *
+ * Steps to generate:
+ * 1. acquire lock
+ * 2. add self to quorum (register itself with ClusterMembership)
+ * 3. get number of processors in quorum
+ * 4. if quorum size is 1 (only self) then create new runid and write to store
+ * 5. if quorum size is greater than 1 then read runid from store
+ * 6. unlock
+ */
+public class RunIdGenerator {
+  private static final Logger LOG = LoggerFactory.getLogger(RunIdGenerator.class);
+
+  private final CoordinationUtils coordinationUtils;
+  private final MetadataStore metadataStore;
+  private final ClusterMembership clusterMembership;
+  private String processorId = null;
+  private boolean closed = false;
+
+  public RunIdGenerator(CoordinationUtils coordinationUtils, MetadataStore metadataStore) {
+    Preconditions.checkNotNull(coordinationUtils, "CoordinationUtils cannot be null");
+    Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
+    this.coordinationUtils = coordinationUtils;
+    this.metadataStore = metadataStore;
+    this.clusterMembership = coordinationUtils.getClusterMembership();
+    Preconditions.checkNotNull(this.clusterMembership, "Failed to create utils for run id generation");
+  }
+
+  public Optional<String> getRunId() {
+    DistributedLock runIdLock;
+    String runId = null;
+
+    runIdLock = coordinationUtils.getLock(CoordinationConstants.RUNID_LOCK_ID);
+    if (runIdLock == null) {
+      throw new SamzaException("Failed to create utils for run id generation");
+    }
+
+    try {
+      // acquire lock to write or read run.id
+      if (runIdLock.lock(Duration.ofMillis(CoordinationConstants.LOCK_TIMEOUT_MS))) {
+        LOG.info("lock acquired for run.id generation by this processor");
+        processorId = clusterMembership.registerProcessor();
+        int numberOfActiveProcessors = clusterMembership.getNumberOfProcessors();
+        if (numberOfActiveProcessors == 0) {
+          String msg = String.format("Processor failed to fetch number of processors for run.id generation");
+          throw new SamzaException(msg);
+        }
+        if (numberOfActiveProcessors == 1) {
+          runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
+          LOG.info("Writing the run id for this run as {}", runId);
+          metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8"));
+        } else {
+          runId = new String(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY));
+          LOG.info("Read the run id for this run as {}", runId);
+        }
+        runIdLock.unlock();
+      } else {
+        throw new SamzaException("Processor timed out waiting to acquire lock for run.id generation");
+      }
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("Processor could not serialize/deserialize string for run.id generation", e);
+    }
+    return Optional.ofNullable(runId);
+  }
+
+  /**
+   * might be called several times and hence should be idempotent
+   */
+  public void close() {
+    if (!closed && clusterMembership != null && processorId != null) {
 
 Review comment:
   `clusterMembership` is never null right? We ensure that in the constructor.
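The six-step protocol in the quoted javadoc can be sketched as a toy single-process version. This is illustrative only: a `ReentrantLock` and `HashMap` stand in for Samza's `DistributedLock` and `MetadataStore`, and every name below is a placeholder, not a Samza API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;

// Toy sketch of the run.id generation steps quoted above (steps 1-6).
public class RunIdProtocolSketch {
    private static final ReentrantLock LOCK = new ReentrantLock();        // DistributedLock stand-in
    private static final List<String> QUORUM = new ArrayList<>();         // ClusterMembership stand-in
    private static final Map<String, String> STORE = new HashMap<>();     // MetadataStore stand-in

    static String generateRunId(String processorId) {
        LOCK.lock();                                   // 1. acquire lock
        try {
            QUORUM.add(processorId);                   // 2. add self to quorum
            if (QUORUM.size() == 1) {                  // 3-4. first processor creates the run id
                String runId = System.currentTimeMillis() + "-"
                    + UUID.randomUUID().toString().substring(0, 8);
                STORE.put("runId", runId);
            }
            return STORE.get("runId");                 // 5. later processors read it back
        } finally {
            LOCK.unlock();                             // 6. unlock
        }
    }

    public static void main(String[] args) {
        String first = generateRunId("p1");
        String second = generateRunId("p2");
        // Every processor in the quorum observes the same run id.
        System.out.println(first.equals(second));
    }
}
```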


[GitHub] [samza] xinyuiscool commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table

2019-05-16 Thread GitBox
xinyuiscool commented on a change in pull request #1034: SAMZA-2185: Add 
ability to expose remote store specific features in remote table
URL: https://github.com/apache/samza/pull/1034#discussion_r284824006
 
 

 ##
 File path: samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
 ##
 @@ -59,33 +61,37 @@
   *
   * @param key the key with which the specified {@code value} is to be associated.
   * @param value the value with which the specified {@code key} is to be associated.
+   * @param args additional arguments
   * @throws NullPointerException if the specified {@code key} is {@code null}.
   */
-  void put(K key, V value);
+  void put(K key, V value, Object ... args);

  /**
   * Updates the mappings of the specified key-value {@code entries}.
   *
   * A key is deleted from the table if its corresponding value is {@code null}.
   *
   * @param entries the updated mappings to put into this table.
+   * @param args additional arguments
   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
   */
-  void putAll(List<Entry<K, V>> entries);
+  void putAll(List<Entry<K, V>> entries, Object ... args);

  /**
   * Deletes the mapping for the specified {@code key} from this table (if such mapping exists).
   *
   * @param key the key for which the mapping is to be deleted.
+   * @param args additional arguments
   * @throws NullPointerException if the specified {@code key} is {@code null}.
   */
-  void delete(K key);
+  void delete(K key, Object ... args);

  /**
   * Deletes the mappings for the specified {@code keys} from this table.
   *
   * @param keys the keys for which the mappings are to be deleted.
+   * @param args additional arguments
   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
   */
-  void deleteAll(List<K> keys);
+  void deleteAll(List<K> keys, Object ... args);
}
 
 Review comment:
   Shall we also add read() and write() too?
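The API shape under discussion, optional store-specific arguments carried as a trailing `Object...`, can be sketched in isolation. `InMemoryTable` below is a hypothetical stand-in, not a Samza class; the point is that existing call sites compile unchanged while new ones can pass extra hints.

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of a varargs-extended table API: args carries optional
// remote-store hints (e.g. a TTL string) and is simply ignored here.
public class InMemoryTable<K, V> {
    private final Map<K, V> data = new HashMap<>();

    public void put(K key, V value, Object... args) {
        data.put(key, value);
    }

    public V get(K key, Object... args) {
        return data.get(key);
    }

    public void delete(K key, Object... args) {
        data.remove(key);
    }

    public static void main(String[] args) {
        InMemoryTable<String, Integer> table = new InMemoryTable<>();
        table.put("a", 1);               // old-style call site, no extra args
        table.put("b", 2, "ttl=60s");    // new-style call site with a hint
        table.delete("a");
        System.out.println(table.get("a") == null);
        System.out.println(table.get("b"));
    }
}
```

The trade-off raised implicitly by the review is that `Object...` is untyped: hints are not checked at compile time, which is why reviewers often ask whether more specific methods (like `read()`/`write()`) should be added instead.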




[GitHub] [samza] cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
cameronlee314 commented on a change in pull request #1027: SAMZA-2046: 
Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284821797
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointObjectMapper.java
 ##
 @@ -39,12 +40,16 @@ static ObjectMapper getObjectMapper() {
    SimpleModule module = new SimpleModule("StartpointModule", new Version(1, 0, 0, ""));
    module.addSerializer(Instant.class, new CustomInstantSerializer());
    module.addDeserializer(Instant.class, new CustomInstantDeserializer());
-
    objectMapper.registerModule(module);
-    objectMapper.registerSubtypes(StartpointSpecific.class);
-    objectMapper.registerSubtypes(StartpointTimestamp.class);
-    objectMapper.registerSubtypes(StartpointUpcoming.class);
-    objectMapper.registerSubtypes(StartpointOldest.class);
+
+    // 1. To support polymorphism for serialization, the Startpoint subtypes must be registered here.
+    // 2. The NamedType container class provides a logical name as an external identifier so that the full canonical
 
 Review comment:
   Sounds good. Just wanted to make sure there wasn't any gotcha with using the 
new way. I didn't need it to be easier than the old way. It's good that it 
makes the serialization more compact.




[jira] [Resolved] (SAMZA-2195) Fix javadoc in SerdeUtils

2019-05-16 Thread Wei Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/SAMZA-2195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Song resolved SAMZA-2195.
-
Resolution: Fixed

> Fix javadoc in SerdeUtils
> -
>
> Key: SAMZA-2195
> URL: https://issues.apache.org/jira/browse/SAMZA-2195
> Project: Samza
>  Issue Type: Improvement
>Reporter: Wei Song
>Assignee: Wei Song
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As per subject



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[samza] branch master updated (d7b5034 -> 57cce59)

2019-05-16 Thread weisong44
This is an automated email from the ASF dual-hosted git repository.

weisong44 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git.


from d7b5034  Disable flaky test in samza-sql (#1033)
 new 3f7ed71  Added self to committer list
 new 5cbf9af  Merge remote-tracking branch 'upstream/master'
 new a15a7c9  Merge remote-tracking branch 'upstream/master'
 new aae0f38  Merge remote-tracking branch 'upstream/master'
 new 0440f75  Merge remote-tracking branch 'upstream/master'
 new 4782c61  Merge remote-tracking branch 'upstream/master'
 new f28b491  Merge remote-tracking branch 'upstream/master'
 new df2f8d7  Merge remote-tracking branch 'upstream/master'
 new de708f5  Merge remote-tracking branch 'upstream/master'
 new 5156239  Merge remote-tracking branch 'upstream/master'
 new eca0020  Merge remote-tracking branch 'upstream/master'
 new 239a095  Merge remote-tracking branch 'upstream/master'
 new 41299b5  Merge remote-tracking branch 'upstream/master'
 new a6c94ad  Merge remote-tracking branch 'upstream/master'
 new 1c6a2ea  Merge remote-tracking branch 'upstream/master'
 new 8ee7844  Merge remote-tracking branch 'upstream/master'
 new e19b4dc  Merge remote-tracking branch 'upstream/master'
 new ec7d840  Merge remote-tracking branch 'upstream/master'
 new 242d844  Merge remote-tracking branch 'upstream/master'
 new c85604e  Merge remote-tracking branch 'upstream/master'
 new 1e5de45  Merge remote-tracking branch 'upstream/master'
 new f5731b1  Merge remote-tracking branch 'upstream/master'
 new 7706ab1  Merge remote-tracking branch 'upstream/master'
 new f748050  Merge remote-tracking branch 'upstream/master'
 new 05822f0  Merge remote-tracking branch 'upstream/master'
 new 097958c  Merge remote-tracking branch 'upstream/master'
 new a56c28d  Merge remote-tracking branch 'upstream/master'
 new 2c679c3  Merge remote-tracking branch 'upstream/master'
 new a06e8ec  Merge remote-tracking branch 'upstream/master'
 new 7c777fe  Merge remote-tracking branch 'upstream/master'
 new c9e8bf7  Merge remote-tracking branch 'upstream/master'
 new a53e562  SAMZA-1964 Make getTableSpec() in RemoteTableDescriptor 
reentrant
 new 89bfc14  Merge remote-tracking branch 'upstream/master'
 new 9c12120  Merge remote-tracking branch 'upstream/master'
 new e82daa1  Merge remote-tracking branch 'upstream/master'
 new b0c544a  Merge remote-tracking branch 'upstream/master'
 new cc49da0  Merge remote-tracking branch 'upstream/master'
 new cd14501  Merge remote-tracking branch 'upstream/master'
 new 043523d  Merge remote-tracking branch 'upstream/master'
 new 9f6aa1f  Merge remote-tracking branch 'upstream/master'
 new 9e05362  Merge remote-tracking branch 'upstream/master'
 new 49ea480  Merge remote-tracking branch 'upstream/master'
 new 160f1d7  Merge remote-tracking branch 'upstream/master'
 new 094ceb4  Merge remote-tracking branch 'upstream/master'
 new 25a4a16  Merge remote-tracking branch 'upstream/master'
 new cfd483e  Merge remote-tracking branch 'upstream/master'
 new c738ee6  Merge remote-tracking branch 'upstream/master'
 new d8bf7fb  Merge remote-tracking branch 'upstream/master'
 new 327b076  Merge remote-tracking branch 'upstream/master'
 new eee8d36  Merge remote-tracking branch 'upstream/master'
 new 0e719e4  Merge remote-tracking branch 'upstream/master'
 new 28b7a88  Merge remote-tracking branch 'upstream/master'
 new a824db6  Merge remote-tracking branch 'upstream/master'
 new 9127d92  Merge remote-tracking branch 'upstream/master'
 new 5fbe3b5  Merge remote-tracking branch 'upstream/master'
 new 6bea33c  Merge remote-tracking branch 'upstream/master'
 new f871463  Merge remote-tracking branch 'upstream/master'
 new 77fd2c3  Merge remote-tracking branch 'upstream/master'
 new f11648b  Merge remote-tracking branch 'upstream/master'
 new 5e28a65  Merge remote-tracking branch 'upstream/master'
 new b8c8a15  Merge remote-tracking branch 'upstream/master'
 new 9f3a1e7  Merge remote-tracking branch 'upstream/master'
 new 8c1821e  Merge remote-tracking branch 'upstream/master'
 new b5abcf2  Merge remote-tracking branch 'upstream/master'
 new 22bd3ac  Merge remote-tracking branch 'upstream/master'
 new 3373012  Merge remote-tracking branch 'upstream/master'
 new 60b42d6  Merge remote-tracking branch 'upstream/master'
 new fc2d5b9  Merge remote-tracking branch 'upstream/master'
 new 1854072  SAMZA-2195: Fix javadoc in SerdeUtils
 new 57cce59  Merge pull request #1035 from weisong44/SAMZA-2195

The 1764 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

[GitHub] [samza] weisong44 merged pull request #1035: SAMZA-2195: Fix javadoc in SerdeUtils

2019-05-16 Thread GitBox
weisong44 merged pull request #1035: SAMZA-2195: Fix javadoc in SerdeUtils
URL: https://github.com/apache/samza/pull/1035
 
 
   




[jira] [Created] (SAMZA-2195) Fix javadoc in SerdeUtils

2019-05-16 Thread Wei Song (JIRA)
Wei Song created SAMZA-2195:
---

 Summary: Fix javadoc in SerdeUtils
 Key: SAMZA-2195
 URL: https://issues.apache.org/jira/browse/SAMZA-2195
 Project: Samza
  Issue Type: Improvement
Reporter: Wei Song
Assignee: Wei Song


As per subject





[GitHub] [samza] weisong44 opened a new pull request #1035: SAMZA-2195: Fix javadoc in SerdeUtils

2019-05-16 Thread GitBox
weisong44 opened a new pull request #1035: SAMZA-2195: Fix javadoc in SerdeUtils
URL: https://github.com/apache/samza/pull/1035
 
 
   As per subject




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284799604
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointObjectMapper.java
 ##
 @@ -39,12 +40,16 @@ static ObjectMapper getObjectMapper() {
    SimpleModule module = new SimpleModule("StartpointModule", new Version(1, 0, 0, ""));
    module.addSerializer(Instant.class, new CustomInstantSerializer());
    module.addDeserializer(Instant.class, new CustomInstantDeserializer());
-
    objectMapper.registerModule(module);
-    objectMapper.registerSubtypes(StartpointSpecific.class);
-    objectMapper.registerSubtypes(StartpointTimestamp.class);
-    objectMapper.registerSubtypes(StartpointUpcoming.class);
-    objectMapper.registerSubtypes(StartpointOldest.class);
+
+    // 1. To support polymorphism for serialization, the Startpoint subtypes must be registered here.
+    // 2. The NamedType container class provides a logical name as an external identifier so that the full canonical
 
 Review comment:
   Not necessarily easier, but it seems to be a best practice based on the 
examples I've seen. Underneath the hood, it just uses the simple class name and 
not the full canonical name, so ordering doesn't matter. The `NamedType` 
container constructor also has a 2nd optional parameter where we can specify 
the logical name ourselves too.
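The "logical name" idea being discussed can be illustrated without Jackson. The registry below is a simplified stand-in for what Jackson's `NamedType` does under the hood: serialize a short type name instead of the full canonical class name, and resolve it back via a lookup. All names here are illustrative, not Jackson or Samza APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for logical-name subtype registration: the payload
// carries a short name, and a registry maps it back to the concrete class.
public class LogicalNameRegistry {
    private final Map<String, Class<?>> byName = new HashMap<>();

    // The default logical name is the simple class name, so registration
    // order does not matter (unlike a positional/counter scheme).
    public void register(Class<?> type) {
        byName.put(type.getSimpleName(), type);
    }

    public Class<?> resolve(String logicalName) {
        return byName.get(logicalName);
    }

    static class StartpointOldest {}   // illustrative subtype

    public static void main(String[] args) {
        LogicalNameRegistry registry = new LogicalNameRegistry();
        registry.register(StartpointOldest.class);
        // A serialized payload would carry "StartpointOldest" rather than
        // the long fully-qualified canonical name.
        System.out.println(registry.resolve("StartpointOldest") == StartpointOldest.class);
    }
}
```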




[GitHub] [samza] cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
cameronlee314 commented on a change in pull request #1027: SAMZA-2046: 
Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284796109
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointObjectMapper.java
 ##
 @@ -39,12 +40,16 @@ static ObjectMapper getObjectMapper() {
    SimpleModule module = new SimpleModule("StartpointModule", new Version(1, 0, 0, ""));
    module.addSerializer(Instant.class, new CustomInstantSerializer());
    module.addDeserializer(Instant.class, new CustomInstantDeserializer());
-
    objectMapper.registerModule(module);
-    objectMapper.registerSubtypes(StartpointSpecific.class);
-    objectMapper.registerSubtypes(StartpointTimestamp.class);
-    objectMapper.registerSubtypes(StartpointUpcoming.class);
-    objectMapper.registerSubtypes(StartpointOldest.class);
+
+    // 1. To support polymorphism for serialization, the Startpoint subtypes must be registered here.
+    // 2. The NamedType container class provides a logical name as an external identifier so that the full canonical
 
 Review comment:
   Just double checking: Does the "logical name" allow for easily adding new 
types in the future? Is there a certain way new types need to be added? For 
example, if the "logical name" is just a counter, then we would need to add a 
new subtype at the end of the list.




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284751424
 
 

 ##
 File path: samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
 ##
 @@ -22,11 +22,13 @@
 import com.google.common.base.Objects;
 import java.time.Instant;
 import org.apache.samza.annotation.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonTypeInfo;
 
 /**
  * Startpoint represents a position in a stream partition.
  */
 @InterfaceStability.Evolving
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
 
 Review comment:
   [There seems to be a way](https://stackoverflow.com/a/41982776), but it's 
not working when I try it out for some reason. It may be due to the fact that 
we're on a really old version of `jackson`. The version we're using is so old 
that the recent versions of `jackson` have different package names. I'll leave 
the annotation for now and investigate moving us to a newer version of jackson 
in a separate PR/ticket.




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284754923
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -139,21 +169,26 @@ public Startpoint readStartpoint(SystemStreamPartition ssp) {
   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist or if it is too stale.
 
 Review comment:
   Will do.




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284754194
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -129,7 +159,7 @@ public void writeStartpoint(SystemStreamPartition ssp, 
TaskName taskName, Startp
* @param ssp The {@link SystemStreamPartition} to fetch the {@link 
Startpoint} for.
* @return {@link Startpoint} for the {@link SystemStreamPartition}, or null 
if it does not exist or if it is too stale
 
 Review comment:
   Will do.




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284753937
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -20,87 +20,117 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashSet;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The StartpointManager reads and writes {@link Startpoint} to the {@link 
MetadataStore} defined by
- * the configuration task.startpoint.metadata.store.factory.
- *
- * Startpoints are keyed in the MetadataStore by two different formats:
- * 1) Only by {@link SystemStreamPartition}
- * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ * The StartpointManager reads and writes {@link Startpoint} to the provided 
{@link MetadataStore}
  *
  * The intention for the StartpointManager is to maintain a strong contract 
between the caller
  * and how Startpoints are stored in the underlying MetadataStore.
+ *
+ * Startpoints are written in the MetadataStore using keys of two different 
formats:
+ * 1) {@link SystemStreamPartition} only
+ * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ *
+ * Startpoints are then fanned out to a fan out namespace in the MetadataStore 
by the
+ * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the 
standalone
+ * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
+ * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set 
the starting offsets per task and per
+ * {@link SystemStreamPartition}. The fan outs are deleted once the offsets 
are committed to the checkpoint.
+ *
+ * The read, write and delete methods are intended for external callers.
+ * The fan out methods are intended to be used within a job coordinator.
  */
 public class StartpointManager {
-  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
-  public static final String NAMESPACE = "samza-startpoint-v1";
+  private static final Integer VERSION = 1;
+  public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
 
   static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
 
-  private final MetadataStore metadataStore;
-  private final StartpointSerde startpointSerde = new StartpointSerde();
-
-  private boolean stopped = false;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
+  private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
 
-  /**
-   *  Constructs a {@link StartpointManager} instance by instantiating a new 
metadata store connection.
-   *  This is primarily used for testing.
-   */
-  @VisibleForTesting
-  StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, 
MetricsRegistry metricsRegistry) {
-Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory 
cannot be null");
-Preconditions.checkNotNull(config, "Config cannot be null");
-Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be 
null");
+  private final MetadataStore metadataStore;
 
 Review comment:
   True, it's no longer needed. Will remove.
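
The fan-out lifecycle described in the javadoc of the quoted diff can be sketched with plain maps: a startpoint is first written under the read/write namespace keyed by partition, then the job coordinator fans it out under a per-task key that the `OffsetManager` later consumes. The key formats below are illustrative assumptions; only the namespace strings (`samza-startpoint-v1`, `-fan-out`) come from the diff itself.

```java
import java.util.HashMap;
import java.util.Map;

public class FanOutSketch {
  // stand-in for the MetadataStore: namespace-prefixed key -> serialized startpoint
  static final Map<String, String> store = new HashMap<>();

  static void writeStartpoint(String ssp, String startpoint) {
    // external callers write keyed by SystemStreamPartition only
    store.put("samza-startpoint-v1:" + ssp, startpoint);
  }

  static void fanOut(String task, String ssp) {
    // the job coordinator moves the startpoint under a per-task fan-out key;
    // fan outs are deleted once offsets are committed to the checkpoint
    String sp = store.remove("samza-startpoint-v1:" + ssp);
    if (sp != null) {
      store.put("samza-startpoint-v1-fan-out:" + task + ":" + ssp, sp);
    }
  }

  public static void main(String[] args) {
    writeStartpoint("kafka.pageviews.0", "oldest");
    fanOut("task-0", "kafka.pageviews.0");
    System.out.println(store.get("samza-startpoint-v1-fan-out:task-0:kafka.pageviews.0"));
    // -> oldest
  }
}
```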




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284756502
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointObjectMapper.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import java.io.IOException;
+import java.time.Instant;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.DeserializationContext;
+import org.codehaus.jackson.map.JsonDeserializer;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+
+class StartpointObjectMapper {
+
+  static ObjectMapper getObjectMapper() {
+ObjectMapper objectMapper = new ObjectMapper();
+SimpleModule module = new SimpleModule("StartpointModule", new Version(1, 
0, 0, ""));
+module.addSerializer(Instant.class, new CustomInstantSerializer());
+module.addDeserializer(Instant.class, new CustomInstantDeserializer());
+
+objectMapper.registerModule(module);
 
 Review comment:
   I can't think of any way other than relying on the serde tests to surface 
the requirement of registering the subtype whenever a new Startpoint is added. 
I'll leave a comment here as well.
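
For context on why the module registers custom `Instant` (de)serializers at all: the old `org.codehaus.jackson` has no built-in `java.time` support, so `Instant` must be mapped to something JSON-native. Epoch milliseconds is one plausible encoding; this is an assumption about what Samza's `CustomInstantSerializer` does, not a copy of it.

```java
import java.time.Instant;

public class InstantSerdeSketch {
  // serialize an Instant as a JSON-friendly long (epoch milliseconds)
  static long serialize(Instant instant) {
    return instant.toEpochMilli();
  }

  // reconstruct the Instant from the stored epoch milliseconds
  static Instant deserialize(long epochMilli) {
    return Instant.ofEpochMilli(epochMilli);
  }

  public static void main(String[] args) {
    Instant original = Instant.ofEpochMilli(1558000000000L);
    // round-trips losslessly at millisecond precision
    System.out.println(deserialize(serialize(original)).equals(original));
    // -> true
  }
}
```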




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284753684
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointFanOutPerTask.java
 ##
 @@ -20,33 +20,45 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
-import org.apache.samza.container.TaskName;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
 
-@JsonSerialize(using = StartpointKeySerializer.class)
-class StartpointKey {
-  private final SystemStreamPartition systemStreamPartition;
-  private final TaskName taskName;
+/**
+ * Holds the {@link Startpoint} fan outs for each {@link 
SystemStreamPartition}. Each StartpointFanOutPerTask maps to a
+ * {@link org.apache.samza.container.TaskName}
+ */
+class StartpointFanOutPerTask {
+  @JsonSerialize
+  @JsonDeserialize
+  private final Instant timestamp;
+
+  @JsonDeserialize(keyUsing = 
SamzaObjectMapper.SystemStreamPartitionKeyDeserializer.class)
 
 Review comment:
   Related to the above comment: adding these via 
`SimpleModule#addKeySerializer` and `SimpleModule#addKeyDeserializer` doesn't 
seem to work when the map is nested inside an object. Again, this might be due 
to the old version of `jackson` we're on. I'll leave comments noting that the 
annotations can be removed if we consolidate all serialization code into the 
`StartpointObjectMapper` once we're on a newer version of `jackson`.
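
The reason map keys need their own (de)serializer at all: JSON object keys are plain strings, so a `Map` keyed by `SystemStreamPartition` must flatten each key into a string and parse it back on read. The dot-joined format below is an illustrative assumption, not `SamzaObjectMapper`'s real key format, and `Ssp` is a hypothetical stand-in for `SystemStreamPartition`.

```java
public class SspKeySketch {
  // minimal stand-in for SystemStreamPartition
  record Ssp(String system, String stream, int partition) {}

  static String serializeKey(Ssp ssp) {
    // flatten the composite key into a single JSON-legal string
    return ssp.system() + "." + ssp.stream() + "." + ssp.partition();
  }

  static Ssp deserializeKey(String key) {
    // naive parse: assumes system and stream names contain no '.'
    String[] parts = key.split("\\.");
    return new Ssp(parts[0], parts[1], Integer.parseInt(parts[2]));
  }

  public static void main(String[] args) {
    Ssp ssp = new Ssp("kafka", "pageviews", 0);
    System.out.println(deserializeKey(serializeKey(ssp)).equals(ssp));
    // -> true
  }
}
```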




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284757971
 
 

 ##
 File path: 
samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
 ##
 @@ -18,45 +18,47 @@
  */
 package org.apache.samza.startpoint;
 
+import java.io.IOException;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestStartpointSerde {
 
 Review comment:
   Good point, will do




[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284754067
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##
 @@ -20,87 +20,117 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashSet;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The StartpointManager reads and writes {@link Startpoint} to the {@link 
MetadataStore} defined by
- * the configuration task.startpoint.metadata.store.factory.
- *
- * Startpoints are keyed in the MetadataStore by two different formats:
- * 1) Only by {@link SystemStreamPartition}
- * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ * The StartpointManager reads and writes {@link Startpoint} to the provided 
{@link MetadataStore}
  *
  * The intention for the StartpointManager is to maintain a strong contract 
between the caller
  * and how Startpoints are stored in the underlying MetadataStore.
+ *
+ * Startpoints are written in the MetadataStore using keys of two different 
formats:
+ * 1) {@link SystemStreamPartition} only
+ * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ *
+ * Startpoints are then fanned out to a fan out namespace in the MetadataStore 
by the
+ * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the 
standalone
+ * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
+ * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set 
the starting offsets per task and per
+ * {@link SystemStreamPartition}. The fan outs are deleted once the offsets 
are committed to the checkpoint.
+ *
+ * The read, write and delete methods are intended for external callers.
+ * The fan out methods are intended to be used within a job coordinator.
  */
 public class StartpointManager {
-  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
-  public static final String NAMESPACE = "samza-startpoint-v1";
+  private static final Integer VERSION = 1;
+  public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
 
   static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
 
-  private final MetadataStore metadataStore;
-  private final StartpointSerde startpointSerde = new StartpointSerde();
-
-  private boolean stopped = false;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
+  private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
 
-  /**
-   *  Constructs a {@link StartpointManager} instance by instantiating a new 
metadata store connection.
-   *  This is primarily used for testing.
-   */
-  @VisibleForTesting
-  StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, 
MetricsRegistry metricsRegistry) {
-Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory 
cannot be null");
-Preconditions.checkNotNull(config, "Config cannot be null");
-Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be 
null");
+  private final MetadataStore metadataStore;
+  private final NamespaceAwareCoordinatorStreamStore fanOutStore;
+  private final NamespaceAwareCoordinatorStreamStore readWriteStore;
+  private final ObjectMapper objectMapper = 
StartpointObjectMapper.getObjectMapper();
 
-this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, 
config, metricsRegistry);
-LOG.info("StartpointManager created with metadata store: {}", 

[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation

2019-05-16 Thread GitBox
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint 
fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284756719
 
 

 ##
 File path: 
samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointFanOutPerTaskSerde.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import java.io.IOException;
+import java.time.Instant;
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestStartpointFanOutPerTaskSerde {
 
 Review comment:
   Sure

