[jira] [Reopened] (SAMZA-2005) Validate Sql
[ https://issues.apache.org/jira/browse/SAMZA-2005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weiqing Yang reopened SAMZA-2005:
---------------------------------

> Validate Sql
> ------------
>
>            Key: SAMZA-2005
>            URL: https://issues.apache.org/jira/browse/SAMZA-2005
>        Project: Samza
>     Issue Type: Sub-task
>       Reporter: Weiqing Yang
>       Priority: Major
>
> We can implement Samza SQL validation: syntax, resource presence, schema
> presence, etc. (including unit tests with the Samza SQL test framework)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Resolved] (SAMZA-1902) Support different data systems
[ https://issues.apache.org/jira/browse/SAMZA-1902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weiqing Yang resolved SAMZA-1902.
---------------------------------
    Resolution: Fixed

> Support different data systems
> ------------------------------
>
>            Key: SAMZA-1902
>            URL: https://issues.apache.org/jira/browse/SAMZA-1902
>        Project: Samza
>     Issue Type: Sub-task
>       Reporter: Weiqing Yang
>       Priority: Major
>
> Currently the shell can only talk to the Kafka system, but we may need a
> general way to connect to different systems.
[jira] [Resolved] (SAMZA-2027) Config support in through Set statement
[ https://issues.apache.org/jira/browse/SAMZA-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weiqing Yang resolved SAMZA-2027.
---------------------------------
    Resolution: Fixed

> Config support in through Set statement
> ---------------------------------------
>
>            Key: SAMZA-2027
>            URL: https://issues.apache.org/jira/browse/SAMZA-2027
>        Project: Samza
>     Issue Type: Sub-task
>     Components: sql
>       Reporter: Weiqing Yang
>       Priority: Major
>
> Supports environment variables for SqlExecutors. Users can now set environment
> variables for the shell itself and for any SqlExecutor, by executing the
> 'SET' command in the shell or by using a configuration file. Users can also
> choose to use any SqlExecutor they want in the same way.
[jira] [Resolved] (SAMZA-2005) Validate Sql
[ https://issues.apache.org/jira/browse/SAMZA-2005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weiqing Yang resolved SAMZA-2005.
---------------------------------
    Resolution: Fixed

> Validate Sql
> ------------
>
>            Key: SAMZA-2005
>            URL: https://issues.apache.org/jira/browse/SAMZA-2005
>        Project: Samza
>     Issue Type: Sub-task
>       Reporter: Weiqing Yang
>       Priority: Major
>
> We can implement Samza SQL validation: syntax, resource presence, schema
> presence, etc. (including unit tests with the Samza SQL test framework)
[jira] [Reopened] (SAMZA-1901) Samza SQL Shell
[ https://issues.apache.org/jira/browse/SAMZA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weiqing Yang reopened SAMZA-1901:
---------------------------------

> Samza SQL Shell
> ---------------
>
>            Key: SAMZA-1901
>            URL: https://issues.apache.org/jira/browse/SAMZA-1901
>        Project: Samza
>     Issue Type: New Feature
>     Components: sql
>       Reporter: Weiqing Yang
>       Priority: Major
>        Fix For: 0.15.0
>
> This Jira implements the first version of the Samza SQL shell.
> Please refer to the attached document for more details about the shell,
> including the tech choices, features, design decisions, and how to build,
> run, and debug the shell.
[jira] [Resolved] (SAMZA-1901) Samza SQL Shell
[ https://issues.apache.org/jira/browse/SAMZA-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weiqing Yang resolved SAMZA-1901.
---------------------------------
    Resolution: Fixed

> Samza SQL Shell
> ---------------
>
>            Key: SAMZA-1901
>            URL: https://issues.apache.org/jira/browse/SAMZA-1901
>        Project: Samza
>     Issue Type: New Feature
>     Components: sql
>       Reporter: Weiqing Yang
>       Priority: Major
>        Fix For: 0.15.0
>
> This Jira implements the first version of the Samza SQL shell.
> Please refer to the attached document for more details about the shell,
> including the tech choices, features, design decisions, and how to build,
> run, and debug the shell.
[samza] branch 1.2.0 created (now f80659d)
This is an automated email from the ASF dual-hosted git repository.

boryas pushed a change to branch 1.2.0
in repository https://gitbox.apache.org/repos/asf/samza.git.

      at  f80659d  replaced verstion to be 1.2.0

This branch includes the following new commits:

     new  f80659d  replaced verstion to be 1.2.0

The 1 revisions listed above as "new" are entirely new to this repository and
will be described in separate emails. The revisions listed as "add" were
already present in the repository and have only been added to this reference.
[samza] 01/01: replaced verstion to be 1.2.0
This is an automated email from the ASF dual-hosted git repository.

boryas pushed a commit to branch 1.2.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit f80659d999e9e6b7b3377abeacccdd863e42b19b
Author: Boris S
AuthorDate: Thu May 16 17:34:18 2019 -0700

    replaced verstion to be 1.2.0
---
 gradle.properties                              | 2 +-
 samza-test/src/main/config/join/README         | 8 ++++----
 samza-test/src/main/python/configs/tests.json  | 2 +-
 samza-test/src/main/python/stream_processor.py | 2 +-
 4 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/gradle.properties b/gradle.properties
index 20b4b9f..11dd29f 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 group=org.apache.samza
-version=1.1.1-SNAPSHOT
+version=1.2.0
 scalaSuffix=2.11
 gradleVersion=4.8

diff --git a/samza-test/src/main/config/join/README b/samza-test/src/main/config/join/README
index 37bb354..4130553 100644
--- a/samza-test/src/main/config/join/README
+++ b/samza-test/src/main/config/join/README
@@ -44,17 +44,17 @@ Deploy Zookeeper, YARN and Kafka:
 > cd $HELLO_SAMZA_SRC
 > for i in zookeeper kafka yarn; do ./bin/grid install $i; ./bin/grid start $i; done

-Update the "yarn.package.path" to $DEPLOY_DIR/samza-test_2.11-1.1.1-SNAPSHOT.tgz
+Update the "yarn.package.path" to $DEPLOY_DIR/samza-test_2.11-1.2.0.tgz
 > cd $SAMZA_SRC
 > vi samza-test/src/main/config/join/common.properties
-yarn.package.path=file:///path/to/samza-hello-samza/deploy/samza-test_2.11-1.1.1-SNAPSHOT.tgz
+yarn.package.path=file:///path/to/samza-hello-samza/deploy/samza-test_2.11-1.2.0.tgz

 Then release and extract the test tarball:
 > cd $SAMZA_SRC
 > ./gradlew releaseTestJobs
-> cp samza-test/build/distributions/samza-test_2.11-1.1.1-SNAPSHOT.tgz $DEPLOY_DIR
+> cp samza-test/build/distributions/samza-test_2.11-1.2.0.tgz $DEPLOY_DIR
 > mkdir $DEPLOY_DIR/samza
-> tar -xvf $DEPLOY_DIR/samza-test_2.11-1.1.1-SNAPSHOT.tgz -C $DEPLOY_DIR/samza
+> tar -xvf $DEPLOY_DIR/samza-test_2.11-1.2.0.tgz -C $DEPLOY_DIR/samza

 Finally, create the kafka topics and start the samza jobs:
 > ./bin/setup-int-test.sh $DEPLOY_DIR

diff --git a/samza-test/src/main/python/configs/tests.json b/samza-test/src/main/python/configs/tests.json
index 0b0ab92..e537367 100644
--- a/samza-test/src/main/python/configs/tests.json
+++ b/samza-test/src/main/python/configs/tests.json
@@ -1,5 +1,5 @@
 {
-  "samza_executable": "samza-test_2.11-1.1.1-SNAPSHOT.tgz",
+  "samza_executable": "samza-test_2.11-1.2.0.tgz",
   "samza_install_path": "deploy/smoke_tests",
   "samza_config_factory": "org.apache.samza.config.factories.PropertiesConfigFactory"
 }

diff --git a/samza-test/src/main/python/stream_processor.py b/samza-test/src/main/python/stream_processor.py
index dacd59c..2fef894 100644
--- a/samza-test/src/main/python/stream_processor.py
+++ b/samza-test/src/main/python/stream_processor.py
@@ -43,7 +43,7 @@ class StreamProcessor:
     logger.info('Running processor start command: {0}'.format(self.processor_start_command))
     self.deployment_config = {
       'install_path': os.path.join(runtime.get_active_config('remote_install_path'), 'deploy/{0}'.format(self.processor_id)),
-      'executable': 'samza-test_2.11-1.0.1-SNAPSHOT.tgz',
+      'executable': 'samza-test_2.11-1.2.0.tgz',
       'post_install_cmds': [],
       'start_command': self.processor_start_command,
       'stop_command': '',
[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284931281

## File path: docs/learn/documentation/versioned/api/table-api.md

@@ -296,6 +299,29 @@ Couchbase is supported as remote table. See
 [`CouchbaseTableReadFunction`](https://github.com/apache/samza/blob/master/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableReadFunction.java) and
 [`CouchbaseTableWriteFunction`](https://github.com/apache/samza/blob/master/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java).
+### Batching
+
+Remote Table has built-in client-side batching support in both of its sync and async executions.
+This is useful when a remote data store supports batch processing and is not sophisticated enough
+to handle heavy inbound requests.
+
+#### Configuration
+
+Batching can be enabled with [`RemoteTableDescriptor`](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java)
+by providing a [`BatchProvider`](https://github.com/apache/samza/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java)
+The user can choose:
+
+1. A [`CompactBatchProvider`](https://github.com/apache/samza/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java) which provides a batch such that
+   the operations are compacted the the key. For update operations, the latter update will override the value of the previous one when they have the same key. For query operations,

Review comment:
   Remove "the"

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
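[Editorial aside] The compaction behavior the excerpt above ascribes to `CompactBatchProvider` can be illustrated with a self-contained sketch. This is plain Java, not Samza's implementation; `CompactBatchDemo` and `compact` are hypothetical names chosen for the example. The key idea: operations in a batch collapse by key, so a later update to the same key overrides the earlier one.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch (not Samza code): compact a sequence of put
// operations by key, so that only the latest value per key survives
// and first-arrival order of keys is preserved.
class CompactBatchDemo {
    static Map<Integer, String> compact(List<Map.Entry<Integer, String>> ops) {
        Map<Integer, String> batch = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> op : ops) {
            // A later update with the same key overrides the previous value.
            batch.put(op.getKey(), op.getValue());
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> ops = List.of(
            Map.entry(1, "v1"), Map.entry(2, "v2"), Map.entry(1, "v3"));
        System.out.println(compact(ops)); // {1=v3, 2=v2}
    }
}
```

So a batch of three writes touching two keys is sent to the remote store as only two operations.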
[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284935113

## File path: samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java

@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
+
+  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
+
+  static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> {
+    private final String serializedProfiles;
+    private transient Map<Integer, Profile> profileMap;
+
+    private InMemoryReadFunction(String profiles) {
+      this.serializedProfiles = profiles;
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
+      this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
+    }
+
+    @Override
+    public CompletableFuture<Profile> getAsync(Integer key) {
+      return CompletableFuture.completedFuture(profileMap.get(key));
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+
+    static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
+      return new InMemoryReadFunction(serializedProfiles);
+    }
+  }
+
+  static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> {
+    private transient List<EnrichedPageView> records;
+    private String testName;
+
+    public InMemoryWriteFunction(String testName) {
+      this.testName = testName;
+    }
+
+    // Verify serializable functionality
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+
+      // Write to the global list for verification
+      records = writtenRecords.get(testName);
+    }
+
+    @Override
+    public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
+      records.add(record);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(Integer key) {
+      records.remove(key);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+
+  static class MyReadFunction implements TableReadFunction {
+    @Override
+    public CompletableFuture getAsync(Object key) {
+      return null;
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284933428

## File path: samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.table.batch.BatchPolicy;
+
+
+/**
+ * Place a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final ReentrantLock lock = new ReentrantLock();
+  private final BatchPolicy<K, V> batchPolicy;
+  private final BatchHandler<K, V> batchHandler;
+  private Batch<K, V> batch;
+  private ScheduledFuture<?> scheduledFuture;
+
+  /**
+   * @param batchHandler Defines how each batch will be processed.
+   * @param batchPolicy The batch configurations to be used.
+   * @param scheduledExecutorService A scheduled executor service to set timers for the managed batches.
+   */
+  public BatchProcessor(BatchHandler<K, V> batchHandler, BatchPolicy<K, V> batchPolicy,
+      ScheduledExecutorService scheduledExecutorService) {
+    Preconditions.checkNotNull(batchHandler);
+    Preconditions.checkNotNull(batchPolicy);
+    Preconditions.checkNotNull(scheduledExecutorService);
+
+    this.batchHandler = batchHandler;
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.batchPolicy = batchPolicy;
+    batch = BatchFactory.getBatch(batchHandler, batchPolicy);
+    setBatchTimer(batch);
+  }
+
+  private CompletableFuture addOperation(Operation<K, V> operation) {
+    if (batch.isEmpty()) {
+      setBatchTimer(batch);
+    } else if (batch.isClosed()) {
+      processBatch();
+    }
+    batch.addOperation(operation);
+    final CompletableFuture res = batch.getCompletableFuture();
+
+    if (batch.isClosed()) {

Review comment:
   makes sense!
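[Editorial aside] The accumulate-and-flush flow in `addOperation` above can be sketched with a simplified, single-threaded stand-in. This is not the Samza `BatchProcessor`: `SimpleBatchProcessor` and its method names are hypothetical, the real class also closes batches on a timer via a `ScheduledExecutorService` and guards state with a `ReentrantLock`, and it hands off `Operation` objects rather than plain entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified sketch of the batching idea: operations accumulate in the
// current batch; when the batch reaches its maximum size it is handed to
// a handler (playing the BatchHandler role) and a fresh batch is started.
class SimpleBatchProcessor<K, V> {
    private final int maxBatchSize;                        // size-based close policy
    private final Consumer<List<Map.Entry<K, V>>> handler; // processes each closed batch
    private List<Map.Entry<K, V>> batch = new ArrayList<>();

    SimpleBatchProcessor(int maxBatchSize, Consumer<List<Map.Entry<K, V>>> handler) {
        this.maxBatchSize = maxBatchSize;
        this.handler = handler;
    }

    void add(K key, V value) {
        batch.add(Map.entry(key, value));
        if (batch.size() >= maxBatchSize) {
            flush(); // batch is closed: hand it off and start a new one
        }
    }

    void flush() {
        if (!batch.isEmpty()) {
            handler.accept(batch);
            batch = new ArrayList<>();
        }
    }
}
```

Usage: with `maxBatchSize = 2`, three `add` calls trigger one handler invocation for the first two operations, while the third waits in the new batch until the next size or (in the real class) timer trigger.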
[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284933206

## File path: samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.table.batch.BatchPolicy;
+
+
+/**
+ * Place a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;

Review comment:
   Perfect!
[GitHub] [samza] weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284932743

## File path: samza-core/src/main/java/org/apache/samza/table/batching/AsyncBatchingTable.java

@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+
+/**
+ * A wrapper of a {@link AsyncReadWriteTable} that supports batch operations.
+ *
+ * This batching table does not guarantee any ordering of different operation types within the batch.
+ * For instance, if query (Q) and update (U) operations arrive in the sequence Q1, U1, Q2, U2,
+ * it does not mean the remote data store will receive the messages in the same order. Instead,
+ * the operations will be grouped by type and sent via micro batches. For this sequence, Q1 and Q2 will
+ * be grouped into micro batch B1; U1 and U2 will be grouped into micro batch B2; the implementation class
+ * can decide the order of the micro batches.
+ *
+ * Synchronized table operations (get/put/delete) should be used with caution with the batching feature.
+ * If the table is used by a single thread, there will be at most one operation in the batch, and the
+ * batch will be performed when the TTL of the batch window expires. Batching does not make sense in this scenario.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class AsyncBatchingTable<K, V> implements ReadWriteTable<K, V> {

Review comment:
   For delegation tables, we only need to implement the async methods, so it only needs to implement AsyncReadWriteTable.
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r284936300

## File path: samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

@@ -1011,7 +1011,7 @@ public void testAgreeingOnSameRunIdForBatch() throws InterruptedException {
     LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
     LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
-    assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunid(), localApplicationRunner1.getRunid());
+    assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId().orElse(null), localApplicationRunner1.getRunId().orElse(null));

Review comment:
   You don't have to unwrap the optional for assertEquals. I think it should work out of the box with object comparison.
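[Editorial aside] The reviewer's point rests on the fact that `java.util.Optional.equals` compares the contained values, so two `Optional`s wrapping equal run ids compare equal without unwrapping via `orElse(null)`. A quick standalone check (the `run-42` value is made up for illustration):

```java
import java.util.Optional;

// Demonstrates that Optional.equals() delegates to the wrapped values,
// so assertEquals-style comparisons work on Optionals directly.
class OptionalEqualsDemo {
    public static void main(String[] args) {
        Optional<String> runId1 = Optional.of("run-42");
        Optional<String> runId2 = Optional.of("run-42");
        System.out.println(runId1.equals(runId2));                        // true
        System.out.println(Optional.empty().equals(Optional.empty()));    // true
        System.out.println(Optional.of("a").equals(Optional.of("b")));    // false
    }
}
```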
[GitHub] [samza] weisong44 merged pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
weisong44 merged pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
URL: https://github.com/apache/samza/pull/1034
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284921485

## File path: samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.table.batch.BatchPolicy;
+
+
+/**
+ * Place a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;

Review comment:
   Per our discussion, each java.util.Timer has a thread, so we will leave it as is.
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284921292

## File path: samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java

@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.batch.BatchPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import static java.lang.Thread.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    TestBatchProcessor.TestCreate.class,
+    TestBatchProcessor.TestUpdatesAndLookup.class,
+    TestBatchProcessor.TestBatchTriggered.class
+  })
+public class TestBatchProcessor {

Review comment:
   Added.
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284921201

## File path: samza-core/src/main/java/org/apache/samza/table/batching/DeleteOperation.java

@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Delete operation.
+ *
+ * @param <K> The type of the key.
+ */
+public class DeleteOperation implements Operation {

Review comment:
   Per our discussion, this would require modifying the putAsyncAll/deleteAsyncAll APIs; we will not fix it at this time and will postpone this optimization.
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284921235 ## File path: samza-core/src/main/java/org/apache/samza/table/batching/BatchReadWriteTable.java ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batching; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.AsyncReadWriteTable; +import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.table.batch.BatchPolicy; + +/** + * A wrapper of a {@link AsyncReadWriteTable} that supports batch operations. + * + * This batching table does not guarantee any ordering of different operation types within the batch. 
+ * For instance, if query (Q) and update (U) operations arrive in the sequence Q1, U1, Q2, U2, + * it does not mean that the remote data store will receive the messages in the same order. Instead, + * the operations will be grouped by type and sent via micro batches. For this sequence, Q1 and Q2 will + * be grouped into micro batch B1; U1 and U2 will be grouped into micro batch B2, and the implementation class + * can decide the order of the micro batches. + * + * Synchronized table operations (get/put/delete) should be used with caution with the batching feature. + * If the table is used by a single thread, there will be at most one operation in the batch, and the + * batch will be performed when the TTL of the batch window expires. Batching does not make sense in this scenario. + * + * @param <K> The type of the key. + * @param <V> The type of the value. + */ +public class BatchReadWriteTable<K, V> implements ReadWriteTable<K, V> { Review comment: Fixed.
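The grouping semantics described in the javadoc above can be shown with a standalone sketch. The class and method names here are illustrative, not Samza's actual classes; operations are represented as plain strings tagged Q/U, assuming only that intra-type order is preserved while cross-type order is not:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of grouping a mixed operation sequence into per-type micro batches:
// Q1, U1, Q2, U2 becomes [Q1, Q2] and [U1, U2]. Only the order within each
// type survives; the order between the two micro batches is left to the
// implementation, as the javadoc notes.
public class MicroBatchGrouping {
  static List<List<String>> groupByType(List<String> ops) {
    List<String> queries = new ArrayList<>();
    List<String> updates = new ArrayList<>();
    for (String op : ops) {
      if (op.startsWith("Q")) {
        queries.add(op);   // query micro batch, intra-type order preserved
      } else {
        updates.add(op);   // update micro batch
      }
    }
    return Arrays.asList(queries, updates);
  }

  public static void main(String[] args) {
    System.out.println(groupByType(Arrays.asList("Q1", "U1", "Q2", "U2")));
    // [[Q1, Q2], [U1, U2]]
  }
}
```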
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284920904 ## File path: samza-core/src/main/java/org/apache/samza/table/batching/BatchReadWriteTable.java ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batching; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.AsyncReadWriteTable; +import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.table.batch.BatchPolicy; + +/** + * A wrapper of a {@link AsyncReadWriteTable} that supports batch operations. + * + * This batching table does not guarantee any ordering of different operation types within the batch. 
+ * For instance, if query (Q) and update (U) operations arrive in the sequence Q1, U1, Q2, U2, + * it does not mean that the remote data store will receive the messages in the same order. Instead, + * the operations will be grouped by type and sent via micro batches. For this sequence, Q1 and Q2 will + * be grouped into micro batch B1; U1 and U2 will be grouped into micro batch B2, and the implementation class + * can decide the order of the micro batches. + * + * Synchronized table operations (get/put/delete) should be used with caution with the batching feature. + * If the table is used by a single thread, there will be at most one operation in the batch, and the + * batch will be performed when the TTL of the batch window expires. Batching does not make sense in this scenario. + * + * @param <K> The type of the key. + * @param <V> The type of the value. + */ +public class BatchReadWriteTable<K, V> implements ReadWriteTable<K, V> { + private final AsyncReadWriteTable<K, V> table; + private final BatchProcessor<K, V> batchProcessor; + + /** + * @param table The target table that serves the batch operations. + * @param batchPolicy Batch configs.
+ */ + public BatchReadWriteTable(AsyncReadWriteTable table, BatchPolicy batchPolicy, + ScheduledExecutorService batchTimerExecutorService) { +Preconditions.checkNotNull(table); +Preconditions.checkNotNull(batchPolicy); +Preconditions.checkNotNull(batchTimerExecutorService); + +this.table = table; +batchProcessor = new BatchProcessor<>(new TableBatchHandler<>(table), batchPolicy, batchTimerExecutorService); + } + + @Override + public V get(K key) { +Preconditions.checkNotNull(key); + +try { + return getAsync(key).get(); +} catch (Exception e) { + throw new SamzaException(e); +} + } + + @Override + public Map getAll(List keys) { +Preconditions.checkNotNull(keys); + +try { + return getAllAsync(keys).get(); +} catch (Exception e) { + throw new SamzaException(e); +} + } + + @Override + public void put(K key, V value) { +Preconditions.checkNotNull(key); + +try { + putAsync(key, value).get(); +} catch (Exception e) { + throw new SamzaException(e); +} + } + + @Override + public void putAll(List> entries) { +Preconditions.checkNotNull(entries); + +try { + putAllAsync(entries).get(); +} catch (Exception e) { + throw new SamzaException(e); +} + } + + @Override + public void delete(K key) { +Preconditions.checkNotNull(key); + +try { + deleteAsync(key).get(); +} catch (Exception e) { + throw new SamzaException(e); +} + } + + @Override + public void deleteAll(List keys) { +Preconditions.checkNotNull(keys); + +try { + deleteAllAsync(keys).get(); +} catch (Exception e) { + throw new SamzaException(e); +} + } + + @Override + public CompletableFuture getAsync(K key) { +Preconditions.checkNotNull(key); + +try { +
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284920777 ## File path: samza-core/src/main/java/org/apache/samza/table/batching/Batch.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batching; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.samza.SamzaException; +import org.apache.samza.table.batch.BatchPolicy; + +/** + * Manages a sequence of {@link Operation}s, which will be performed as a batch. + * A batch can be configured with a {@code batchMaxSize} and/or {@code batchMaxDelay}. + * When the number of operations in the batch exceeds the {@code batchMaxSize} + * or the time window exceeds the {@code batchMaxDelay}, the batch will be performed. 
+ * + * @param <K> The type of the key associated with the {@link Operation} + * @param <V> The type of the value associated with the {@link Operation} + */ +abstract class Batch<K, V> { + protected final int batchMaxSize; + protected final int batchMaxDelay; + protected final BatchHandler<K, V> batchHandler; Review comment: Fixed.
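The two flush triggers in the Batch javadoc above (size limit or time window, whichever fires first) can be sketched as a single predicate. Names and the millisecond unit here are assumptions for illustration, not Samza's actual API:

```java
// Hedged sketch of the batch flush decision: perform the batch when it has
// accumulated batchMaxSize operations OR has been open longer than
// batchMaxDelay (treated here as milliseconds).
public class BatchTrigger {
  static boolean shouldFlush(int currentSize, long openForMs, int batchMaxSize, long batchMaxDelayMs) {
    // Size trigger: the batch is full.
    // Delay trigger: the batch's time window has expired.
    return currentSize >= batchMaxSize || openForMs >= batchMaxDelayMs;
  }

  public static void main(String[] args) {
    System.out.println(shouldFlush(100, 10, 100, 100)); // true: size limit reached
    System.out.println(shouldFlush(3, 150, 100, 100));  // true: window expired
    System.out.println(shouldFlush(3, 10, 100, 100));   // false: keep batching
  }
}
```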
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284920864 ## File path: samza-core/src/main/java/org/apache/samza/table/batching/Batch.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batching; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.samza.SamzaException; +import org.apache.samza.table.batch.BatchPolicy; + +/** + * Manages a sequence of {@link Operation}s, which will be performed as a batch. + * A batch can be configured with a {@code batchMaxSize} and/or {@code batchMaxDelay}. + * When the number of operations in the batch exceeds the {@code batchMaxSize} + * or the time window exceeds the {@code batchMaxDelay}, the batch will be performed. 
+ * + * @param <K> The type of the key associated with the {@link Operation} + * @param <V> The type of the value associated with the {@link Operation} + */ +abstract class Batch<K, V> { + protected final int batchMaxSize; + protected final int batchMaxDelay; + protected final BatchHandler<K, V> batchHandler; + protected final CompletableFuture completableFuture; + protected boolean closed = false; + + /** + * @param batchHandler Defines how the batch will be performed. + * @param batchPolicy Defines the batch configurations. + */ + Batch(BatchHandler<K, V> batchHandler, BatchPolicy batchPolicy) { +this.batchHandler = batchHandler; +// The max number of {@link Operation}s that the batch can hold. +this.batchMaxSize = batchPolicy.getBatchMaxSize(); +// The max time that the batch can last before being performed. +this.batchMaxDelay = batchPolicy.getBatchMaxDelay(); +completableFuture = new CompletableFuture<>(); + } + + /** + * Add an operation to the batch. + * + * @param operation The operation to be added. + */ + abstract void addOperation(Operation<K, V> operation); + + /** + * Close the batch so that it will not accept more operations. + */ + void close() { +closed = true; + } + + /** + * @return Whether the batch is closed to new operations. + */ + boolean isClosed() { +return closed; + } + + abstract Collection<Operation<K, V>> getQueryOperations(); + abstract Collection<Operation<K, V>> getUpdateOperations(); + + private List<Operation<K, V>> getPutOperations() { +return getUpdateOperations().stream().filter(op -> op instanceof PutOperation) +.collect(Collectors.toList()); + } + + private List<Operation<K, V>> getDeleteOperations() { +return getUpdateOperations().stream().filter(op -> op instanceof DeleteOperation) +.collect(Collectors.toList()); + } + + /** + * Perform batch operations.
+ */ + CompletableFuture process() { +return CompletableFuture.allOf( +batchHandler.handleBatchPut(getPutOperations()), +batchHandler.handleBatchDelete(getDeleteOperations()), +batchHandler.handleBatchGet(getQueryOperations())) +.whenComplete((val, throwable) -> { +if (throwable != null) { + completableFuture.completeExceptionally(throwable); + throw new SamzaException("Batch failed", throwable); Review comment: Fixed by removing the throw here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
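The process() method quoted above combines the per-type sub-batch futures with CompletableFuture.allOf and settles a single batch-level future, and the review fix keeps the failure path to completeExceptionally only (no rethrow). A minimal standalone sketch of that pattern, with the sub-batch futures passed in directly instead of coming from a table handler:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the allOf + whenComplete completion pattern from process():
// if any sub-batch fails, the batch future fails; otherwise it completes.
public class BatchCompletion {
  static CompletableFuture<Void> process(CompletableFuture<Void> puts,
                                         CompletableFuture<Void> deletes,
                                         CompletableFuture<Void> gets) {
    CompletableFuture<Void> batchFuture = new CompletableFuture<>();
    CompletableFuture.allOf(puts, deletes, gets).whenComplete((val, throwable) -> {
      if (throwable != null) {
        batchFuture.completeExceptionally(throwable); // propagate; do not rethrow
      } else {
        batchFuture.complete(null);
      }
    });
    return batchFuture;
  }

  public static void main(String[] args) {
    CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("sub-batch failed"));
    System.out.println(process(ok, ok, ok).isCompletedExceptionally());     // false
    System.out.println(process(ok, failed, ok).isCompletedExceptionally()); // true
  }
}
```

Because allOf fails as soon as any input fails, one failed sub-batch marks the whole batch failed while the other sub-batches still run to completion independently.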
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284920709 ## File path: samza-core/src/main/java/org/apache/samza/table/batching/BatchHandler.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batching; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + + +/** + * Define how the batch operations will be handled. + * + * @param <K> The key type of the operations. + * @param <V> The value type of the operations. + */ +public interface BatchHandler<K, V> { Review comment: The BatchProcessor creates a timer for each batch; when the batch timer fires, it needs to know how to deal with the batch. This BatchHandler class tells the processor how to handle the batch. For table batching, we have a TableBatchHandler. For other non-table use cases, we can implement a new somethingBatchHandler and pass it to the BatchProcessor.
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284920111 ## File path: samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batching; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.samza.table.batch.BatchPolicy; + + +/** + * Place a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more + * operations, it will handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be + * created and a timer will be set for it. 
+ * + * @param The type of the key associated with the {@link Operation} + * @param The type of the value associated with the {@link Operation} + */ +public class BatchProcessor { + private final ScheduledExecutorService scheduledExecutorService; + private final ReentrantLock lock = new ReentrantLock(); + private final BatchPolicy batchPolicy; + private final BatchHandler batchHandler; + private Batch batch; + private ScheduledFuture scheduledFuture; + + /** + * @param batchHandler Defines how each batch will be processed. + * @param batchPolicy The batch configurations to be used. + * @param scheduledExecutorService A scheduled executor service to set timers for the managed batches. + */ + public BatchProcessor(BatchHandler batchHandler, BatchPolicy batchPolicy, + ScheduledExecutorService scheduledExecutorService) { +Preconditions.checkNotNull(batchHandler); +Preconditions.checkNotNull(batchPolicy); +Preconditions.checkNotNull(scheduledExecutorService); + +this.batchHandler = batchHandler; +this.scheduledExecutorService = scheduledExecutorService; +this.batchPolicy = batchPolicy; +batch = BatchFactory.getBatch(batchHandler, batchPolicy); +setBatchTimer(batch); + } + + private CompletableFuture addOperation(Operation operation) { +if (batch.isEmpty()) { + setBatchTimer(batch); +} else if (batch.isClosed()) { + processBatch(); +} +batch.addOperation(operation); +final CompletableFuture res = batch.getCompletableFuture(); Review comment: YES. processQueryOperation will call this function "addOperation()", ignores the returned value and use the CompletableFuture from the QueryOperation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284919684 ## File path: samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batching; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.samza.table.batch.BatchPolicy; + + +/** + * Place a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more + * operations, it will handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be + * created and a timer will be set for it. 
+ * + * @param The type of the key associated with the {@link Operation} + * @param The type of the value associated with the {@link Operation} + */ +public class BatchProcessor { + private final ScheduledExecutorService scheduledExecutorService; + private final ReentrantLock lock = new ReentrantLock(); + private final BatchPolicy batchPolicy; + private final BatchHandler batchHandler; + private Batch batch; + private ScheduledFuture scheduledFuture; + + /** + * @param batchHandler Defines how each batch will be processed. + * @param batchPolicy The batch configurations to be used. + * @param scheduledExecutorService A scheduled executor service to set timers for the managed batches. + */ + public BatchProcessor(BatchHandler batchHandler, BatchPolicy batchPolicy, + ScheduledExecutorService scheduledExecutorService) { +Preconditions.checkNotNull(batchHandler); +Preconditions.checkNotNull(batchPolicy); +Preconditions.checkNotNull(scheduledExecutorService); + +this.batchHandler = batchHandler; +this.scheduledExecutorService = scheduledExecutorService; +this.batchPolicy = batchPolicy; +batch = BatchFactory.getBatch(batchHandler, batchPolicy); +setBatchTimer(batch); + } + + private CompletableFuture addOperation(Operation operation) { +if (batch.isEmpty()) { + setBatchTimer(batch); +} else if (batch.isClosed()) { + processBatch(); +} +batch.addOperation(operation); +final CompletableFuture res = batch.getCompletableFuture(); + +if (batch.isClosed()) { Review comment: Add this check to sent the batch to remote store immediately after the previous addOperation. Otherwise, the processor needs to wait until next addOperation to trigger the already closed batch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284918299 ## File path: samza-core/src/main/java/org/apache/samza/table/batching/PutOperation.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batching; + +import com.google.common.base.Preconditions; + + +/** + * Put operation. + * + * @param The type of the key. + * @param The type of the value + */ +public class PutOperation implements Operation { + final private K key; + final private V val; + + public PutOperation(K key, V val) { +Preconditions.checkNotNull(key); + +this.key = key; +this.val = val; + } + + /** + * @return The key to be put to the table. + */ + @Override + public K getKey() { +return key; + } + + /** + * @return The value to be put to the table. 
+ */ + @Override + public V getValue() { +return val; + } + + @Override + public boolean equals(Object other) { +if (this == other) { + return true; +} +if (other == null || getClass() != other.getClass()) { + return false; +} + +final PutOperation otherOp = (PutOperation) other; + +// If value for this object is null, should be null for other object as well. +if ((getValue() == null) ^ (otherOp.getValue() == null)) { Review comment: ^ is used as an exclusive OR. To skip the body of the "if()", both conditions must evaluate to the same value (both true or both false); otherwise the body executes and equals() returns false.
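The XOR null-check discussed above can be isolated into a tiny standalone example (the helper name is hypothetical): the guard fires only when exactly one of the two values is null, in which case the objects cannot be equal.

```java
import java.util.Objects;

// Demonstrates the (a == null) ^ (b == null) guard from PutOperation.equals():
// ^ is true iff exactly one operand is true, i.e. exactly one value is null.
public class XorNullCheck {
  static boolean valuesEqual(Object a, Object b) {
    if ((a == null) ^ (b == null)) {
      return false; // exactly one side is null: not equal
    }
    return Objects.equals(a, b); // both null (true) or both non-null (compare)
  }

  public static void main(String[] args) {
    System.out.println(valuesEqual(null, null)); // true
    System.out.println(valuesEqual("x", null));  // false
    System.out.println(valuesEqual("x", "x"));   // true
  }
}
```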
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284918688 ## File path: samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java ## @@ -115,10 +117,22 @@ public ReadWriteTable getTable() { })); } +BatchPolicy batchPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.BATCH_POLICY); Review comment: Good catch, thanks.
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284917559 ## File path: samza-api/src/main/java/org/apache/samza/table/batch/BatchPolicy.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batch; Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284917528 ## File path: samza-api/src/main/java/org/apache/samza/table/batch/BatchPolicy.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batch; + +import java.io.Serializable; +import org.apache.samza.table.remote.TablePart; + + +public class BatchPolicy implements TablePart, Serializable { + public enum BatchType { +COMPACT_BATCH, +COMPLETE_BATCH + } + + private int batchMaxSize = 100; + private int batchMaxDelay = 100; + private BatchType batchType = BatchType.COMPACT_BATCH; + + public BatchPolicy withBatchMaxSize(int batchMaxSize) { +this.batchMaxSize = batchMaxSize; +return this; + } + + public BatchPolicy withBatchMaxDelay(int batchMaxDelay) { Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
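The fluent withBatchMaxSize/withBatchMaxDelay setters quoted above can be re-sketched as a standalone class. Field names and the 100/100 defaults are copied from the snippet; the class here is an illustration only (per the thread, the PR later replaced BatchPolicy with a BatchProvider for configuration):

```java
// Standalone sketch of the fluent BatchPolicy builder style: each "with"
// method mutates a field and returns this, so calls can be chained.
public class BatchPolicySketch {
  private int batchMaxSize = 100;  // default from the quoted snippet
  private int batchMaxDelay = 100; // default from the quoted snippet

  public BatchPolicySketch withBatchMaxSize(int batchMaxSize) {
    this.batchMaxSize = batchMaxSize;
    return this;
  }

  public BatchPolicySketch withBatchMaxDelay(int batchMaxDelay) {
    this.batchMaxDelay = batchMaxDelay;
    return this;
  }

  public int getBatchMaxSize() { return batchMaxSize; }
  public int getBatchMaxDelay() { return batchMaxDelay; }

  public static void main(String[] args) {
    BatchPolicySketch policy = new BatchPolicySketch().withBatchMaxSize(500).withBatchMaxDelay(50);
    System.out.println(policy.getBatchMaxSize() + " " + policy.getBatchMaxDelay()); // 500 50
  }
}
```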
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284917428 ## File path: samza-api/src/main/java/org/apache/samza/table/batch/BatchPolicy.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batch; + +import java.io.Serializable; +import org.apache.samza.table.remote.TablePart; + + +public class BatchPolicy implements TablePart, Serializable { Review comment: In the new change, this class is deleted and BatchProvider is used instead for configuration.
[GitHub] [samza] dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs
dengpanyin commented on a change in pull request #1031: SAMZA-2191: Batch support for table APIs URL: https://github.com/apache/samza/pull/1031#discussion_r284917500 ## File path: samza-api/src/main/java/org/apache/samza/table/batch/BatchPolicy.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.batch; + +import java.io.Serializable; +import org.apache.samza.table.remote.TablePart; + + +public class BatchPolicy implements TablePart, Serializable { + public enum BatchType { Review comment: Good point.
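The BatchPolicy class quoted in the review comments above uses a fluent "with"-style builder: each setter mutates a field and returns `this`. Below is a minimal runnable sketch of that pattern. Field names and defaults are taken from the quoted hunk; the getters, the `withBatchType` method, and the comments on the enum semantics are illustrative additions, not part of the original class.

```java
// Sketch of the fluent policy object from the diff above (getters added for illustration).
public class BatchPolicy {
  public enum BatchType {
    COMPACT_BATCH,   // assumed: later updates to the same key coalesce
    COMPLETE_BATCH   // assumed: every update is kept in the batch
  }

  private int batchMaxSize = 100;
  private int batchMaxDelay = 100;
  private BatchType batchType = BatchType.COMPACT_BATCH;

  // Each setter returns `this`, so a policy can be built in one chained expression.
  public BatchPolicy withBatchMaxSize(int batchMaxSize) {
    this.batchMaxSize = batchMaxSize;
    return this;
  }

  public BatchPolicy withBatchMaxDelay(int batchMaxDelay) {
    this.batchMaxDelay = batchMaxDelay;
    return this;
  }

  public BatchPolicy withBatchType(BatchType batchType) {
    this.batchType = batchType;
    return this;
  }

  public int getBatchMaxSize() { return batchMaxSize; }
  public int getBatchMaxDelay() { return batchMaxDelay; }
  public BatchType getBatchType() { return batchType; }

  public static void main(String[] args) {
    BatchPolicy policy = new BatchPolicy()
        .withBatchMaxSize(500)
        .withBatchMaxDelay(50)
        .withBatchType(BatchType.COMPLETE_BATCH);
    System.out.println(policy.getBatchMaxSize() + " " + policy.getBatchType());
  }
}
```

The chained form is why the review thread focuses on these `with*` methods: unset fields silently keep their defaults, so the defaults themselves are part of the API contract.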
[GitHub] [samza] weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table URL: https://github.com/apache/samza/pull/1034#discussion_r284916421 ## File path: samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java ## @@ -46,45 +48,66 @@ public AsyncRemoteTable(TableReadFunction readFn, TableWriteFunction } @Override - public CompletableFuture getAsync(K key) { -return readFn.getAsync(key); + public CompletableFuture getAsync(K key, Object ... args) { Review comment: The check is performed in RemoteTable; delegation tables don't need to do it again.
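The comment above captures a common layering rule: argument checks run once in the outer RemoteTable, and inner delegation tables such as AsyncRemoteTable are pure pass-throughs that trust their input. A simplified sketch of that split follows; the `KvReader` interface and class names here are hypothetical stand-ins, not the real Samza table API.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a table read function; not the real Samza TableReadFunction.
interface KvReader<K, V> {
  CompletableFuture<V> getAsync(K key, Object... args);
}

// Inner delegation layer: pure pass-through, no re-validation.
class AsyncDelegateTable<K, V> implements KvReader<K, V> {
  private final KvReader<K, V> readFn;
  AsyncDelegateTable(KvReader<K, V> readFn) { this.readFn = readFn; }
  @Override
  public CompletableFuture<V> getAsync(K key, Object... args) {
    return readFn.getAsync(key, args); // caller already validated key
  }
}

// Outer layer: the single place where preconditions are enforced.
class CheckedTable<K, V> implements KvReader<K, V> {
  private final KvReader<K, V> delegate;
  CheckedTable(KvReader<K, V> delegate) { this.delegate = delegate; }
  @Override
  public CompletableFuture<V> getAsync(K key, Object... args) {
    Objects.requireNonNull(key, "null key"); // checked once, here only
    return delegate.getAsync(key, args);
  }
}

public class BoundaryCheckDemo {
  public static void main(String[] args) {
    KvReader<String, String> store =
        (k, extra) -> CompletableFuture.completedFuture("value-for-" + k);
    CheckedTable<String, String> table =
        new CheckedTable<>(new AsyncDelegateTable<>(store));
    System.out.println(table.getAsync("k1").join());
  }
}
```

Validating only at the boundary keeps the hot delegation path free of redundant null checks while still guaranteeing that no invalid argument reaches the inner layers.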
[GitHub] [samza] srinipunuru opened a new pull request #1036: Adding a SQL test case with switch case
srinipunuru opened a new pull request #1036: Adding a SQL test case with switch case URL: https://github.com/apache/samza/pull/1036 Adding a Samza SQL test case with a switch case statement.
[GitHub] [samza] shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284894176 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -173,71 +207,148 @@ public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); -metadataStore.delete(toStoreKey(ssp, taskName)); +readWriteStore.delete(toReadWriteStoreKey(ssp, taskName)); } /** - * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from - * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel} + * The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with + * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace + * to a "fan out" namespace. * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this * method to assign the Startpoints to the appropriate tasks. - * @param jobModel The {@link JobModel} is used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to. - * @return The list of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName. + * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to. + * @return The set of active {@link TaskName}s that were fanned out to. 
*/ - public Set fanOutStartpointsToTasks(JobModel jobModel) { + public Map> fanOut(Map> taskToSSPs) throws IOException { Preconditions.checkState(!stopped, "Underlying metadata store not available"); -Preconditions.checkNotNull(jobModel, "JobModel cannot be null"); - -HashSet sspsToDelete = new HashSet<>(); - -// Inspect the job model for TaskName-to-SSPs mapping and re-map startpoints from SSP-only keys to SSP+TaskName keys. -for (ContainerModel containerModel: jobModel.getContainers().values()) { - for (TaskModel taskModel : containerModel.getTasks().values()) { -TaskName taskName = taskModel.getTaskName(); -for (SystemStreamPartition ssp : taskModel.getSystemStreamPartitions()) { - Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key - if (startpoint == null) { -LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName); -continue; - } - - LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined by the job model.", ssp); - Startpoint startpointForTask = readStartpoint(ssp, taskName); - if (startpointForTask == null || startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) { -writeStartpoint(ssp, taskName, startpoint); -sspsToDelete.add(ssp); // Mark for deletion -LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, taskName); - } else { -LOG.info("Startpoint for SSP: {} and task: {} already exists and will not be overwritten.", ssp, taskName); - } +Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs cannot be null or empty"); +// construct fan out with the existing readWriteStore entries and mark the entries for deletion after fan out +Instant now = Instant.now(); +HashMultimap deleteKeys = HashMultimap.create(); +HashMap fanOuts = new HashMap<>(); +for (TaskName taskName : taskToSSPs.keySet()) { + Set ssps = taskToSSPs.get(taskName); + if (CollectionUtils.isEmpty(ssps)) { +LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName()); +continue; + } + for 
(SystemStreamPartition ssp : ssps) { +Optional startpoint = readStartpoint(ssp); // Read SSP-only key +startpoint.ifPresent(sp -> deleteKeys.put(ssp, null)); + +Optional startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key +startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName)); + +Optional startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask); +if (!startpointWithPrecedence.isPresent()) { + continue; } + +fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); +fanOuts.get(taskName).getFanOuts().put(ssp, startpointWithPrecedence.get()); } } -// Delete SSP-only keys -sspsToDelete.forEach(ssp -> { -deleteStartpoint(ssp); -LOG.info("All Startpoints for SSP: {} have been grouped to the appropriate tasks and the SSP was deleted."); - }); +
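The fan-out loop above reads both an SSP-only startpoint and an SSP+TaskName startpoint, then picks one via `resolveStartpointPrecendence` (whose body is not shown in the quoted hunk). Below is a sketch of what such a resolver might look like, under the assumption that the task-specific entry wins whenever it exists, with the SSP-only entry as fallback; the `Startpoint` class here is a hypothetical stand-in carrying only a label.

```java
import java.util.Optional;

public class PrecedenceSketch {
  // Hypothetical stand-in for Samza's Startpoint; only a label is kept.
  static final class Startpoint {
    final String label;
    Startpoint(String label) { this.label = label; }
  }

  // Assumed rule: the SSP+TaskName startpoint takes precedence over the SSP-only one.
  static Optional<Startpoint> resolvePrecedence(
      Optional<Startpoint> sspOnly, Optional<Startpoint> sspPlusTask) {
    return sspPlusTask.isPresent() ? sspPlusTask : sspOnly;
  }

  public static void main(String[] args) {
    Optional<Startpoint> sspOnly = Optional.of(new Startpoint("ssp-only"));
    Optional<Startpoint> forTask = Optional.of(new Startpoint("ssp+task"));
    // Task-specific entry wins when both are present; SSP-only is the fallback.
    System.out.println(resolvePrecedence(sspOnly, forTask).get().label);
    System.out.println(resolvePrecedence(sspOnly, Optional.empty()).get().label);
  }
}
```

Returning `Optional.empty()` when neither key has a startpoint is what lets the fan-out loop `continue` past SSPs with nothing to fan out, as the diff shows.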
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284887289 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -20,87 +20,115 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.HashSet; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; import org.apache.samza.metadatastore.MetadataStore; -import org.apache.samza.metadatastore.MetadataStoreFactory; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.SystemStreamPartition; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The StartpointManager reads and writes {@link Startpoint} to the {@link MetadataStore} defined by - * the configuration task.startpoint.metadata.store.factory. 
- * - * Startpoints are keyed in the MetadataStore by two different formats: - * 1) Only by {@link SystemStreamPartition} - * 2) A combination of {@link SystemStreamPartition} and {@link TaskName} + * The StartpointManager reads and writes {@link Startpoint} to the provided {@link MetadataStore} * * The intention for the StartpointManager is to maintain a strong contract between the caller * and how Startpoints are stored in the underlying MetadataStore. + * + * Startpoints are written in the MetadataStore using keys of two different formats: + * 1) {@link SystemStreamPartition} only + * 2) A combination of {@link SystemStreamPartition} and {@link TaskName} + * + * Startpoints are then fanned out to a fan out namespace in the MetadataStore by the + * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone + * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the + * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per + * {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint. + * + * The read, write and delete methods are intended for external callers. + * The fan out methods are intended to be used within a job coordinator. 
*/ public class StartpointManager { - private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class); - public static final String NAMESPACE = "samza-startpoint-v1"; + public static final Integer VERSION = 1; + public static final String NAMESPACE = "samza-startpoint-v" + VERSION; static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12); - private final MetadataStore metadataStore; - private final StartpointSerde startpointSerde = new StartpointSerde(); - - private boolean stopped = false; + private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class); + private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out"; - /** - * Constructs a {@link StartpointManager} instance by instantiating a new metadata store connection. - * This is primarily used for testing. - */ - @VisibleForTesting - StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, MetricsRegistry metricsRegistry) { -Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory cannot be null"); -Preconditions.checkNotNull(config, "Config cannot be null"); -Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be null"); + private final MetadataStore metadataStore; + private final NamespaceAwareCoordinatorStreamStore fanOutStore; + private final NamespaceAwareCoordinatorStreamStore readWriteStore; + private final ObjectMapper objectMapper = StartpointObjectMapper.getObjectMapper(); -this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, config, metricsRegistry); -LOG.info("StartpointManager created with metadata store: {}", metadataStore.getClass().getCanonicalName()); -this.metadataStore.init(); - } + private boolean stopped = true; /** * Builds the
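The diff above derives both store namespaces from one version constant: `NAMESPACE` is `"samza-startpoint-v" + VERSION` and the fan-out namespace appends `"-fan-out"`, so bumping `VERSION` moves the read-write and fan-out namespaces together. A minimal sketch of just that derivation, using the constants as they appear in the hunk:

```java
public class StartpointNamespaces {
  // Constants as shown in the StartpointManager diff above.
  public static final Integer VERSION = 1;
  public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
  static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";

  public static void main(String[] args) {
    System.out.println(NAMESPACE);          // samza-startpoint-v1
    System.out.println(NAMESPACE_FAN_OUT);  // samza-startpoint-v1-fan-out
  }
}
```

Keeping the version in the namespace means a serialization-format change can ship as `v2` without colliding with startpoints written under the `v1` namespaces.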
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284874627 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -173,71 +207,148 @@ public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); -metadataStore.delete(toStoreKey(ssp, taskName)); +readWriteStore.delete(toReadWriteStoreKey(ssp, taskName)); } /** - * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from - * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel} + * The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with + * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace + * to a "fan out" namespace. * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this * method to assign the Startpoints to the appropriate tasks. - * @param jobModel The {@link JobModel} is used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to. - * @return The list of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName. + * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to. + * @return The set of active {@link TaskName}s that were fanned out to. 
*/ - public Set fanOutStartpointsToTasks(JobModel jobModel) { + public Map> fanOut(Map> taskToSSPs) throws IOException { Preconditions.checkState(!stopped, "Underlying metadata store not available"); -Preconditions.checkNotNull(jobModel, "JobModel cannot be null"); - -HashSet sspsToDelete = new HashSet<>(); - -// Inspect the job model for TaskName-to-SSPs mapping and re-map startpoints from SSP-only keys to SSP+TaskName keys. -for (ContainerModel containerModel: jobModel.getContainers().values()) { - for (TaskModel taskModel : containerModel.getTasks().values()) { -TaskName taskName = taskModel.getTaskName(); -for (SystemStreamPartition ssp : taskModel.getSystemStreamPartitions()) { - Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key - if (startpoint == null) { -LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName); -continue; - } - - LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined by the job model.", ssp); - Startpoint startpointForTask = readStartpoint(ssp, taskName); - if (startpointForTask == null || startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) { -writeStartpoint(ssp, taskName, startpoint); -sspsToDelete.add(ssp); // Mark for deletion -LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, taskName); - } else { -LOG.info("Startpoint for SSP: {} and task: {} already exists and will not be overwritten.", ssp, taskName); - } +Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs cannot be null or empty"); +// construct fan out with the existing readWriteStore entries and mark the entries for deletion after fan out +Instant now = Instant.now(); +HashMultimap deleteKeys = HashMultimap.create(); +HashMap fanOuts = new HashMap<>(); +for (TaskName taskName : taskToSSPs.keySet()) { + Set ssps = taskToSSPs.get(taskName); + if (CollectionUtils.isEmpty(ssps)) { +LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName()); +continue; + } + for 
(SystemStreamPartition ssp : ssps) { +Optional startpoint = readStartpoint(ssp); // Read SSP-only key +startpoint.ifPresent(sp -> deleteKeys.put(ssp, null)); + +Optional startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key +startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName)); + +Optional startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask); +if (!startpointWithPrecedence.isPresent()) { + continue; } + +fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); +fanOuts.get(taskName).getFanOuts().put(ssp, startpointWithPrecedence.get()); } } -// Delete SSP-only keys -sspsToDelete.forEach(ssp -> { -deleteStartpoint(ssp); -LOG.info("All Startpoints for SSP: {} have been grouped to the appropriate tasks and the SSP was deleted."); - }); +
[GitHub] [samza] shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284853798 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -173,71 +207,148 @@ public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); -metadataStore.delete(toStoreKey(ssp, taskName)); +readWriteStore.delete(toReadWriteStoreKey(ssp, taskName)); } /** - * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from - * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel} + * The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with + * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace + * to a "fan out" namespace. * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this * method to assign the Startpoints to the appropriate tasks. - * @param jobModel The {@link JobModel} is used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to. - * @return The list of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName. + * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to. + * @return The set of active {@link TaskName}s that were fanned out to. 
*/ - public Set fanOutStartpointsToTasks(JobModel jobModel) { + public Map> fanOut(Map> taskToSSPs) throws IOException { Preconditions.checkState(!stopped, "Underlying metadata store not available"); -Preconditions.checkNotNull(jobModel, "JobModel cannot be null"); - -HashSet sspsToDelete = new HashSet<>(); - -// Inspect the job model for TaskName-to-SSPs mapping and re-map startpoints from SSP-only keys to SSP+TaskName keys. -for (ContainerModel containerModel: jobModel.getContainers().values()) { - for (TaskModel taskModel : containerModel.getTasks().values()) { -TaskName taskName = taskModel.getTaskName(); -for (SystemStreamPartition ssp : taskModel.getSystemStreamPartitions()) { - Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key - if (startpoint == null) { -LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName); -continue; - } - - LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined by the job model.", ssp); - Startpoint startpointForTask = readStartpoint(ssp, taskName); - if (startpointForTask == null || startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) { -writeStartpoint(ssp, taskName, startpoint); -sspsToDelete.add(ssp); // Mark for deletion -LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, taskName); - } else { -LOG.info("Startpoint for SSP: {} and task: {} already exists and will not be overwritten.", ssp, taskName); - } +Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs cannot be null or empty"); +// construct fan out with the existing readWriteStore entries and mark the entries for deletion after fan out +Instant now = Instant.now(); +HashMultimap deleteKeys = HashMultimap.create(); +HashMap fanOuts = new HashMap<>(); +for (TaskName taskName : taskToSSPs.keySet()) { + Set ssps = taskToSSPs.get(taskName); + if (CollectionUtils.isEmpty(ssps)) { +LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName()); +continue; + } + for 
(SystemStreamPartition ssp : ssps) { +Optional startpoint = readStartpoint(ssp); // Read SSP-only key +startpoint.ifPresent(sp -> deleteKeys.put(ssp, null)); + +Optional startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key +startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName)); + +Optional startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask); +if (!startpointWithPrecedence.isPresent()) { + continue; } + +fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now)); +fanOuts.get(taskName).getFanOuts().put(ssp, startpointWithPrecedence.get()); } } -// Delete SSP-only keys -sspsToDelete.forEach(ssp -> { -deleteStartpoint(ssp); -LOG.info("All Startpoints for SSP: {} have been grouped to the appropriate tasks and the SSP was deleted."); - }); +
[GitHub] [samza] weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
weisong44 commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table URL: https://github.com/apache/samza/pull/1034#discussion_r284841742 ## File path: samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java ## @@ -241,64 +249,76 @@ public void putAll(List> entries) { CompletableFuture deleteFuture = deleteKeys.isEmpty() ? CompletableFuture.completedFuture(null) -: deleteAllAsync(deleteKeys); +: deleteAllAsync(deleteKeys, args); // Return the combined future return CompletableFuture.allOf( deleteFuture, -instrument(() -> asyncTable.putAllAsync(putRecords), metrics.numPutAlls, metrics.putAllNs)) +instrument(() -> asyncTable.putAllAsync(putRecords, args), metrics.numPutAlls, metrics.putAllNs)) .exceptionally(e -> { String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(",")); throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e); }); } @Override - public void delete(K key) { + public void delete(K key, Object ... args) { try { - deleteAsync(key).get(); + deleteAsync(key, args).get(); } catch (Exception e) { throw new SamzaException(e); } } @Override - public CompletableFuture deleteAsync(K key) { + public CompletableFuture deleteAsync(K key, Object ... args) { Preconditions.checkNotNull(writeFn, "null write function"); Preconditions.checkNotNull(key, "null key"); -return instrument(() -> asyncTable.deleteAsync(key), metrics.numDeletes, metrics.deleteNs) +return instrument(() -> asyncTable.deleteAsync(key, args), metrics.numDeletes, metrics.deleteNs) .exceptionally(e -> { throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e); }); } @Override - public void deleteAll(List keys) { + public void deleteAll(List keys, Object ... 
args) { try { - deleteAllAsync(keys).get(); + deleteAllAsync(keys, args).get(); } catch (Exception e) { throw new SamzaException(e); } } @Override - public CompletableFuture deleteAllAsync(List keys) { + public CompletableFuture deleteAllAsync(List keys, Object ... args) { Preconditions.checkNotNull(writeFn, "null write function"); Preconditions.checkNotNull(keys, "null keys"); if (keys.isEmpty()) { return CompletableFuture.completedFuture(null); } -return instrument(() -> asyncTable.deleteAllAsync(keys), metrics.numDeleteAlls, metrics.deleteAllNs) +return instrument(() -> asyncTable.deleteAllAsync(keys, args), metrics.numDeleteAlls, metrics.deleteAllNs) .exceptionally(e -> { -throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e); +throw new SamzaException(String.format("Failed to delete records for " + keys), e); + }); + } + + @Override + public CompletableFuture writeAsync(int opId, Object... args) { +return (CompletableFuture) instrument(() -> asyncTable.writeAsync(opId, args), metrics.numWrites, metrics.writeNs) +.exceptionally(e -> { +throw new SamzaException(String.format("Failed to write, opId=%d", opId), e); }); } @Override public void init(Context context) { super.init(context); asyncTable.init(context); +readFn.init(context, this); Review comment: I need to pass `this` here so that table functions get a reference to the remote table, with the rate limiter, etc. hooked in during invocation. I removed the init part from AsyncRemoteTable and added a comment to indicate the move.
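The RemoteTable hunk above wraps every async call in an `instrument(...)` helper and converts failures into a wrapped exception via `exceptionally`. A simplified sketch of that pattern follows; the `AtomicLong` counter and timer are plain stand-ins for Samza's metrics classes, and the key-based failure trigger is purely illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class InstrumentSketch {
  // Plain stand-ins for Samza's Counter/Timer metrics.
  static final AtomicLong numDeletes = new AtomicLong();
  static final AtomicLong deleteNs = new AtomicLong();

  // Count the call, time it, and hand back a future that completes like the original.
  static <T> CompletableFuture<T> instrument(
      Supplier<CompletableFuture<T>> op, AtomicLong counter, AtomicLong timerNs) {
    counter.incrementAndGet();
    long start = System.nanoTime();
    return op.get().whenComplete((r, e) -> timerNs.addAndGet(System.nanoTime() - start));
  }

  static CompletableFuture<Void> deleteAsync(String key) {
    Supplier<CompletableFuture<Void>> op = () ->
        key.startsWith("bad")  // illustrative failure trigger
            ? failed(new IllegalStateException("store unavailable"))
            : CompletableFuture.completedFuture(null);
    return instrument(op, numDeletes, deleteNs)
        .exceptionally(e -> {
          // Mirror the diff: rethrow with context about the failed key.
          throw new RuntimeException("Failed to delete the record for " + key, e);
        });
  }

  // Java 8 lacks CompletableFuture.failedFuture, so build one by hand.
  static <T> CompletableFuture<T> failed(Throwable t) {
    CompletableFuture<T> f = new CompletableFuture<>();
    f.completeExceptionally(t);
    return f;
  }

  public static void main(String[] args) {
    deleteAsync("good-key").join();
    System.out.println("deletes=" + numDeletes.get());
  }
}
```

Because `exceptionally` runs only on failure, the happy path pays just the counter increment and one timing callback, while every failure surfaces with the key that caused it rather than a bare store exception.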
[GitHub] [samza] shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284837838 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -20,87 +20,115 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.HashSet; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; import org.apache.samza.metadatastore.MetadataStore; -import org.apache.samza.metadatastore.MetadataStoreFactory; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.SystemStreamPartition; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The StartpointManager reads and writes {@link Startpoint} to the {@link MetadataStore} defined by - * the configuration task.startpoint.metadata.store.factory. 
- * - * Startpoints are keyed in the MetadataStore by two different formats: - * 1) Only by {@link SystemStreamPartition} - * 2) A combination of {@link SystemStreamPartition} and {@link TaskName} + * The StartpointManager reads and writes {@link Startpoint} to the provided {@link MetadataStore} * * The intention for the StartpointManager is to maintain a strong contract between the caller * and how Startpoints are stored in the underlying MetadataStore. + * + * Startpoints are written in the MetadataStore using keys of two different formats: + * 1) {@link SystemStreamPartition} only + * 2) A combination of {@link SystemStreamPartition} and {@link TaskName} + * + * Startpoints are then fanned out to a fan out namespace in the MetadataStore by the + * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone + * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the + * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per + * {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint. + * + * The read, write and delete methods are intended for external callers. + * The fan out methods are intended to be used within a job coordinator. 
*/ public class StartpointManager { - private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class); - public static final String NAMESPACE = "samza-startpoint-v1"; + public static final Integer VERSION = 1; + public static final String NAMESPACE = "samza-startpoint-v" + VERSION; static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12); - private final MetadataStore metadataStore; - private final StartpointSerde startpointSerde = new StartpointSerde(); - - private boolean stopped = false; + private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class); + private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out"; - /** - * Constructs a {@link StartpointManager} instance by instantiating a new metadata store connection. - * This is primarily used for testing. - */ - @VisibleForTesting - StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, MetricsRegistry metricsRegistry) { -Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory cannot be null"); -Preconditions.checkNotNull(config, "Config cannot be null"); -Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be null"); + private final MetadataStore metadataStore; + private final NamespaceAwareCoordinatorStreamStore fanOutStore; + private final NamespaceAwareCoordinatorStreamStore readWriteStore; + private final ObjectMapper objectMapper = StartpointObjectMapper.getObjectMapper(); -this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, config, metricsRegistry); -LOG.info("StartpointManager created with metadata store: {}", metadataStore.getClass().getCanonicalName()); -this.metadataStore.init(); - } + private boolean stopped = true; /** * Builds the
[GitHub] [samza] shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
shanthoosh commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284837838 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -20,87 +20,115 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.HashSet; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; import org.apache.samza.metadatastore.MetadataStore; -import org.apache.samza.metadatastore.MetadataStoreFactory; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.SystemStreamPartition; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The StartpointManager reads and writes {@link Startpoint} to the {@link MetadataStore} defined by - * the configuration task.startpoint.metadata.store.factory. 
- * - * Startpoints are keyed in the MetadataStore by two different formats: - * 1) Only by {@link SystemStreamPartition} - * 2) A combination of {@link SystemStreamPartition} and {@link TaskName} + * The StartpointManager reads and writes {@link Startpoint} to the provided {@link MetadataStore} * * The intention for the StartpointManager is to maintain a strong contract between the caller * and how Startpoints are stored in the underlying MetadataStore. + * + * Startpoints are written in the MetadataStore using keys of two different formats: + * 1) {@link SystemStreamPartition} only + * 2) A combination of {@link SystemStreamPartition} and {@link TaskName} + * + * Startpoints are then fanned out to a fan out namespace in the MetadataStore by the + * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone + * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the + * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per + * {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint. + * + * The read, write and delete methods are intended for external callers. + * The fan out methods are intended to be used within a job coordinator. 
*/ public class StartpointManager { - private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class); - public static final String NAMESPACE = "samza-startpoint-v1"; + public static final Integer VERSION = 1; + public static final String NAMESPACE = "samza-startpoint-v" + VERSION; static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12); - private final MetadataStore metadataStore; - private final StartpointSerde startpointSerde = new StartpointSerde(); - - private boolean stopped = false; + private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class); + private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out"; - /** - * Constructs a {@link StartpointManager} instance by instantiating a new metadata store connection. - * This is primarily used for testing. - */ - @VisibleForTesting - StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, MetricsRegistry metricsRegistry) { -Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory cannot be null"); -Preconditions.checkNotNull(config, "Config cannot be null"); -Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be null"); + private final MetadataStore metadataStore; + private final NamespaceAwareCoordinatorStreamStore fanOutStore; + private final NamespaceAwareCoordinatorStreamStore readWriteStore; + private final ObjectMapper objectMapper = StartpointObjectMapper.getObjectMapper(); -this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, config, metricsRegistry); -LOG.info("StartpointManager created with metadata store: {}", metadataStore.getClass().getCanonicalName()); -this.metadataStore.init(); - } + private boolean stopped = true; /** * Builds the
[GitHub] [samza] dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table URL: https://github.com/apache/samza/pull/1034#discussion_r284818562 ## File path: samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java ## @@ -46,45 +48,66 @@ public AsyncRemoteTable(TableReadFunction readFn, TableWriteFunction } @Override - public CompletableFuture getAsync(K key) { -return readFn.getAsync(key); + public CompletableFuture getAsync(K key, Object ... args) { Review comment: checkNotNull for args, maybe key as well? Should the "args.length" check be deferred to the readFn by passing the args to readFn? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
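The null-check question above can be sketched as follows. This is a hypothetical stand-in, not the Samza source: it uses `java.util.Objects` in place of Guava's `Preconditions` and a simplified read-function interface, with any `args.length` semantics deferred to the read function as the reviewer suggests.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: eager null checks on key and the varargs array,
// deferring args.length/content checks to the read function.
class AsyncTableSketch<K, V> {
    interface ReadFn<K, V> {
        CompletableFuture<V> getAsync(K key, Object... args);
    }

    private final ReadFn<K, V> readFn;

    AsyncTableSketch(ReadFn<K, V> readFn) {
        this.readFn = Objects.requireNonNull(readFn, "null read function");
    }

    CompletableFuture<V> getAsync(K key, Object... args) {
        Objects.requireNonNull(key, "null key");
        // args is never null for a normal varargs call, but an explicitly
        // passed null array is possible, so it is checked too
        Objects.requireNonNull(args, "null args");
        return readFn.getAsync(key, args); // length checks belong to readFn
    }
}
```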
[GitHub] [samza] dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table URL: https://github.com/apache/samza/pull/1034#discussion_r284828100 ## File path: samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java ## @@ -241,64 +249,76 @@ public void putAll(List> entries) { CompletableFuture deleteFuture = deleteKeys.isEmpty() ? CompletableFuture.completedFuture(null) -: deleteAllAsync(deleteKeys); +: deleteAllAsync(deleteKeys, args); // Return the combined future return CompletableFuture.allOf( deleteFuture, -instrument(() -> asyncTable.putAllAsync(putRecords), metrics.numPutAlls, metrics.putAllNs)) +instrument(() -> asyncTable.putAllAsync(putRecords, args), metrics.numPutAlls, metrics.putAllNs)) .exceptionally(e -> { String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(",")); throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e); }); } @Override - public void delete(K key) { + public void delete(K key, Object ... args) { try { - deleteAsync(key).get(); + deleteAsync(key, args).get(); } catch (Exception e) { throw new SamzaException(e); } } @Override - public CompletableFuture deleteAsync(K key) { + public CompletableFuture deleteAsync(K key, Object ... args) { Preconditions.checkNotNull(writeFn, "null write function"); Preconditions.checkNotNull(key, "null key"); -return instrument(() -> asyncTable.deleteAsync(key), metrics.numDeletes, metrics.deleteNs) +return instrument(() -> asyncTable.deleteAsync(key, args), metrics.numDeletes, metrics.deleteNs) .exceptionally(e -> { throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e); }); } @Override - public void deleteAll(List keys) { + public void deleteAll(List keys, Object ... 
args) { try { - deleteAllAsync(keys).get(); + deleteAllAsync(keys, args).get(); } catch (Exception e) { throw new SamzaException(e); } } @Override - public CompletableFuture deleteAllAsync(List keys) { + public CompletableFuture deleteAllAsync(List keys, Object ... args) { Preconditions.checkNotNull(writeFn, "null write function"); Preconditions.checkNotNull(keys, "null keys"); if (keys.isEmpty()) { return CompletableFuture.completedFuture(null); } -return instrument(() -> asyncTable.deleteAllAsync(keys), metrics.numDeleteAlls, metrics.deleteAllNs) +return instrument(() -> asyncTable.deleteAllAsync(keys, args), metrics.numDeleteAlls, metrics.deleteAllNs) .exceptionally(e -> { -throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e); +throw new SamzaException(String.format("Failed to delete records for " + keys), e); + }); + } + + @Override + public CompletableFuture writeAsync(int opId, Object... args) { +return (CompletableFuture) instrument(() -> asyncTable.writeAsync(opId, args), metrics.numWrites, metrics.writeNs) +.exceptionally(e -> { +throw new SamzaException(String.format("Failed to write, opId=%d", opId), e); }); } @Override public void init(Context context) { super.init(context); asyncTable.init(context); +readFn.init(context, this); Review comment: asyncTable already initializes readFn/writeFn, why is this still needed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
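The diff above wraps every async operation the same way: run it, then convert any failure into an exception carrying context about the keys or opId. A minimal sketch of that wrap-and-rethrow pattern, with `RuntimeException` standing in for `SamzaException`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of the per-operation failure wrapping used in RemoteTable:
// run the async op and rethrow failures with added context.
class AsyncFailureContext {
    static <T> CompletableFuture<T> withContext(Supplier<CompletableFuture<T>> op, String context) {
        return op.get().exceptionally(e -> {
            // RuntimeException stands in for SamzaException here
            throw new RuntimeException("Failed to " + context, e);
        });
    }
}
```

Callers see the original exception preserved as the cause, so stack traces stay intact while the message gains operation-specific detail.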
[GitHub] [samza] dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
dengpanyin commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table URL: https://github.com/apache/samza/pull/1034#discussion_r284809446 ## File path: samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java ## @@ -63,9 +63,20 @@ * Get the number of credits required for the {@code key} and {@code value} pair. * @param key table key * @param value table record + * @param args additional arguments * @return number of credits */ -int getCredits(K key, V value); +int getCredits(K key, V value, Object ... args); + +/** + * Get the number of credits required for the {@code opId} and associated {@code args}. + * @param opId operation Id + * @param args additional arguments + * @return number of credits + */ +default int getCredit(int opId, Object ... args) { Review comment: rename it to "getCredits" to make the name consistent? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284819953 ## File path: samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +import org.apache.samza.annotation.InterfaceStability; + +/** + * Coordination Primitive to maintain the list of processors in the quorum + * + * Guarantees: Review comment: I feel the documentation needs to address two aspects of the contract. 1. What are the responsibilities/expectations of the implementors of the interface? 2. What are the guarantees provided to the users of the interface? It will be nice to have this separation since right now, it is unclear who is targeted in these sections. Also, I suppose what you mean by 1 and 2 under guarantees is referred to as `Linearizability`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
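The guarantees under discussion can be illustrated with an in-memory toy implementation. This is hypothetical, not the Samza interface: it only demonstrates the caller-facing behavior — after `registerProcessor` the count includes the caller, and after `unregisterProcessor` it no longer does. A real implementation (e.g. ZooKeeper-backed) must additionally make these effects visible across processes.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory illustration of the register/count/unregister contract.
class InMemoryClusterMembership {
    private final Set<String> processors = ConcurrentHashMap.newKeySet();

    // Registers the caller and returns its membership id.
    String registerProcessor() {
        String id = UUID.randomUUID().toString();
        processors.add(id);
        return id;
    }

    // Count is inclusive of every processor registered so far.
    int getNumberOfProcessors() {
        return processors.size();
    }

    void unregisterProcessor(String processorId) {
        processors.remove(processorId);
    }
}
```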
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284829002 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -77,20 +93,81 @@ */ public LocalApplicationRunner(SamzaApplication app, Config config) { this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); -this.planner = new LocalJobPlanner(appDesc); +isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +coordinationUtils = getCoordinationUtils(config); } /** * Constructor only used in unit test to allow injection of {@link LocalJobPlanner} */ @VisibleForTesting - LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) { + LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) { this.appDesc = appDesc; -this.planner = planner; +isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +this.coordinationUtils = coordinationUtils; + } + + private Optional getCoordinationUtils(Config config) { +if (!isAppModeBatch) { + return Optional.empty(); +} +JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); +CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config); +return Optional.ofNullable(coordinationUtils); + } + + /** + * @return LocalJobPlanner created + */ + @VisibleForTesting + LocalJobPlanner getPlanner() { +boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +if (!isAppModeBatch) { + return new LocalJobPlanner(appDesc, PROCESSOR_ID); +} +CoordinationUtils coordinationUtils = null; +String runId = null; +if (this.coordinationUtils.isPresent()) { + coordinationUtils 
= this.coordinationUtils.get(); +} +if (this.runId.isPresent()) { + runId = this.runId.get(); +} +return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, runId); + } + + + private void initializeRunId() { Review comment: It is hard to read through this. Especially around when the runId isn't initialized. We should at least separate the scenarios into categories 1. We don't need to initialize runId because we explicitly chose to. e.g. stream mode 2. We can't initialize the runId because we don't have everything we need to construct runIdGenerator. 3. We didn't initialize runId because we failed to generate runId. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
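The three categories the reviewer lists could be made explicit with early returns, one per scenario. A hypothetical restructuring, with plain `Object` parameters standing in for the real coordination utils and metadata store:

```java
import java.util.Optional;

// Hypothetical restructuring of initializeRunId() along the reviewer's
// three categories: not needed, cannot build the generator, generation failed.
class RunIdInitSketch {
    static Optional<String> initializeRunId(boolean isAppModeBatch,
                                            Optional<Object> coordinationUtils,
                                            Optional<Object> metadataStore) {
        // 1. Explicitly skipped: stream mode does not use a shared run id.
        if (!isAppModeBatch) {
            return Optional.empty();
        }
        // 2. Cannot build the generator: a prerequisite is missing.
        if (!coordinationUtils.isPresent() || !metadataStore.isPresent()) {
            return Optional.empty();
        }
        // 3. Generation itself may fail; continue without a run id in that case.
        try {
            return Optional.of("run-" + System.currentTimeMillis());
        } catch (RuntimeException e) {
            return Optional.empty();
        }
    }
}
```

Each early return can then log its own reason, which addresses the readability concern directly.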
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284806205 ## File path: samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java ## @@ -45,11 +44,19 @@ public Latch getLatch(int size, String latchId) throws UnsupportedOperationExcep return null; } + // To support DistributedLock in Azure, even MetadataStore needs to be implemented. Review comment: nit: can we change this to java docs instead of comments? also, link the `MetadataStore` to the interface so that it is clear on what is referred here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284829361 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -77,20 +93,81 @@ */ public LocalApplicationRunner(SamzaApplication app, Config config) { this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); -this.planner = new LocalJobPlanner(appDesc); +isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +coordinationUtils = getCoordinationUtils(config); } /** * Constructor only used in unit test to allow injection of {@link LocalJobPlanner} */ @VisibleForTesting - LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) { + LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) { this.appDesc = appDesc; -this.planner = planner; +isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +this.coordinationUtils = coordinationUtils; + } + + private Optional getCoordinationUtils(Config config) { +if (!isAppModeBatch) { + return Optional.empty(); +} +JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); +CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config); +return Optional.ofNullable(coordinationUtils); + } + + /** + * @return LocalJobPlanner created + */ + @VisibleForTesting + LocalJobPlanner getPlanner() { +boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +if (!isAppModeBatch) { + return new LocalJobPlanner(appDesc, PROCESSOR_ID); +} +CoordinationUtils coordinationUtils = null; +String runId = null; +if (this.coordinationUtils.isPresent()) { + coordinationUtils 
= this.coordinationUtils.get(); +} +if (this.runId.isPresent()) { + runId = this.runId.get(); +} +return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, runId); + } + + + private void initializeRunId() { +try { + MetadataStore metadataStore = getMetadataStore(); + if (coordinationUtils.isPresent() && metadataStore != null) { +runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore)); + } + if (!coordinationUtils.isPresent() || !isAppModeBatch || !runIdGenerator.isPresent()) { +LOG.warn("coordination utils or run id generator could not be created successfully!"); +return; + } + runId = runIdGenerator.get().getRunId(); Review comment: runIdGenerator check can be removed and you can simplify this to `runId = runIdGenerator.ifPresent(RunIdGenerator::getRunId);` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
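One caveat on the suggested one-liner: `Optional.ifPresent` returns `void`, so the compiling form of the simplification is `Optional.map` (or `flatMap` if `getRunId()` itself returns an `Optional`). A hypothetical sketch with a stand-in generator class:

```java
import java.util.Optional;

// Hypothetical stand-in for RunIdGenerator, illustrating Optional.map as the
// compiling form of the suggested simplification (ifPresent returns void).
class RunIdChaining {
    static class Generator {
        String getRunId() {
            return "run-1";
        }
    }

    static Optional<String> resolveRunId(Optional<Generator> generator) {
        // Replaces the explicit isPresent()/get() dance in one expression
        return generator.map(Generator::getRunId);
    }
}
```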
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284817518 ## File path: samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator; + +import org.apache.samza.annotation.InterfaceStability; + +/** + * Coordination Primitive to maintain the list of processors in the quorum + * + * Guarantees: + * 1. after registerProcessor, getNumberOfProcessors should return count inclusive of at least the current processor + * 2. after unregisterProcessor, getNumberOfProcessors should return count exclusive of at least the current processor + * + * Non-guarantees: + * 1. thread safe + * 2. concurrent access of the list Review comment: what list are we referring to? can you clarify the comment a bit more? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284826566 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -77,20 +93,81 @@ */ public LocalApplicationRunner(SamzaApplication app, Config config) { this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); -this.planner = new LocalJobPlanner(appDesc); +isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +coordinationUtils = getCoordinationUtils(config); } /** * Constructor only used in unit test to allow injection of {@link LocalJobPlanner} */ @VisibleForTesting - LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) { + LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) { this.appDesc = appDesc; -this.planner = planner; +isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +this.coordinationUtils = coordinationUtils; + } + + private Optional getCoordinationUtils(Config config) { +if (!isAppModeBatch) { + return Optional.empty(); +} +JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); +CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config); +return Optional.ofNullable(coordinationUtils); + } + + /** + * @return LocalJobPlanner created + */ + @VisibleForTesting + LocalJobPlanner getPlanner() { +boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +if (!isAppModeBatch) { + return new LocalJobPlanner(appDesc, PROCESSOR_ID); +} +CoordinationUtils coordinationUtils = null; +String runId = null; +if (this.coordinationUtils.isPresent()) { + coordinationUtils 
= this.coordinationUtils.get(); +} +if (this.runId.isPresent()) { + runId = this.runId.get(); +} +return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, runId); + } + + + private void initializeRunId() { +try { + MetadataStore metadataStore = getMetadataStore(); + if (coordinationUtils.isPresent() && metadataStore != null) { +runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore)); + } + if (!coordinationUtils.isPresent() || !isAppModeBatch || !runIdGenerator.isPresent()) { +LOG.warn("coordination utils or run id generator could not be created successfully!"); +return; + } + runId = runIdGenerator.get().getRunId(); +} catch (Exception e) { + LOG.warn("Failed to generate run id. Will continue execution without a run id. Caused by {}", e); +} + } + + public String getRunid() { +if (runId.isPresent()) { Review comment: can be simplified to `return runId.orElse(null);` independently, why are we returning null here? Why not simply return `Optional` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
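Both options from the review above can be shown side by side. A hypothetical holder class, not the Samza source:

```java
import java.util.Optional;

// Hypothetical sketch of the getRunid() simplification from the review.
class RunIdHolder {
    private final Optional<String> runId;

    RunIdHolder(Optional<String> runId) {
        this.runId = runId;
    }

    // Suggested one-liner: replaces the isPresent()/get() branch.
    String getRunIdOrNull() {
        return runId.orElse(null);
    }

    // Alternative the reviewer raises: keep absence explicit in the type.
    Optional<String> getRunId() {
        return runId;
    }
}
```

Returning the `Optional` pushes the null-handling decision to the caller, which is usually the safer API.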
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284829974 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -77,20 +93,81 @@ */ public LocalApplicationRunner(SamzaApplication app, Config config) { this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); -this.planner = new LocalJobPlanner(appDesc); +isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +coordinationUtils = getCoordinationUtils(config); } /** * Constructor only used in unit test to allow injection of {@link LocalJobPlanner} */ @VisibleForTesting - LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) { + LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) { this.appDesc = appDesc; -this.planner = planner; +isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +this.coordinationUtils = coordinationUtils; + } + + private Optional getCoordinationUtils(Config config) { +if (!isAppModeBatch) { + return Optional.empty(); +} +JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); +CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config); +return Optional.ofNullable(coordinationUtils); + } + + /** + * @return LocalJobPlanner created + */ + @VisibleForTesting + LocalJobPlanner getPlanner() { +boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +if (!isAppModeBatch) { + return new LocalJobPlanner(appDesc, PROCESSOR_ID); +} +CoordinationUtils coordinationUtils = null; +String runId = null; +if (this.coordinationUtils.isPresent()) { + coordinationUtils 
= this.coordinationUtils.get(); +} +if (this.runId.isPresent()) { + runId = this.runId.get(); +} +return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, runId); + } + + + private void initializeRunId() { +try { + MetadataStore metadataStore = getMetadataStore(); + if (coordinationUtils.isPresent() && metadataStore != null) { +runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore)); + } + if (!coordinationUtils.isPresent() || !isAppModeBatch || !runIdGenerator.isPresent()) { +LOG.warn("coordination utils or run id generator could not be created successfully!"); +return; + } + runId = runIdGenerator.get().getRunId(); +} catch (Exception e) { + LOG.warn("Failed to generate run id. Will continue execution without a run id. Caused by {}", e); +} + } + + public String getRunid() { +if (runId.isPresent()) { + return runId.get(); +} +return null; } @Override public void run(ExternalContext externalContext) { +if (isAppModeBatch) { Review comment: can we remove this check since we already do this check inside the `initializeRunId()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
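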
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284830168 ## File path: samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java ## @@ -52,11 +53,12 @@ public Latch getLatch(int size, String latchId) { } @Override - public DistributedLockWithState getLockWithState(String lockId) { + public DistributedLock getLock(String lockId) { return new ZkDistributedLock(processorIdStr, zkUtils, lockId); } public void close() { +System.out.println("Manasa: closing coordination utils"); Review comment: nit: remove This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284820619 ## File path: samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import com.google.common.base.Preconditions; +import java.io.UnsupportedEncodingException; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import org.apache.samza.SamzaException; +import org.apache.samza.metadatastore.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Generates RunId for Standalone use case + * If there is only one processor in the quorum (registered with ClusterMembership) then create new runid and add to store + * Else read runid from the store + * + * Steps to generate: + * 1. acquire lock + * 2. add self to quorum (register itself with ClusterMembership) + * 3. get number of processors in quorum + * 4. if qurorum size is 1 (only self) then create new runid and write to store + * 5. 
if quorum size is greater than 1 then read runid from store + * 6. unlock + */ +public class RunIdGenerator { + private static final Logger LOG = LoggerFactory.getLogger(RunIdGenerator.class); + + private final CoordinationUtils coordinationUtils; + private final MetadataStore metadataStore; + private final ClusterMembership clusterMembership; + private String processorId = null; + private boolean closed = false; Review comment: should be volatile? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
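The reviewer's `volatile` question comes down to visibility: if `close()` can run on a different thread than the one that reads the flag, a plain `boolean` write may never be observed. A minimal hypothetical sketch (not the Samza implementation) using `AtomicBoolean`, which gives both visibility and an atomic test-and-set for an idempotent close:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the Samza code: an idempotent close() whose
// guard flag is safe to read and flip from any thread.
class IdempotentCloser {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger released = new AtomicInteger();

    void close() {
        // compareAndSet flips false -> true exactly once, so repeated or
        // concurrent calls release underlying resources a single time.
        if (closed.compareAndSet(false, true)) {
            released.incrementAndGet();  // stands in for real cleanup
        }
    }

    boolean isClosed() {
        return closed.get();
    }

    int timesReleased() {
        return released.get();
    }
}
```

Declaring the field `volatile` instead would fix visibility, but two threads could still pass an `if (!closed)` check simultaneously; `compareAndSet` closes that race as well.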
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284824438 ## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ## @@ -77,20 +93,81 @@ */ public LocalApplicationRunner(SamzaApplication app, Config config) { this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); -this.planner = new LocalJobPlanner(appDesc); +isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +coordinationUtils = getCoordinationUtils(config); } /** * Constructor only used in unit test to allow injection of {@link LocalJobPlanner} */ @VisibleForTesting - LocalApplicationRunner(ApplicationDescriptorImpl appDesc, LocalJobPlanner planner) { + LocalApplicationRunner(ApplicationDescriptorImpl appDesc, Optional coordinationUtils) { this.appDesc = appDesc; -this.planner = planner; +isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +this.coordinationUtils = coordinationUtils; + } + + private Optional getCoordinationUtils(Config config) { +if (!isAppModeBatch) { + return Optional.empty(); +} +JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); +CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config); +return Optional.ofNullable(coordinationUtils); + } + + /** + * @return LocalJobPlanner created + */ + @VisibleForTesting + LocalJobPlanner getPlanner() { +boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; +if (!isAppModeBatch) { + return new LocalJobPlanner(appDesc, PROCESSOR_ID); +} +CoordinationUtils coordinationUtils = null; Review comment: Can be simplified to `coordinationUtils = 
this.coordinationUtils.orElse(null);` `runId = this.runId.orElse(null)`
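The suggested `orElse(null)` collapses an explicit presence check into a single expression. A small illustrative sketch (the names here are hypothetical, not from the PR):

```java
import java.util.Optional;

// Illustrates the Optional.orElse(null) simplification suggested above.
class OrElseDemo {
    static String unwrap(Optional<String> maybeRunId) {
        // Equivalent to: maybeRunId.isPresent() ? maybeRunId.get() : null
        return maybeRunId.orElse(null);
    }
}
```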
[GitHub] [samza] mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
mynameborat commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r284821188 ## File path: samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import com.google.common.base.Preconditions; +import java.io.UnsupportedEncodingException; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import org.apache.samza.SamzaException; +import org.apache.samza.metadatastore.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Generates RunId for Standalone use case + * If there is only one processor in the quorum (registered with ClusterMembership) then create new runid and add to store + * Else read runid from the store + * + * Steps to generate: + * 1. acquire lock + * 2. add self to quorum (register itself with ClusterMembership) + * 3. get number of processors in quorum + * 4. if qurorum size is 1 (only self) then create new runid and write to store + * 5. 
if quorum size if greater than 1 then read runid from store + * 6. unlock + */ +public class RunIdGenerator { + private static final Logger LOG = LoggerFactory.getLogger(RunIdGenerator.class); + + private final CoordinationUtils coordinationUtils; + private final MetadataStore metadataStore; + private final ClusterMembership clusterMembership; + private String processorId = null; + private boolean closed = false; + + public RunIdGenerator(CoordinationUtils coordinationUtils, MetadataStore metadataStore) { +Preconditions.checkNotNull(coordinationUtils, "CoordinationUtils cannot be null"); +Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null"); +this.coordinationUtils = coordinationUtils; +this.metadataStore = metadataStore; +this.clusterMembership = coordinationUtils.getClusterMembership(); +Preconditions.checkNotNull(this.clusterMembership, "Failed to create utils for run id generation"); + } + + public Optional getRunId() { +DistributedLock runIdLock; +String runId = null; + +runIdLock = coordinationUtils.getLock(CoordinationConstants.RUNID_LOCK_ID); +if (runIdLock == null) { + throw new SamzaException("Failed to create utils for run id generation"); +} + +try { + // acquire lock to write or read run.id + if (runIdLock.lock(Duration.ofMillis(CoordinationConstants.LOCK_TIMEOUT_MS))) { +LOG.info("lock acquired for run.id generation by this processor"); +processorId = clusterMembership.registerProcessor(); +int numberOfActiveProcessors = clusterMembership.getNumberOfProcessors(); +if (numberOfActiveProcessors == 0) { + String msg = String.format("Processor failed to fetch number of processors for run.id generation"); + throw new SamzaException(msg); +} +if (numberOfActiveProcessors == 1) { + runId = + String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8); + LOG.info("Writing the run id for this run as {}", runId); + metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8")); +} else 
{ + runId = new String(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY)); + LOG.info("Read the run id for this run as {}", runId); +} +runIdLock.unlock(); + } else { +throw new SamzaException("Processor timed out waiting to acquire lock for run.id generation"); + } +} catch (UnsupportedEncodingException e) { + throw new SamzaException("Processor could not serialize/deserialize string for run.id generation", e); +} +return Optional.ofNullable(runId); + } + + /** + * might be called several times and hence should be idempotent + */ + public void close() { +if (!closed && clusterMembership != null && processorId != null) { Review comment: `clusterMembership` is never null right? We ensure that in the constructor.
[GitHub] [samza] xinyuiscool commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table
xinyuiscool commented on a change in pull request #1034: SAMZA-2185: Add ability to expose remote store specific features in remote table URL: https://github.com/apache/samza/pull/1034#discussion_r284824006 ## File path: samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java ## @@ -59,33 +61,37 @@ * * @param key the key with which the specified {@code value} is to be associated. * @param value the value with which the specified {@code key} is to be associated. + * @param args additional arguments * @throws NullPointerException if the specified {@code key} is {@code null}. */ - void put(K key, V value); + void put(K key, V value, Object ... args); /** * Updates the mappings of the specified key-value {@code entries}. * * A key is deleted from the table if its corresponding value is {@code null}. * * @param entries the updated mappings to put into this table. + * @param args additional arguments * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key. */ - void putAll(List> entries); + void putAll(List> entries, Object ... args); /** * Deletes the mapping for the specified {@code key} from this table (if such mapping exists). * * @param key the key for which the mapping is to be deleted. + * @param args additional arguments * @throws NullPointerException if the specified {@code key} is {@code null}. */ - void delete(K key); + void delete(K key, Object ... args); /** * Deletes the mappings for the specified {@code keys} from this table. * * @param keys the keys for which the mappings are to be deleted. + * @param args additional arguments * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. */ - void deleteAll(List keys); + void deleteAll(List keys, Object ... args); } Review comment: Shall we also add read() and write() too? This is an automated message from the Apache Git Service. 
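The `Object... args` additions above let existing call sites compile unchanged (an empty array is passed) while new callers can hand store-specific options through the generic interface. A hypothetical sketch of the varargs pattern, not Samza's actual table code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the varargs-extension pattern discussed above:
// a put() that accepts optional, implementation-specific arguments.
class VarargsTable {
    private final Map<String, String> store = new HashMap<>();

    // args is empty when the caller passes nothing; implementations that
    // understand extra options can inspect it (here we just record args[0]).
    void put(String key, String value, Object... args) {
        if (args.length > 0) {
            store.put(key + "#opt", String.valueOf(args[0]));
        }
        store.put(key, value);
    }

    String get(String key) {
        return store.get(key);
    }
}
```

The trade-off such varargs APIs raise is type safety: each implementation must document and validate what it accepts in `args`.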
[GitHub] [samza] cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284821797 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointObjectMapper.java ## @@ -39,12 +40,16 @@ static ObjectMapper getObjectMapper() { SimpleModule module = new SimpleModule("StartpointModule", new Version(1, 0, 0, "")); module.addSerializer(Instant.class, new CustomInstantSerializer()); module.addDeserializer(Instant.class, new CustomInstantDeserializer()); - objectMapper.registerModule(module); -objectMapper.registerSubtypes(StartpointSpecific.class); -objectMapper.registerSubtypes(StartpointTimestamp.class); -objectMapper.registerSubtypes(StartpointUpcoming.class); -objectMapper.registerSubtypes(StartpointOldest.class); + +// 1. To support polymorphism for serialization, the Startpoint subtypes must be registered here. +// 2. The NamedType container class provides a logical name as an external identifier so that the full canonical Review comment: Sounds good. Just wanted to make sure there wasn't any gotcha with using the new way. I didn't need it to be easier than the old way. It's good that it makes the serialization more compact.
[jira] [Resolved] (SAMZA-2195) Fix javadoc in SerdeUtils
[ https://issues.apache.org/jira/browse/SAMZA-2195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Song resolved SAMZA-2195. - Resolution: Fixed > Fix javadoc in SerdeUtils > - > > Key: SAMZA-2195 > URL: https://issues.apache.org/jira/browse/SAMZA-2195 > Project: Samza > Issue Type: Improvement >Reporter: Wei Song >Assignee: Wei Song >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > As per subject -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[samza] branch master updated (d7b5034 -> 57cce59)
This is an automated email from the ASF dual-hosted git repository. weisong44 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/samza.git. from d7b5034 Disable flaky test in samza-sql (#1033) new 3f7ed71 Added self to committer list new 5cbf9af Merge remote-tracking branch 'upstream/master' new a15a7c9 Merge remote-tracking branch 'upstream/master' new aae0f38 Merge remote-tracking branch 'upstream/master' new 0440f75 Merge remote-tracking branch 'upstream/master' new 4782c61 Merge remote-tracking branch 'upstream/master' new f28b491 Merge remote-tracking branch 'upstream/master' new df2f8d7 Merge remote-tracking branch 'upstream/master' new de708f5 Merge remote-tracking branch 'upstream/master' new 5156239 Merge remote-tracking branch 'upstream/master' new eca0020 Merge remote-tracking branch 'upstream/master' new 239a095 Merge remote-tracking branch 'upstream/master' new 41299b5 Merge remote-tracking branch 'upstream/master' new a6c94ad Merge remote-tracking branch 'upstream/master' new 1c6a2ea Merge remote-tracking branch 'upstream/master' new 8ee7844 Merge remote-tracking branch 'upstream/master' new e19b4dc Merge remote-tracking branch 'upstream/master' new ec7d840 Merge remote-tracking branch 'upstream/master' new 242d844 Merge remote-tracking branch 'upstream/master' new c85604e Merge remote-tracking branch 'upstream/master' new 1e5de45 Merge remote-tracking branch 'upstream/master' new f5731b1 Merge remote-tracking branch 'upstream/master' new 7706ab1 Merge remote-tracking branch 'upstream/master' new f748050 Merge remote-tracking branch 'upstream/master' new 05822f0 Merge remote-tracking branch 'upstream/master' new 097958c Merge remote-tracking branch 'upstream/master' new a56c28d Merge remote-tracking branch 'upstream/master' new 2c679c3 Merge remote-tracking branch 'upstream/master' new a06e8ec Merge remote-tracking branch 'upstream/master' new 7c777fe Merge remote-tracking branch 'upstream/master' new c9e8bf7 Merge 
remote-tracking branch 'upstream/master' new a53e562 SAMZA-1964 Make getTableSpec() in RemoteTableDescriptor reentrant new 89bfc14 Merge remote-tracking branch 'upstream/master' new 9c12120 Merge remote-tracking branch 'upstream/master' new e82daa1 Merge remote-tracking branch 'upstream/master' new b0c544a Merge remote-tracking branch 'upstream/master' new cc49da0 Merge remote-tracking branch 'upstream/master' new cd14501 Merge remote-tracking branch 'upstream/master' new 043523d Merge remote-tracking branch 'upstream/master' new 9f6aa1f Merge remote-tracking branch 'upstream/master' new 9e05362 Merge remote-tracking branch 'upstream/master' new 49ea480 Merge remote-tracking branch 'upstream/master' new 160f1d7 Merge remote-tracking branch 'upstream/master' new 094ceb4 Merge remote-tracking branch 'upstream/master' new 25a4a16 Merge remote-tracking branch 'upstream/master' new cfd483e Merge remote-tracking branch 'upstream/master' new c738ee6 Merge remote-tracking branch 'upstream/master' new d8bf7fb Merge remote-tracking branch 'upstream/master' new 327b076 Merge remote-tracking branch 'upstream/master' new eee8d36 Merge remote-tracking branch 'upstream/master' new 0e719e4 Merge remote-tracking branch 'upstream/master' new 28b7a88 Merge remote-tracking branch 'upstream/master' new a824db6 Merge remote-tracking branch 'upstream/master' new 9127d92 Merge remote-tracking branch 'upstream/master' new 5fbe3b5 Merge remote-tracking branch 'upstream/master' new 6bea33c Merge remote-tracking branch 'upstream/master' new f871463 Merge remote-tracking branch 'upstream/master' new 77fd2c3 Merge remote-tracking branch 'upstream/master' new f11648b Merge remote-tracking branch 'upstream/master' new 5e28a65 Merge remote-tracking branch 'upstream/master' new b8c8a15 Merge remote-tracking branch 'upstream/master' new 9f3a1e7 Merge remote-tracking branch 'upstream/master' new 8c1821e Merge remote-tracking branch 'upstream/master' new b5abcf2 Merge remote-tracking branch 
'upstream/master' new 22bd3ac Merge remote-tracking branch 'upstream/master' new 3373012 Merge remote-tracking branch 'upstream/master' new 60b42d6 Merge remote-tracking branch 'upstream/master' new fc2d5b9 Merge remote-tracking branch 'upstream/master' new 1854072 SAMZA-2195: Fix javadoc in SerdeUtils new 57cce59 Merge pull request #1035 from weisong44/SAMZA-2195 The 1764 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to
[GitHub] [samza] weisong44 merged pull request #1035: SAMZA-2195: Fix javadoc in SerdeUtils
weisong44 merged pull request #1035: SAMZA-2195: Fix javadoc in SerdeUtils URL: https://github.com/apache/samza/pull/1035
[jira] [Created] (SAMZA-2195) Fix javadoc in SerdeUtils
Wei Song created SAMZA-2195: --- Summary: Fix javadoc in SerdeUtils Key: SAMZA-2195 URL: https://issues.apache.org/jira/browse/SAMZA-2195 Project: Samza Issue Type: Improvement Reporter: Wei Song Assignee: Wei Song As per subject -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [samza] weisong44 opened a new pull request #1035: SAMZA-2195: Fix javadoc in SerdeUtils
weisong44 opened a new pull request #1035: SAMZA-2195: Fix javadoc in SerdeUtils URL: https://github.com/apache/samza/pull/1035 As per subject
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284799604 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointObjectMapper.java ## @@ -39,12 +40,16 @@ static ObjectMapper getObjectMapper() { SimpleModule module = new SimpleModule("StartpointModule", new Version(1, 0, 0, "")); module.addSerializer(Instant.class, new CustomInstantSerializer()); module.addDeserializer(Instant.class, new CustomInstantDeserializer()); - objectMapper.registerModule(module); -objectMapper.registerSubtypes(StartpointSpecific.class); -objectMapper.registerSubtypes(StartpointTimestamp.class); -objectMapper.registerSubtypes(StartpointUpcoming.class); -objectMapper.registerSubtypes(StartpointOldest.class); + +// 1. To support polymorphism for serialization, the Startpoint subtypes must be registered here. +// 2. The NamedType container class provides a logical name as an external identifier so that the full canonical Review comment: Not necessarily easier, but it seems to be a best practice based on the examples I've seen. Underneath the hood, it just uses the simple class name and not the full canonical name, so ordering doesn't matter. The `NamedType` container constructor also has a 2nd optional parameter where we can specify the logical name ourselves too.
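The registration pattern under discussion can be sketched as follows. This is an illustrative example written against the modern `com.fasterxml` Jackson 2 API (the `org.codehaus` Jackson 1.x API used in the PR is analogous), with stand-in class names rather than the real Startpoint types; it assumes `jackson-databind` is on the classpath:

```java
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;

// Illustrative sketch of polymorphic subtype registration with a logical
// name, using the Jackson 2 API. Class names are stand-ins, not Samza types.
class PolymorphismDemo {
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
    static abstract class Base {
        public int n;
    }

    static class Specific extends Base { }

    static String roundTrip() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // The second NamedType argument sets the logical name written into
        // the "@type" property; registration order does not matter.
        mapper.registerSubtypes(new NamedType(Specific.class, "specific"));
        Specific s = new Specific();
        s.n = 7;
        String json = mapper.writeValueAsString((Base) s);
        Base back = mapper.readValue(json, Base.class);
        return back.getClass().getSimpleName();
    }
}
```

Because `@type` carries the logical name `"specific"` rather than a canonical class name, subtypes can later be renamed or repackaged without breaking previously stored JSON.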
[GitHub] [samza] cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284796109 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointObjectMapper.java ## @@ -39,12 +40,16 @@ static ObjectMapper getObjectMapper() { SimpleModule module = new SimpleModule("StartpointModule", new Version(1, 0, 0, "")); module.addSerializer(Instant.class, new CustomInstantSerializer()); module.addDeserializer(Instant.class, new CustomInstantDeserializer()); - objectMapper.registerModule(module); -objectMapper.registerSubtypes(StartpointSpecific.class); -objectMapper.registerSubtypes(StartpointTimestamp.class); -objectMapper.registerSubtypes(StartpointUpcoming.class); -objectMapper.registerSubtypes(StartpointOldest.class); + +// 1. To support polymorphism for serialization, the Startpoint subtypes must be registered here. +// 2. The NamedType container class provides a logical name as an external identifier so that the full canonical Review comment: Just double checking: Does the "logical name" allow for easily adding new types in the future? Is there a certain way new types need to be added? For example, if the "logical name" is just a counter, then we would need to add a new subtype at the end of the list.
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284751424 ## File path: samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java ## @@ -22,11 +22,13 @@ import com.google.common.base.Objects; import java.time.Instant; import org.apache.samza.annotation.InterfaceStability; +import org.codehaus.jackson.annotate.JsonTypeInfo; /** * Startpoint represents a position in a stream partition. */ @InterfaceStability.Evolving +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type") Review comment: [There seems to be a way](https://stackoverflow.com/a/41982776), but it's not working when I try it out for some reason. It may be due to the fact that we're on a really old version of `jackson`. The version we're using is so old that the recent versions of `jackson` have different package names. I'll leave the annotation for now and investigate moving us to a newer version of jackson in a separate PR/ticket.
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284754923 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -139,21 +169,26 @@ public Startpoint readStartpoint(SystemStreamPartition ssp) { * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for. * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist or if it is too stale. Review comment: Will do.
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284754194 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -129,7 +159,7 @@ public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startp * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for. * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist or if it is too stale Review comment: Will do.
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284753937 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java ## @@ -20,87 +20,117 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.HashSet; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; import org.apache.samza.metadatastore.MetadataStore; -import org.apache.samza.metadatastore.MetadataStoreFactory; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.SystemStreamPartition; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The StartpointManager reads and writes {@link Startpoint} to the {@link MetadataStore} defined by - * the configuration task.startpoint.metadata.store.factory. 
- * - * Startpoints are keyed in the MetadataStore by two different formats: - * 1) Only by {@link SystemStreamPartition} - * 2) A combination of {@link SystemStreamPartition} and {@link TaskName} + * The StartpointManager reads and writes {@link Startpoint} to the provided {@link MetadataStore} * * The intention for the StartpointManager is to maintain a strong contract between the caller * and how Startpoints are stored in the underlying MetadataStore. + * + * Startpoints are written in the MetadataStore using keys of two different formats: + * 1) {@link SystemStreamPartition} only + * 2) A combination of {@link SystemStreamPartition} and {@link TaskName} + * + * Startpoints are then fanned out to a fan out namespace in the MetadataStore by the + * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone + * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the + * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per + * {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint. + * + * The read, write and delete methods are intended for external callers. + * The fan out methods are intended to be used within a job coordinator. 
*/ public class StartpointManager { - private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class); - public static final String NAMESPACE = "samza-startpoint-v1"; + private static final Integer VERSION = 1; + public static final String NAMESPACE = "samza-startpoint-v" + VERSION; static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12); - private final MetadataStore metadataStore; - private final StartpointSerde startpointSerde = new StartpointSerde(); - - private boolean stopped = false; + private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class); + private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out"; - /** - * Constructs a {@link StartpointManager} instance by instantiating a new metadata store connection. - * This is primarily used for testing. - */ - @VisibleForTesting - StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, MetricsRegistry metricsRegistry) { -Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory cannot be null"); -Preconditions.checkNotNull(config, "Config cannot be null"); -Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be null"); + private final MetadataStore metadataStore; Review comment: True, it's no longer needed. Will remove. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation URL: https://github.com/apache/samza/pull/1027#discussion_r284756502 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointObjectMapper.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.startpoint; + +import java.io.IOException; +import java.time.Instant; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.map.DeserializationContext; +import org.codehaus.jackson.map.JsonDeserializer; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializerProvider; +import org.codehaus.jackson.map.module.SimpleModule; + + +class StartpointObjectMapper { + + static ObjectMapper getObjectMapper() { +ObjectMapper objectMapper = new ObjectMapper(); +SimpleModule module = new SimpleModule("StartpointModule", new Version(1, 0, 0, "")); +module.addSerializer(Instant.class, new CustomInstantSerializer()); +module.addDeserializer(Instant.class, new CustomInstantDeserializer()); + +objectMapper.registerModule(module); Review comment: I can't think of any way other than that the new serde tests for any new Startpoints will surface the requirement for registering the subtype. I'll leave a comment here also. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284753684

## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointFanOutPerTask.java

@@ -20,33 +20,45 @@
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
-import org.apache.samza.container.TaskName;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(using = StartpointKeySerializer.class)
-class StartpointKey {
-  private final SystemStreamPartition systemStreamPartition;
-  private final TaskName taskName;
+/**
+ * Holds the {@link Startpoint} fan outs for each {@link SystemStreamPartition}. Each StartpointFanOutPerTask maps to a
+ * {@link org.apache.samza.container.TaskName}
+ */
+class StartpointFanOutPerTask {
+  @JsonSerialize
+  @JsonDeserialize
+  private final Instant timestamp;
+
+  @JsonDeserialize(keyUsing = SamzaObjectMapper.SystemStreamPartitionKeyDeserializer.class)

Review comment: Related to the above comment, adding it to the `StartpointObjectMapper` via `SimpleModule#addKeySerializer` and `SimpleModule#addKeyDeserializer` doesn't seem to work when the map is nested in an object. Again, this might be due to the old version of `jackson` we're on. I'll leave comments to remove the annotations if we can consolidate all serialization code into the `StartpointObjectMapper` once we're on a newer version of `jackson`.
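The `keyUsing` annotation above is needed because JSON object keys must be strings, so a `Map` keyed by `SystemStreamPartition` requires a dedicated key serializer/deserializer pair. The exact key format used by `SamzaObjectMapper` is not shown in this diff; the sketch below illustrates the general technique with a hypothetical `system.stream.partition` string encoding:

```java
// Illustrative only: flattening a composite map key into a single string, the
// general technique behind a Jackson key serializer/deserializer pair. The
// "system.stream.partition" format here is an assumption for the sketch, not
// the format SamzaObjectMapper actually uses.
public class SspKeyCodec {

  // Key serializer side: flatten the composite key into one string.
  static String encodeKey(String system, String stream, int partition) {
    return system + "." + stream + "." + partition;
  }

  // Key deserializer side: split the string back into its parts.
  static String[] decodeKey(String key) {
    return key.split("\\.");
  }

  public static void main(String[] args) {
    String key = encodeKey("kafka", "pageviews", 3);
    String[] parts = decodeKey(key);
    System.out.println(parts[0] + " " + parts[1] + " " + parts[2]); // prints "kafka pageviews 3"
  }
}
```

A real key deserializer would also need to cope with separators appearing inside system or stream names, which is one reason a dedicated, centrally registered implementation is preferable to ad-hoc string splitting.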
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284757971

## File path: samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java

@@ -18,45 +18,47 @@
  */
 package org.apache.samza.startpoint;
 
+import java.io.IOException;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestStartpointSerde {

Review comment: Good point, will do.
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284754067

## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java

@@ -20,87 +20,117 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashSet;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The StartpointManager reads and writes {@link Startpoint} to the {@link MetadataStore} defined by
- * the configuration task.startpoint.metadata.store.factory.
- *
- * Startpoints are keyed in the MetadataStore by two different formats:
- * 1) Only by {@link SystemStreamPartition}
- * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ * The StartpointManager reads and writes {@link Startpoint} to the provided {@link MetadataStore}
  *
  * The intention for the StartpointManager is to maintain a strong contract between the caller
  * and how Startpoints are stored in the underlying MetadataStore.
+ *
+ * Startpoints are written in the MetadataStore using keys of two different formats:
+ * 1) {@link SystemStreamPartition} only
+ * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ *
+ * Startpoints are then fanned out to a fan out namespace in the MetadataStore by the
+ * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone
+ * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
+ * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per
+ * {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint.
+ *
+ * The read, write and delete methods are intended for external callers.
+ * The fan out methods are intended to be used within a job coordinator.
 */
 public class StartpointManager {
-  private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class);
-  public static final String NAMESPACE = "samza-startpoint-v1";
+  private static final Integer VERSION = 1;
+  public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
 
   static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
 
-  private final MetadataStore metadataStore;
-  private final StartpointSerde startpointSerde = new StartpointSerde();
-
-  private boolean stopped = false;
+  private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class);
+  private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
 
-  /**
-   * Constructs a {@link StartpointManager} instance by instantiating a new metadata store connection.
-   * This is primarily used for testing.
-   */
-  @VisibleForTesting
-  StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, MetricsRegistry metricsRegistry) {
-    Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory cannot be null");
-    Preconditions.checkNotNull(config, "Config cannot be null");
-    Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be null");
+  private final MetadataStore metadataStore;
+  private final NamespaceAwareCoordinatorStreamStore fanOutStore;
+  private final NamespaceAwareCoordinatorStreamStore readWriteStore;
+  private final ObjectMapper objectMapper = StartpointObjectMapper.getObjectMapper();
 
-    this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, config, metricsRegistry);
-    LOG.info("StartpointManager created with metadata store: {}",
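The javadoc in the diff above describes a two-level keying scheme (per-SSP and per-SSP-plus-task) that the job coordinator fans out into per-task entries. A toy model of that resolution step, assuming (purely for illustration; the PR's actual logic is not shown in this excerpt) that a task-specific startpoint takes precedence over an SSP-wide one:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy model of the fan-out described in the StartpointManager javadoc:
// startpoints stored per-SSP or per-(task, SSP) are resolved into one
// startpoint per task per SSP. Keys are plain strings here for simplicity;
// the precedence rule is an assumption, not taken from the PR.
public class FanOutSketch {

  static Map<String, Map<String, String>> fanOut(
      Map<String, Set<String>> sspsPerTask,     // taskName -> SSPs it consumes
      Map<String, String> sspStartpoints,       // SSP -> startpoint (applies to all tasks)
      Map<String, String> taskSspStartpoints) { // "task|ssp" -> task-specific startpoint
    Map<String, Map<String, String>> result = new HashMap<>();
    for (Map.Entry<String, Set<String>> entry : sspsPerTask.entrySet()) {
      String task = entry.getKey();
      Map<String, String> perTask = new HashMap<>();
      for (String ssp : entry.getValue()) {
        // Assumed precedence: a (task, SSP) startpoint wins over the SSP-wide one.
        String startpoint = taskSspStartpoints.getOrDefault(task + "|" + ssp,
            sspStartpoints.get(ssp));
        if (startpoint != null) {
          perTask.put(ssp, startpoint);
        }
      }
      result.put(task, perTask);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> tasks = Map.of("task-0", Set.of("kafka.pageviews.0"));
    Map<String, String> bySsp = Map.of("kafka.pageviews.0", "timestamp:100");
    Map<String, String> byTaskSsp = Map.of("task-0|kafka.pageviews.0", "specific:200");
    System.out.println(fanOut(tasks, bySsp, byTaskSsp));
  }
}
```

In the real implementation the fanned-out entries live in a separate namespace (`NAMESPACE_FAN_OUT` in the diff) and are deleted once the offsets are committed to the checkpoint, which this sketch does not model.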
dnishimura commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284756719

## File path: samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointFanOutPerTaskSerde.java

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import java.io.IOException;
+import java.time.Instant;
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestStartpointFanOutPerTaskSerde {

Review comment: Sure.