Repository: beam
Updated Branches:
refs/heads/master 168d6a18f -> a27d3cfa2
Gives the runner access to RestrictionTracker
Changes the SplittableParDo transform so that ProcessFn uses a
runner-supplied hook to run the @ProcessElement method, giving
it, among other things, the RestrictionTracker, so the runner
can initiate checkpointing/splitting with it at will.
Introduces a default implementation of said hook, which limits
the number of outputs and duration of the call. This implementation
is used in tests and in Direct runner. Dataflow Streaming runner
will also use this implementation, while Dataflow Batch runner
will be more sophisticated.
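The following is a minimal sketch of the default hook described above: an invocation loop bounded by output count and elapsed time. It is not the code of OutputAndTimeBoundedSplittableProcessElementInvoker. Every name in it (BoundedInvokerSketch, OffsetRangeTrackerSketch, maxOutputs, maxDuration) is hypothetical, and it assumes a restriction that is a half-open offset range [from, to) with one output per claimed offset. Once either budget is spent, the runner-side loop checkpoints the tracker. Checkpointing shrinks the primary range to what was claimed and returns the unclaimed remainder as a residual to resume later.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

/**
 * Hypothetical sketch (not Beam code): the runner drives a claim loop over an offset range and
 * checkpoints once an output budget or a time budget is spent.
 */
public class BoundedInvokerSketch {

  /** Tracks a half-open range [from, to) and can split off the unclaimed remainder. */
  public static final class OffsetRangeTrackerSketch {
    private long to;
    private long lastClaimed;

    public OffsetRangeTrackerSketch(long from, long to) {
      this.to = to;
      this.lastClaimed = from - 1;
    }

    /** Claims an offset; returns false once the (possibly shrunk) range is exhausted. */
    public synchronized boolean tryClaim(long offset) {
      if (offset >= to) {
        return false;
      }
      lastClaimed = offset;
      return true;
    }

    /** Shrinks the primary range to the claimed prefix and returns the residual {start, end}. */
    public synchronized long[] checkpoint() {
      long residualStart = lastClaimed + 1;
      long[] residual = {residualStart, to};
      to = residualStart;
      return residual;
    }
  }

  /** Outputs of one bounded call plus the residual range (null when fully processed). */
  public static final class Result {
    public final List<String> outputs;
    public final long[] residual;

    Result(List<String> outputs, long[] residual) {
      this.outputs = outputs;
      this.residual = residual;
    }
  }

  public static Result invoke(
      OffsetRangeTrackerSketch tracker,
      long start,
      LongFunction<String> processOffset,
      int maxOutputs,
      Duration maxDuration) {
    long deadline = System.nanoTime() + maxDuration.toNanos();
    List<String> outputs = new ArrayList<>();
    for (long offset = start; tracker.tryClaim(offset); offset++) {
      outputs.add(processOffset.apply(offset));
      // The runner, not the DoFn, decides when to stop: output budget or time budget exhausted.
      if (outputs.size() >= maxOutputs || System.nanoTime() - deadline >= 0) {
        long[] residual = tracker.checkpoint();
        return new Result(outputs, residual[0] < residual[1] ? residual : null);
      }
    }
    return new Result(outputs, null);
  }
}
```

For simplicity, the sketch checks the clock synchronously between claims, so a single slow call can overrun maxDuration. A runner could instead request the checkpoint from a separate timer thread, which is why the tracker methods are synchronized here.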
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7dc9e86f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7dc9e86f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7dc9e86f
Branch: refs/heads/master
Commit: 7dc9e86fa76e5117bfc6825e120ed74ba3d2f910
Parents: 38208ea
Author: Eugene Kirpichov
Authored: Fri Nov 18 11:21:19 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Feb 1 19:43:29 2017 -0800
--
.../apache/beam/runners/core/DoFnAdapters.java | 4 +-
...eBoundedSplittableProcessElementInvoker.java | 285 +++
.../beam/runners/core/SimpleDoFnRunner.java | 6 +-
.../beam/runners/core/SplittableParDo.java | 190 ++---
.../core/SplittableProcessElementInvoker.java | 65 +
...ndedSplittableProcessElementInvokerTest.java | 146 ++
.../beam/runners/core/SplittableParDoTest.java | 147 +++---
...littableProcessElementsEvaluatorFactory.java | 66 +++--
.../apache/beam/sdk/transforms/DoFnTester.java | 15 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 4 +-
.../transforms/splittabledofn/OffsetRange.java | 71 +
.../splittabledofn/OffsetRangeTracker.java | 75 +
.../splittabledofn/RestrictionTracker.java | 2 +-
.../beam/sdk/transforms/SplittableDoFnTest.java | 68 +
.../transforms/reflect/DoFnInvokersTest.java | 2 +-
.../splittabledofn/OffsetRangeTrackerTest.java | 111
16 files changed, 956 insertions(+), 301 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index dcd7969..693cb2f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -204,7 +204,7 @@ public class DoFnAdapters {
}
@Override
-public RestrictionTracker restrictionTracker() {
+public RestrictionTracker restrictionTracker() {
throw new UnsupportedOperationException("This is a non-splittable DoFn");
}
@@ -306,7 +306,7 @@ public class DoFnAdapters {
}
@Override
-public RestrictionTracker restrictionTracker() {
+public RestrictionTracker restrictionTracker() {
throw new UnsupportedOperationException("This is a non-splittable DoFn");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7dc9e86f/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
new file mode 100644
index 000..5aa7605
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ *
Repository: beam
Updated Branches:
refs/heads/master 96a05a4b4 -> 08e58e1df
Add license header to website Jenkins job descriptions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/838ef04d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/838ef04d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/838ef04d
Branch: refs/heads/master
Commit: 838ef04da912091db6f83be2fe48a9a3525e8279
Parents: 96a05a4
Author: Kenneth Knowles
Authored: Thu Feb 2 19:21:46 2017 -0800
Committer: Kenneth Knowles
Committed: Thu Feb 2 19:21:46 2017 -0800
--
.jenkins/job_beam_PreCommit_Website_Stage.groovy | 18 ++
.jenkins/job_beam_PreCommit_Website_Test.groovy | 18 ++
2 files changed, 36 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/838ef04d/.jenkins/job_beam_PreCommit_Website_Stage.groovy
--
diff --git a/.jenkins/job_beam_PreCommit_Website_Stage.groovy b/.jenkins/job_beam_PreCommit_Website_Stage.groovy
index b63d40a..69be64d 100644
--- a/.jenkins/job_beam_PreCommit_Website_Stage.groovy
+++ b/.jenkins/job_beam_PreCommit_Website_Stage.groovy
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import common_job_properties
// Defines a job.
http://git-wip-us.apache.org/repos/asf/beam/blob/838ef04d/.jenkins/job_beam_PreCommit_Website_Test.groovy
--
diff --git a/.jenkins/job_beam_PreCommit_Website_Test.groovy b/.jenkins/job_beam_PreCommit_Website_Test.groovy
index ada089f..2b55374 100644
--- a/.jenkins/job_beam_PreCommit_Website_Test.groovy
+++ b/.jenkins/job_beam_PreCommit_Website_Test.groovy
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import common_job_properties
// Defines a job.
Move InMemoryStateInternals to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af391b88
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af391b88
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af391b88
Branch: refs/heads/master
Commit: af391b88cfc38659f594799bb58b6090a7bcd3a4
Parents: 2b6698d
Author: Kenneth Knowles
Authored: Thu Jan 26 21:03:42 2017 -0800
Committer: Kenneth Knowles
Committed: Mon Feb 6 09:26:06 2017 -0800
--
.../runners/core/InMemoryStateInternals.java | 442 +++
.../core/TestInMemoryStateInternals.java | 65 +++
.../core/GroupAlsoByWindowsProperties.java | 1 -
.../core/InMemoryStateInternalsTest.java | 359 +++
.../core/MergingActiveWindowSetTest.java | 1 -
.../beam/runners/core/ReduceFnTester.java | 1 -
.../beam/runners/core/SideInputHandlerTest.java | 1 -
.../beam/runners/core/SplittableParDoTest.java | 1 -
.../triggers/TriggerStateMachineTester.java | 2 +-
.../CopyOnAccessInMemoryStateInternals.java | 12 +-
.../spark/translation/TranslationUtils.java | 3 +-
.../sdk/util/state/InMemoryStateInternals.java | 430 --
.../util/state/TestInMemoryStateInternals.java | 61 ---
.../util/state/InMemoryStateInternalsTest.java | 348 ---
14 files changed, 874 insertions(+), 853 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
new file mode 100644
index 000..059e32d
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateTable;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTag.StateBinder;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.joda.time.Instant;
+
+/**
+ * In-memory implementation of {@link StateInternals}. Used in {@code BatchModeExecutionContext}
+ * and for running tests that need state.
+ */
+@Experimental(Kind.STATE)
+public class InMemoryStateInternals<K> implements StateInternals<K> {
+
+ public static <K> InMemoryStateInternals<K> forKey(K key) {
+return new InMemoryStateInternals<>(key);
+ }
+
+ private final K key;
+
+ protected InMemoryStateInternals(K key) {
+this.key = key;
+ }
+
+ @Override
+ public K getKey() {
+
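As a rough illustration of what an in-memory StateInternals holds for a single key, the sketch below keeps a table that maps a namespace to a tag id to a mutable cell, creating each cell on first access. InMemoryStateSketch, Bag, Value, and the plain-string namespace and tag ids are hypothetical simplifications. They are not the StateTable, StateTag, or StateNamespace types imported above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical, simplified in-memory state for one key: namespace -> tag id -> mutable cell. */
public final class InMemoryStateSketch<K> {

  /** An append-only bag cell. */
  public static final class Bag<T> {
    private final List<T> contents = new ArrayList<>();

    public void add(T value) {
      contents.add(value);
    }

    /** Returns a copy so callers cannot mutate the stored contents. */
    public List<T> read() {
      return new ArrayList<>(contents);
    }
  }

  /** A single-value cell; reads null until written. */
  public static final class Value<T> {
    private T value;

    public void write(T input) {
      value = input;
    }

    public T read() {
      return value;
    }
  }

  private final K key;
  private final Map<String, Map<String, Object>> table = new HashMap<>();

  private InMemoryStateSketch(K key) {
    this.key = key;
  }

  public static <K> InMemoryStateSketch<K> forKey(K key) {
    return new InMemoryStateSketch<>(key);
  }

  public K getKey() {
    return key;
  }

  @SuppressWarnings("unchecked")
  public <T> Bag<T> bag(String namespace, String tagId) {
    return (Bag<T>) cell(namespace, tagId, () -> new Bag<T>());
  }

  @SuppressWarnings("unchecked")
  public <T> Value<T> value(String namespace, String tagId) {
    return (Value<T>) cell(namespace, tagId, () -> new Value<T>());
  }

  // The first access binds a fresh cell; later accesses with the same (namespace, tag) reuse it.
  private Object cell(String namespace, String tagId, Supplier<Object> factory) {
    return table
        .computeIfAbsent(namespace, ns -> new HashMap<>())
        .computeIfAbsent(tagId, id -> factory.get());
  }
}
```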
Remove comment-only uses that block runners/core-java migrations
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1d7c6b06
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1d7c6b06
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1d7c6b06
Branch: refs/heads/master
Commit: 1d7c6b0638d521cf221bb7456a0e976371ca1b07
Parents: 949ab3a
Author: Kenneth Knowles
Authored: Thu Jan 26 21:09:41 2017 -0800
Committer: Kenneth Knowles
Committed: Mon Feb 6 09:26:06 2017 -0800
--
.../java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java | 2 --
.../main/java/org/apache/beam/sdk/util/state/ReadableState.java | 4 ++--
2 files changed, 2 insertions(+), 4 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/1d7c6b06/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index c9d178a..de5b1e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -65,8 +65,6 @@ public final class PaneInfo {
* produces a final pane, it will not be merged into any new windows.
*
* The predictions above are made using the mechanism of watermarks.
- * See {@link org.apache.beam.sdk.util.TimerInternals} for more information
- * about watermarks.
*
* We can state some properties of {@code LATE} and {@code ON_TIME} panes, but first need some
* definitions:
http://git-wip-us.apache.org/repos/asf/beam/blob/1d7c6b06/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
index 3b4cb7b..c3e9936 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java
@@ -35,14 +35,14 @@ public interface ReadableState {
*
* If there will be many calls to {@link #read} for different state in short succession,
* you should first call {@link #readLater} for all of them so the reads can potentially be
- * batched (depending on the underlying {@link StateInternals} implementation}.
+ * batched (depending on the underlying implementation}.
*/
T read();
/**
* Indicate that the value will be read later.
*
- * This allows a {@link StateInternals} implementation to start an asynchronous prefetch or
+ * This allows an implementation to start an asynchronous prefetch or
* to include this state in the next batch of reads.
*
* @return this for convenient chaining
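The readLater()/read() contract above is easiest to see with a store that counts round trips. In this minimal sketch, readLater() only queues a cell id, and the first read() fetches everything queued in one simulated round trip. Announcing three reads first therefore costs one trip instead of three. BatchingStore, Cell, and fetchBatch are hypothetical names, and the backing "store" is just a map.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical store illustrating why readLater() on every cell before read() enables batching. */
public class BatchingStore {
  private final Map<String, String> backing = new HashMap<>();
  private final Map<String, String> cache = new HashMap<>();
  private final Set<String> pending = new LinkedHashSet<>();
  private int roundTrips = 0;

  public BatchingStore(Map<String, String> data) {
    backing.putAll(data);
  }

  public int roundTrips() {
    return roundTrips;
  }

  public Cell cell(String id) {
    return new Cell(id);
  }

  /** Mirrors the shape of ReadableState: readLater() returns this for chaining. */
  public final class Cell {
    private final String id;

    private Cell(String id) {
      this.id = id;
    }

    /** Only records intent; no round trip happens here. */
    public Cell readLater() {
      if (!cache.containsKey(id)) {
        pending.add(id);
      }
      return this;
    }

    /** Fetches this id together with everything queued so far, if not already cached. */
    public String read() {
      if (!cache.containsKey(id)) {
        pending.add(id);
        fetchBatch();
      }
      return cache.get(id);
    }
  }

  /** One simulated round trip that resolves every pending id at once. */
  private void fetchBatch() {
    roundTrips++;
    for (String id : pending) {
      cache.put(id, backing.get(id));
    }
    pending.clear();
  }
}
```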
http://git-wip-us.apache.org/repos/asf/beam/blob/e77e7f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateNamespacesTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateNamespacesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateNamespacesTest.java
deleted file mode 100644
index f546e56..000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/StateNamespacesTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link StateNamespaces}.
- */
-@RunWith(JUnit4.class)
-public class StateNamespacesTest {
-
- private final Coder intervalCoder = IntervalWindow.getCoder();
-
- private IntervalWindow intervalWindow(long start, long end) {
-return new IntervalWindow(new Instant(start), new Instant(end));
- }
-
- /**
- * This test should not be changed. It verifies that the stringKey matches certain expectations.
- * If this changes, the ability to reload any pipeline that has persisted these namespaces will
- * be impacted.
- */
- @Test
- public void testStability() {
-StateNamespace global = StateNamespaces.global();
-StateNamespace intervalWindow =
-StateNamespaces.window(intervalCoder, intervalWindow(1000, 87392));
-StateNamespace intervalWindowAndTrigger =
-StateNamespaces.windowAndTrigger(intervalCoder, intervalWindow(1000, 87392), 57);
-StateNamespace globalWindow = StateNamespaces.window(
-GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
-StateNamespace globalWindowAndTrigger = StateNamespaces.windowAndTrigger(
-GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, 12);
-
-assertEquals("/", global.stringKey());
-assertEquals("/gAABVWD4ogU/", intervalWindow.stringKey());
-assertEquals("/gAABVWD4ogU/1L/", intervalWindowAndTrigger.stringKey());
-assertEquals("//", globalWindow.stringKey());
-assertEquals("//C/", globalWindowAndTrigger.stringKey());
- }
-
- /**
- * Test that WindowAndTrigger namespaces are prefixed by the related Window namespace.
- */
- @Test
- public void testIntervalWindowPrefixing() {
-StateNamespace window =
-StateNamespaces.window(intervalCoder, intervalWindow(1000, 87392));
-StateNamespace windowAndTrigger = StateNamespaces.windowAndTrigger(
-intervalCoder, intervalWindow(1000, 87392), 57);
-assertThat(windowAndTrigger.stringKey(), Matchers.startsWith(window.stringKey()));
-assertThat(StateNamespaces.global().stringKey(),
-Matchers.not(Matchers.startsWith(window.stringKey()))));
- }
-
- /**
- * Test that WindowAndTrigger namespaces are prefixed by the related Window namespace.
- */
- @Test
- public void testGlobalWindowPrefixing() {
-StateNamespace window =
-StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
-StateNamespace windowAndTrigger = StateNamespaces.windowAndTrigger(
-GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, 57);
-assertThat(windowAndTrigger.stringKey(), Matchers.startsWith(window.stringKey()));
-assertThat(StateNamespaces.global().stringKey(),
-Matchers.not(Matchers.startsWith(window.stringKey()))));
- }
-
- @Test
- public void testFromStringGlobal() {
-assertStringKeyRoundTrips(intervalCoder, StateNamespaces.global());
- }
-
- @Test
- public void testFromStringIntervalWindow() {
-assertStringKeyRoundTrips(
-intervalCoder, StateNamespaces.window(intervalCoder, intervalWindow(1000, 8000)));
-assertStringKeyRoundTrips(
-
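The expected strings in testStability imply a simple key layout:

- "/" for the global namespace.
- "/" + encoded window + "/" for a window namespace.
- The window key, then the trigger index in upper-case base 36, then a trailing "/" for a window-and-trigger namespace (57 becomes "1L", 12 becomes "C").

Because the trigger part is appended, the prefixing tests hold by construction. The sketch below reproduces that layout. It is an inference from the test expectations rather than a copy of StateNamespaces, and it assumes the window has already been encoded to a string ("gAABVWD4ogU" for the interval window and "" for the global window in the test). NamespaceKeys and TRIGGER_RADIX are hypothetical names.

```java
import java.util.Locale;

/** Hypothetical reconstruction of the namespace string-key layout implied by testStability. */
public final class NamespaceKeys {
  // Assumption: trigger indices are written in base 36, upper case (57 -> "1L", 12 -> "C").
  private static final int TRIGGER_RADIX = 36;

  private NamespaceKeys() {}

  public static String global() {
    return "/";
  }

  /** encodedWindow is assumed to be the window already encoded to a string by its coder. */
  public static String window(String encodedWindow) {
    return "/" + encodedWindow + "/";
  }

  /** Extends the window key, so a window-and-trigger key always starts with its window key. */
  public static String windowAndTrigger(String encodedWindow, int triggerIndex) {
    return window(encodedWindow)
        + Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase(Locale.ROOT)
        + "/";
  }
}
```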
Repository: beam
Updated Branches:
refs/heads/master e0189f352 -> 224e44765
Move TimerInternalsFactory to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92b33bc7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92b33bc7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92b33bc7
Branch: refs/heads/master
Commit: 92b33bc7ec6d7c5c32a49c5750ac1d4381a478ce
Parents: 2e6c131
Author: Kenneth Knowles
Authored: Thu Jan 26 21:13:42 2017 -0800
Committer: Kenneth Knowles
Committed: Mon Feb 6 09:26:06 2017 -0800
--
.../beam/runners/core/SplittableParDo.java | 1 -
.../runners/core/TimerInternalsFactory.java | 36
.../beam/runners/core/SplittableParDoTest.java | 1 -
...littableProcessElementsEvaluatorFactory.java | 2 +-
.../sdk/util/state/TimerInternalsFactory.java | 36
5 files changed, 37 insertions(+), 39 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/92b33bc7/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index e414430..7368b2f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.TimerInternalsFactory;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/beam/blob/92b33bc7/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java
new file mode 100644
index 000..e129aed
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternalsFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.util.TimerInternals;
+
+/**
+ * A factory for providing {@link TimerInternals} for a particular key.
+ *
+ * Because it will generally be embedded in a {@link org.apache.beam.sdk.transforms.DoFn DoFn},
+ * albeit at execution time, it is marked {@link Serializable}.
+ */
+@Experimental(Kind.STATE)
+public interface TimerInternalsFactory<K> {
+
+ /** Returns {@link TimerInternals} for the provided key. */
+ TimerInternals timerInternalsForKey(K key);
+}
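The Serializable remark in the Javadoc reflects a split: the factory travels with the DoFn, while the per-key objects it hands out are created at execution time. Below is a minimal sketch of that split using hypothetical stand-ins: KeyedTimers for TimerInternals, and KeyedTimersFactory and InMemoryTimersFactory for the factory. None of these are Beam API. The per-key map is transient, so a serialized copy starts empty and rebuilds its entries on demand.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a serializable, per-key factory shaped like TimerInternalsFactory<K>. */
public final class TimerFactorySketch {

  private TimerFactorySketch() {}

  /** Hypothetical stand-in for TimerInternals: just records timer timestamps for one key. */
  public static final class KeyedTimers {
    private final Object key;
    private final List<Long> timestamps = new ArrayList<>();

    KeyedTimers(Object key) {
      this.key = key;
    }

    public Object key() {
      return key;
    }

    public void setTimer(long timestampMillis) {
      timestamps.add(timestampMillis);
    }

    public List<Long> timers() {
      return timestamps;
    }
  }

  /** Same shape as the factory in the diff: one method mapping a key to its timers. */
  public interface KeyedTimersFactory<K> extends Serializable {
    KeyedTimers timersForKey(K key);
  }

  /** Per-key objects live in a transient map, so only the factory itself is serialized. */
  public static final class InMemoryTimersFactory<K> implements KeyedTimersFactory<K> {
    private static final long serialVersionUID = 1L;
    private transient Map<K, KeyedTimers> byKey;

    @Override
    public synchronized KeyedTimers timersForKey(K key) {
      if (byKey == null) {
        byKey = new HashMap<>(); // rebuilt lazily after deserialization
      }
      return byKey.computeIfAbsent(key, KeyedTimers::new);
    }
  }

  /** Java-serializes and deserializes a value, similar to how a DoFn is shipped to workers. */
  @SuppressWarnings("unchecked")
  public static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }
}
```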
http://git-wip-us.apache.org/repos/asf/beam/blob/92b33bc7/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
--
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index b408d37..427e2f4 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.util.SideInputReader;
import
Repository: beam
Updated Branches:
refs/heads/master a1a022d6b -> 26a2c47f4
[BEAM-882,BEAM-883,BEAM-878] Simplified API surface verifications.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29ffaf38
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29ffaf38
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29ffaf38
Branch: refs/heads/master
Commit: 29ffaf3859ba9b4d8ba8529efc96fd5e105e21a3
Parents: a1a022d
Author: Stas Levin
Authored: Mon Jan 16 16:20:25 2017 +0200
Committer: Kenneth Knowles
Committed: Mon Jan 23 13:56:45 2017 -0800
--
.../org/apache/beam/sdk/util/ApiSurface.java | 420 ++-
.../org/apache/beam/SdkCoreApiSurfaceTest.java | 61 +++
.../apache/beam/sdk/util/ApiSurfaceTest.java | 152 ++-
.../apache/beam/sdk/io/gcp/ApiSurfaceTest.java | 134 --
.../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 76
5 files changed, 484 insertions(+), 359 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/29ffaf38/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java
index 2040161..b6b0b32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java
@@ -17,12 +17,21 @@
*/
package org.apache.beam.sdk.util;
+import static org.hamcrest.Matchers.anyOf;
+
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.reflect.ClassPath;
import com.google.common.reflect.ClassPath.ClassInfo;
@@ -45,15 +54,20 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.StringDescription;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Represents the API surface of a package prefix. Used for accessing public classes,
- * methods, and the types they reference, to control what dependencies are re-exported.
+ * Represents the API surface of a package prefix. Used for accessing public classes, methods, and
+ * the types they reference, to control what dependencies are re-exported.
*
- * For the purposes of calculating the public API surface, exposure includes any public
- * or protected occurrence of:
+ * For the purposes of calculating the public API surface, exposure includes any public or
+ * protected occurrence of:
*
*
* superclasses
@@ -66,42 +80,272 @@ import org.slf4j.LoggerFactory;
* wildcard bounds
*
*
- * Exposure is a transitive property. The resulting map excludes primitives
- * and array classes themselves.
+ * Exposure is a transitive property. The resulting map excludes primitives and array classes
+ * themselves.
*
- * It is prudent (though not required) to prune prefixes like "java" via the builder
- * method {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references
- * that are not interesting.
+ * It is prudent (though not required) to prune prefixes like "java" via the builder method
+ * {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references that are
+ * not interesting.
*/
@SuppressWarnings("rawtypes")
public class ApiSurface {
private static final Logger LOG = LoggerFactory.getLogger(ApiSurface.class);
+ /** A factory method to create a {@link Class} matcher for classes residing in a given package. */
+ public static Matcher classesInPackage(final String packageName) {
+return new Matchers.ClassInPackage(packageName);
+ }
+
+ /**
+ * A factory method to create an {@link ApiSurface} matcher, producing a positive match if the
+ * queried api surface contains ONLY classes described by the provided matchers.
+ */
+ public static Matcher containsOnlyClassesMatching(
+ final Set> classMatchers) {
+return
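The ApiSurface Javadoc describes exposure as a transitive walk over public and protected signatures, with prefixes such as "java" pruned to stop the traversal early. The following reflection-based sketch shows that walk in simplified form. ExposureSketch and exposedClasses are hypothetical names. The sketch follows only superclasses, interfaces, and the raw return, parameter, and field types of public or protected members. It skips the generic type arguments and wildcard bounds that the Javadoc also lists.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified transitive-exposure walk in the spirit of the ApiSurface Javadoc. */
public final class ExposureSketch {

  private ExposureSketch() {}

  /** Returns root plus every class reachable through public or protected signatures. */
  public static Set<Class<?>> exposedClasses(Class<?> root, List<String> prunedPrefixes) {
    Set<Class<?>> seen = new LinkedHashSet<>();
    Deque<Class<?>> work = new ArrayDeque<>();
    work.push(root);
    while (!work.isEmpty()) {
      // Arrays are not recorded themselves; their component types are.
      Class<?> clazz = componentOf(work.pop());
      if (clazz.isPrimitive() || isPruned(clazz, prunedPrefixes) || !seen.add(clazz)) {
        continue; // primitives (including void), pruned prefixes, already-visited classes
      }
      if (clazz.getSuperclass() != null) {
        work.push(clazz.getSuperclass());
      }
      for (Class<?> iface : clazz.getInterfaces()) {
        work.push(iface);
      }
      for (Method method : clazz.getDeclaredMethods()) {
        if (isExposed(method.getModifiers())) {
          work.push(method.getReturnType());
          for (Class<?> param : method.getParameterTypes()) {
            work.push(param);
          }
        }
      }
      for (Field field : clazz.getDeclaredFields()) {
        if (isExposed(field.getModifiers())) {
          work.push(field.getType());
        }
      }
    }
    return seen;
  }

  private static boolean isExposed(int modifiers) {
    return Modifier.isPublic(modifiers) || Modifier.isProtected(modifiers);
  }

  private static boolean isPruned(Class<?> clazz, List<String> prunedPrefixes) {
    for (String prefix : prunedPrefixes) {
      if (clazz.getName().startsWith(prefix)) {
        return true;
      }
    }
    return false;
  }

  private static Class<?> componentOf(Class<?> clazz) {
    Class<?> current = clazz;
    while (current.isArray()) {
      current = current.getComponentType();
    }
    return current;
  }
}
```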
Repository: beam
Updated Branches:
refs/heads/master 18f3767e3 -> 2e766ced5
Upgrade bytebuddy to 1.6.8 to jump past asm 5.0
There is a suspected bug in asm 5.0 that is considered the likely root cause of
a bug in sbt-assembly [1] that carried over to Gearpump [2]. This commit upgrades
us to depend on asm 5.2, in which those derivative bugs have been cleared up.
I have not found a direct reference to what the issue is, precisely, but
the dependency effect of this is extremely small and these are libraries
that are useful to keep current.
[1] https://github.com/sbt/sbt-assembly/issues/205#issuecomment-279964607
[2] https://issues.apache.org/jira/browse/GEARPUMP-236
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e4c05c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e4c05c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e4c05c3
Branch: refs/heads/master
Commit: 3e4c05c3449064e7f032a48b98551a73d71a5bbb
Parents: 00ea3f7
Author: Kenneth Knowles
Authored: Wed Feb 15 14:02:38 2017 -0800
Committer: Kenneth Knowles
Committed: Wed Feb 15 14:15:36 2017 -0800
--
pom.xml | 2 +-
.../beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java | 4 ++--
.../sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java | 4 ++--
3 files changed, 5 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/3e4c05c3/pom.xml
--
diff --git a/pom.xml b/pom.xml
index d53502e..cc6de11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -871,7 +871,7 @@
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
-<version>1.5.5</version>
+<version>1.6.8</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/3e4c05c3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 46b21d6..8e3a37c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -427,7 +427,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
// Push "this" (DoFnInvoker on top of the stack)
MethodVariableAccess.REFERENCE.loadFrom(0),
// Access this.delegate (DoFn on top of the stack)
- FieldAccess.forField(delegateField).getter(),
+ FieldAccess.forField(delegateField).read(),
// Cast it to the more precise type
TypeCasting.to(doFnType),
// Run the beforeDelegation manipulations.
@@ -637,7 +637,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
StackManipulation pushDelegate =
new StackManipulation.Compound(
MethodVariableAccess.REFERENCE.loadFrom(0),
- FieldAccess.forField(delegateField).getter());
+ FieldAccess.forField(delegateField).read());
StackManipulation pushExtraContextFactory =
MethodVariableAccess.REFERENCE.loadFrom(1);
http://git-wip-us.apache.org/repos/asf/beam/blob/3e4c05c3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index 786857a..123808c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -239,7 +239,7 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
StackManipulation pushDelegate =
new StackManipulation.Compound(
MethodVariableAccess.REFERENCE.loadFrom(0),
- FieldAccess.forField(delegateField).getter());
+ FieldAccess.forField(delegateField).read());
StackManipulation pushExtraContextFactory =
MethodVariableAccess.REFERENCE.loadFrom(1);
@@ -295,7 +295,7 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
Repository: beam
Updated Branches:
refs/heads/master 763fb50b5 -> 9151676a5
Add ability to bundle any message with components in Runner API
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/368ad236
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/368ad236
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/368ad236
Branch: refs/heads/master
Commit: 368ad236ca03bd011d2f207bcc34f5961290c0a0
Parents: cd4e6e4
Author: Kenneth Knowles
Authored: Thu Feb 16 14:07:40 2017 -0800
Committer: Kenneth Knowles
Committed: Thu Feb 16 15:18:24 2017 -0800
--
.../src/main/proto/beam_runner_api.proto| 66
1 file changed, 54 insertions(+), 12 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/368ad236/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index a9133ab..f5dc81d 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -30,17 +30,9 @@ option java_outer_classname = "RunnerApi";
import "google/protobuf/any.proto";
-// A Pipeline is a hierarchical graph of PTransforms, linked
-// by PCollections.
-//
-// This is represented by a number of by-reference maps to nodes,
-// PCollections, SDK environments, UDF, etc., for
-// supporting compact reuse and arbitrary graph structure.
-//
-// All of the keys in the maps here are arbitrary strings that are only
-// required to be internally consistent within this proto message.
-message Pipeline {
-
+// A set of mappings from id to message. This is included as an optional field
+// on any proto message that may contain references needing resolution.
+message Components {
// (Required) A map from pipeline-scoped id to GraphNode
//
// Each node is required to contain a PTransform specification.
@@ -61,9 +53,58 @@ message Pipeline {
// (Required) A map from pipeline-scoped id to FunctionSpec,
// a record for a particular user-defined function.
map function_specs = 6;
+}
+
+// A disjoint union of all the things that may contain references
+// that require Components to resolve.
+message MessageWithComponents {
+
+ // (Optional) The by-reference components of the root message,
+ // enabling a standalone message.
+ //
+ // If this is absent, it is expected that there are no
+ // references.
+ Components components = 1;
+
+ // (Required) The root message that may contain pointers
+ // that should be resolved by looking inside components.
+ oneof root {
+Coder coder = 2;
+CombinePayload combine_payload = 3;
+FunctionSpec function_spec = 4;
+GraphNode graph_node = 5;
+ParDoPayload par_do_payload = 6;
+PTransform ptransform = 7;
+PCollection pcollection = 8;
+ReadPayload read_payload = 9;
+SdkFunctionSpec sdk_function_spec = 10;
+SideInput side_input = 11;
+WindowIntoPayload window_into_payload = 12;
+WindowingStrategy windowing_strategy = 13;
+UrnWithParameter urn_with_parameter = 14;
+ }
+}
+
+// A Pipeline is a hierarchical graph of PTransforms, linked
+// by PCollections.
+//
+// This is represented by a number of by-reference maps to nodes,
+// PCollections, SDK environments, UDF, etc., for
+// supporting compact reuse and arbitrary graph structure.
+//
+// All of the keys in the maps here are arbitrary strings that are only
+// required to be internally consistent within this proto message.
+message Pipeline {
+
+ // (Required) The coders, UDFs, graph nodes, etc, that make up
+ // this pipeline.
+ Components components = 1;
+
+ // (Required) The graph node that is the root of the graph.
+ string root_graph_node = 2;
// (Required) Static display data for the pipeline.
- DisplayData display_data = 7;
+ DisplayData display_data = 3;
}
// A generic node in a bipartite directed hierarchical graph.
@@ -270,6 +311,7 @@ message ReadPayload {
// The payload for the WindowInto transform.
message WindowIntoPayload {
+
// (Required) The pipeline-scoped id for the FunctionSpec of the WindowFn.
string fn_id = 1;
}
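The by-reference design above means a root message carries only string ids, and a reader resolves them against the bundled `Components`. The following is a minimal Java sketch of that lookup, not the generated `RunnerApi` classes: `ComponentsSketch`, `PCollectionWithComponents`, and their fields are hypothetical stand-ins, and the coder "values" are plain string labels.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the Components message: id-keyed maps whose keys
// only need to be consistent within one enclosing message.
final class ComponentsSketch {
  // coder id -> coder description (a plain label here, not a real coder)
  final Map<String, String> coders = new HashMap<>();
  // PCollection id -> coder id (a by-reference pointer into `coders`)
  final Map<String, String> pcollectionCoderIds = new HashMap<>();

  // Looks up a referenced id, failing loudly if the bundle does not contain it.
  static <V> V resolve(Map<String, V> map, String id) {
    V value = map.get(id);
    if (value == null) {
      throw new NoSuchElementException("Unresolved reference: " + id);
    }
    return value;
  }
}

// Hypothetical stand-in for a MessageWithComponents whose root is a PCollection id.
final class PCollectionWithComponents {
  final ComponentsSketch components;
  final String pcollectionId;

  PCollectionWithComponents(ComponentsSketch components, String pcollectionId) {
    this.components = components;
    this.pcollectionId = pcollectionId;
  }

  // Follows PCollection id -> coder id -> coder using only the bundled
  // components, so the message can be interpreted standalone.
  String resolveCoder() {
    String coderId = ComponentsSketch.resolve(components.pcollectionCoderIds, pcollectionId);
    return ComponentsSketch.resolve(components.coders, coderId);
  }
}
```

An unresolvable id surfaces as an exception rather than a silent null, which matches the proto comment that an absent `components` field implies the root has no references at all.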
Repository: beam
Updated Branches:
refs/heads/master 4f56acbba -> 2c71354d0
Use strings for ids in Fn API
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9933f271
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9933f271
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9933f271
Branch: refs/heads/master
Commit: 9933f27140ddfe5b9ded4a0688a9c0506ef94113
Parents: 4f56acb
Author: Kenneth Knowles
Authored: Sun Feb 12 22:23:32 2017 -0800
Committer: Kenneth Knowles
Committed: Thu Feb 23 17:35:22 2017 -0800
--
.../fn-api/src/main/proto/beam_fn_api.proto | 48 +-
.../fn/harness/control/BeamFnControlClient.java | 3 +-
.../harness/control/ProcessBundleHandler.java | 8 +-
.../fn/harness/control/RegisterHandler.java | 8 +-
.../BeamFnDataBufferingOutboundObserver.java| 4 +-
.../beam/fn/harness/data/BeamFnDataClient.java | 4 +-
.../fn/harness/data/BeamFnDataGrpcClient.java | 4 +-
.../harness/data/BeamFnDataGrpcMultiplexer.java | 11 +--
.../fn/harness/logging/BeamFnLoggingClient.java | 4 +-
.../beam/runners/core/BeamFnDataReadRunner.java | 4 +-
.../runners/core/BeamFnDataWriteRunner.java | 4 +-
.../apache/beam/fn/harness/FnHarnessTest.java | 8 +-
.../control/BeamFnControlClientTest.java| 12 +--
.../control/ProcessBundleHandlerTest.java | 95 ++--
.../fn/harness/control/RegisterHandlerTest.java | 18 ++--
...BeamFnDataBufferingOutboundObserverTest.java | 9 +-
.../harness/data/BeamFnDataGrpcClientTest.java | 21 +++--
.../data/BeamFnDataGrpcMultiplexerTest.java | 12 +--
.../data/BeamFnDataInboundObserverTest.java | 4 +-
.../runners/core/BeamFnDataReadRunnerTest.java | 8 +-
.../runners/core/BeamFnDataWriteRunnerTest.java | 8 +-
21 files changed, 159 insertions(+), 138 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
--
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 3ac0fbf..80bae2e 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -59,7 +59,7 @@ message Target {
}
// (Required) The id of the PrimitiveTransform which is the target.
- int64 primitive_transform_reference = 1;
+ string primitive_transform_reference = 1;
// (Required) The local name of an input or output defined on the primitive
// transform.
@@ -69,7 +69,7 @@ message Target {
// Information defining a PCollection
message PCollection {
// (Required) A reference to a coder.
- int64 coder_reference = 1;
+ string coder_reference = 1;
// TODO: Windowing strategy, ...
}
@@ -78,7 +78,7 @@ message PCollection {
message PrimitiveTransform {
// (Required) A pipeline level unique id which can be used as a reference to
// refer to this.
- int64 id = 1;
+ string id = 1;
// (Required) A function spec that is used by this primitive
// transform to process data.
@@ -117,7 +117,7 @@ message PrimitiveTransform {
message FunctionSpec {
// (Required) A pipeline level unique id which can be used as a reference to
// refer to this.
- int64 id = 1;
+ string id = 1;
// (Required) A globally unique name representing this user definable
// function.
@@ -131,7 +131,7 @@ message FunctionSpec {
// (Required) Reference to specification of execution environment required to
// invoke this function.
- int64 environment_reference = 3;
+ string environment_reference = 3;
// Data used to parameterize this function. Depending on the urn, this may be
// optional or required.
@@ -179,7 +179,7 @@ message Coder {
//
// TODO: Perhaps this is redundant with the data of the FunctionSpec
// for known coders?
- repeated int64 component_coder_reference = 2;
+ repeated string component_coder_reference = 2;
}
// A descriptor for connecting to a remote port using the Beam Fn Data API.
@@ -218,7 +218,7 @@ service BeamFnControl {
message InstructionRequest {
// (Required) An unique identifier provided by the runner which represents
// this requests execution. The InstructionResponse MUST have the matching id.
- int64 instruction_id = 1;
+ string instruction_id = 1;
// (Required) A request that the SDK Harness needs to interpret.
oneof request {
@@ -235,7 +235,7 @@ message InstructionResponse {
// (Required) A reference provided by the runner which represents a requests
// execution. The InstructionResponse MUST have the matching id when
// responding to the runner.
- int64 instruction_id = 1;
+ string instruction_id = 1;
// If
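Since `instruction_id` is now a string and an `InstructionResponse` must echo the id of its request, a runner needs some table of outstanding requests keyed by that id. A minimal sketch of such bookkeeping, assuming a hypothetical `PendingInstructions` class (not part of the Fn API or the harness code shown here) and UUID-based ids:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical runner-side table: each outstanding request is keyed by its
// string instruction id, and the matching response completes its future.
final class PendingInstructions {
  private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  // Allocates a fresh string id for a new request and registers a future for its response.
  String register() {
    String instructionId = UUID.randomUUID().toString();
    pending.put(instructionId, new CompletableFuture<>());
    return instructionId;
  }

  // Returns the future for a still-outstanding request, or null if unknown.
  CompletableFuture<String> responseFor(String instructionId) {
    return pending.get(instructionId);
  }

  // Called when a response arrives; it must carry the id of its request.
  void onResponse(String instructionId, String payload) {
    CompletableFuture<String> future = pending.remove(instructionId);
    if (future == null) {
      throw new IllegalStateException("Response for unknown instruction " + instructionId);
    }
    future.complete(payload);
  }

  int outstanding() {
    return pending.size();
  }
}
```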
Condense FunctionSpec and remove SdkFunctionSpec, merging data and params
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3120fd9a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3120fd9a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3120fd9a
Branch: refs/heads/master
Commit: 3120fd9a499b01b08659edc4e045f5abbcc3ae07
Parents: 672d12a
Author: Kenneth Knowles
Authored: Thu Feb 23 18:06:56 2017 -0800
Committer: Kenneth Knowles
Committed: Fri Feb 24 14:38:28 2017 -0800
--
.../src/main/proto/beam_runner_api.proto| 114 ++-
.../beam/sdk/util/WindowingStrategies.java | 110 --
2 files changed, 134 insertions(+), 90 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/3120fd9a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 32c53fb..989e4bb 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -74,7 +74,6 @@ message MessageWithComponents {
PTransform ptransform = 7;
PCollection pcollection = 8;
ReadPayload read_payload = 9;
-SdkFunctionSpec sdk_function_spec = 10;
SideInput side_input = 11;
WindowIntoPayload window_into_payload = 12;
WindowingStrategy windowing_strategy = 13;
@@ -100,7 +99,8 @@ message Pipeline {
// (Required) The id of the PTransform that is the root of the pipeline
string root_transform_id = 2;
- // (Required) Static display data for the pipeline.
+ // (Optional) Static display data for the pipeline. If there is none,
+ // it may be omitted.
DisplayData display_data = 3;
}
@@ -116,7 +116,8 @@ message PTransform {
// etc.
//
// If it is not stable, then the runner decides what will happen. But, most
- // importantly, it must always be here, even if it is autogenerated.
+ // importantly, it must always be here and be unique, even if it is
+ // autogenerated.
string unique_name = 5;
// (Optional) A URN and payload that, together, fully defined the semantics
@@ -125,13 +126,11 @@ message PTransform {
// If absent, this must be an "anonymous" composite transform.
//
// For primitive transform in the Runner API, this is required, and the
- // payloads are as follows:
+ // payloads are well-defined messages. When the URN indicates ParDo it
+ // is a ParDoPayload, and so on.
//
- // - when the URN is "urn:beam:transforms:pardo" it is a ParDoPayload
- // - when the URN is "urn:beam:transforms:read" it is a ReadPayload
- // - when the URN is "urn:beam:transforms:gbk" it is a GroupByKeyPayload
- // - when the URN is "urn:beam:transforms:window" it is a WindowPayload
- // - when the URN is "urn:beam:transforms:flatten" it is absent
+ // TODO: document the standardized URNs and payloads
+ // TODO: separate standardized payloads into a separate proto file
//
// For some special composite transforms, the payload is also officially
// defined:
@@ -144,7 +143,9 @@ message PTransform {
// transforms that it contains.
repeated string subtransforms = 2;
- // (Required) A map from local names of inputs to PCollection ids.
- // (Required) A map from local names of inputs (unique only with this map, and
+ // likely embedded in the transform payload and serialized user code) to
+ // PCollection ids.
//
// The payload for this transform may clarify the relationship of these
// inputs. For example:
@@ -157,7 +158,9 @@ message PTransform {
//
map inputs = 3;
- // (Required) A map from local names of outputs to PCollection ids.
+ // (Required) A map from local names of outputs (unique only within this map,
+ // and likely embedded in the transform payload and serialized user code)
+ // to PCollection ids.
//
// The URN or payload for this transform node may clarify the type and
// relationship of these outputs. For example:
@@ -167,7 +170,9 @@ message PTransform {
//
map outputs = 4;
- // (Required) Static display data for this PTransform application.
+ // (Optional) Static display data for this PTransform application. If
+ // there is none, or it is not relevant (such as use by the Fn API)
+ // then it may be omitted.
DisplayData display_data = 6;
}
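The revised `PTransform` comments imply a few checkable invariants: every `unique_name` is present and unique, and every value in `inputs` and `outputs` is a known PCollection id. A minimal validation sketch over hypothetical plain-Java stand-ins (`TransformSketch`, `TransformValidator`), not the generated `RunnerApi` classes:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the PTransform fields checked here.
final class TransformSketch {
  final String uniqueName;
  final Map<String, String> inputs;   // local name -> PCollection id
  final Map<String, String> outputs;  // local name -> PCollection id

  TransformSketch(String uniqueName, Map<String, String> inputs, Map<String, String> outputs) {
    this.uniqueName = uniqueName;
    this.inputs = inputs;
    this.outputs = outputs;
  }
}

final class TransformValidator {
  // Returns human-readable violations; an empty list means all checks passed.
  static List<String> validate(Map<String, TransformSketch> transforms, Set<String> pcollectionIds) {
    List<String> errors = new ArrayList<>();
    Set<String> seenNames = new HashSet<>();
    for (Map.Entry<String, TransformSketch> entry : transforms.entrySet()) {
      TransformSketch t = entry.getValue();
      if (t.uniqueName == null || t.uniqueName.isEmpty()) {
        errors.add(entry.getKey() + ": missing unique_name");
      } else if (!seenNames.add(t.uniqueName)) {
        errors.add(entry.getKey() + ": duplicate unique_name " + t.uniqueName);
      }
      checkRefs(entry.getKey(), "input", t.inputs, pcollectionIds, errors);
      checkRefs(entry.getKey(), "output", t.outputs, pcollectionIds, errors);
    }
    return errors;
  }

  // Flags any local name whose PCollection id is not defined.
  private static void checkRefs(String transformId, String kind, Map<String, String> refs,
      Set<String> pcollectionIds, List<String> errors) {
    for (Map.Entry<String, String> ref : refs.entrySet()) {
      if (!pcollectionIds.contains(ref.getValue())) {
        errors.add(transformId + ": " + kind + " '" + ref.getKey()
            + "' refers to unknown PCollection " + ref.getValue());
      }
    }
  }
}
```

Local names are only required to be unique within their own map, so the sketch checks only that each referenced PCollection id exists, not that local names are globally unique.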
@@ -193,7 +198,9 @@ message PCollection {
// (Required) The id of the windowing strategy for this PCollection.
string windowing_strategy_id = 4;
- // (Required) Static display data for this PCollection.
+ // (Optional) Static display data for this
Repository: beam
Updated Branches:
refs/heads/master 415546eda -> 437ba2505
[BEAM-1554] Add verification of Coder structural equality after enc/dec
This also improves the class documentation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2736c4ae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2736c4ae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2736c4ae
Branch: refs/heads/master
Commit: 2736c4ae901ad428f8e05fb7a80ad72f0318735e
Parents: 415546e
Author: Ismaël Mejía
Authored: Fri Feb 24 16:46:17 2017 +0100
Committer: Ismaël Mejía
Committed: Sat Feb 25 04:16:28 2017 +0100
--
.../beam/sdk/testing/CoderProperties.java | 66 ++--
1 file changed, 61 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/2736c4ae/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
index 910b939..8065505 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
@@ -87,7 +87,6 @@ public class CoderProperties {
public static <T> void coderDeterministicInContext(
Coder<T> coder, Coder.Context context, T value1, T value2)
throws Exception {
-
try {
coder.verifyDeterministic();
} catch (NonDeterministicException e) {
@@ -189,22 +188,34 @@ public class CoderProperties {
}
}
+ /**
+ * Verifies that the given {@code Coder} can be correctly serialized and
+ * deserialized.
+ */
public static void coderSerializable(Coder coder) {
SerializableUtils.ensureSerializable(coder);
}
+ /**
+ * Verifies that for the given {@code Coder} and values of
+ * type {@code T}, the values are equal if and only if the
+ * encoded bytes are equal.
+ */
public static <T> void coderConsistentWithEquals(
Coder<T> coder, T value1, T value2)
throws Exception {
-
for (Coder.Context context : ALL_CONTEXTS) {
CoderProperties.coderConsistentWithEqualsInContext(coder, context,
value1, value2);
}
}
+ /**
+ * Verifies that for the given {@code Coder}, {@code Coder.Context}, and
+ * values of type {@code T}, the values are equal if and only if the
+ * encoded bytes are equal, in any {@code Coder.Context}.
+ */
public static <T> void coderConsistentWithEqualsInContext(
Coder<T> coder, Coder.Context context, T value1, T value2) throws Exception {
-
assertEquals(
value1.equals(value2),
Arrays.equals(
@@ -212,12 +223,20 @@ public class CoderProperties {
encode(coder, context, value2)));
}
+ /**
+ * Verifies if a {@code Coder}'s encodingId is equal to a given
+ * encodingId.
+ */
public static void coderHasEncodingId(Coder coder, String encodingId)
throws Exception {
assertThat(coder.getEncodingId(), equalTo(encodingId));
assertThat(Structs.getString(coder.asCloudObject(),
PropertyNames.ENCODING_ID, ""),
equalTo(encodingId));
}
+ /**
+ * Verifies if a {@code Coder} is allowed to encode using the given
+ * encodingId.
+ */
public static void coderAllowsEncoding(Coder coder, String encodingId) throws Exception {
assertThat(coder.getAllowedEncodings(), hasItem(encodingId));
assertThat(
@@ -230,19 +249,27 @@ public class CoderProperties {
hasItem(encodingId));
}
+ /**
+ * Verifies that for the given {@code Coder} and values of
+ * type {@code T}, the structural values are equal if and only if the
+ * encoded bytes are equal.
+ */
public static <T> void structuralValueConsistentWithEquals(
Coder<T> coder, T value1, T value2)
throws Exception {
-
for (Coder.Context context : ALL_CONTEXTS) {
CoderProperties.structuralValueConsistentWithEqualsInContext(
coder, context, value1, value2);
}
}
+ /**
+ * Verifies that for the given {@code Coder}, {@code Coder.Context}, and
+ * values of type {@code T}, the structural values are equal if and only if the
+ * encoded bytes are equal, in any {@code Coder.Context}.
+ */
public static <T> void structuralValueConsistentWithEqualsInContext(
Coder<T> coder, Coder.Context context, T value1, T value2) throws Exception {
-
assertEquals(
coder.structuralValue(value1).equals(coder.structuralValue(value2)),
Arrays.equals(
@@ -250,6 +277,35 @@ public class CoderProperties {
encode(coder, context, value2)));
}
+ /**
+
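The two properties documented in this commit can be illustrated without Beam: value `equals` should agree with byte-equality of the encodings, and where `equals` is not value-based (as with Java arrays) a structural value restores that agreement. A minimal sketch with hypothetical helpers (`encodeString`, `encodeBytes`, `structuralValue`) that only mimic the roles of `Coder.encode` and `Coder.structuralValue`; they are not the Beam `Coder` API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class CoderPropertySketch {
  // Hypothetical UTF-8 "coder" for Strings: equal strings encode to equal bytes.
  static byte[] encodeString(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  // Hypothetical identity "coder" for byte arrays.
  static byte[] encodeBytes(byte[] value) {
    return value.clone();
  }

  // Structural value for byte[]: a List<Byte>, whose equals compares contents.
  static List<Byte> structuralValue(byte[] value) {
    Byte[] boxed = new Byte[value.length];
    for (int i = 0; i < value.length; i++) {
      boxed[i] = value[i];
    }
    return Arrays.asList(boxed);
  }

  // Mirrors coderConsistentWithEquals: v1.equals(v2) iff the encodings are equal.
  static boolean consistentWithEquals(String v1, String v2) {
    return v1.equals(v2) == Arrays.equals(encodeString(v1), encodeString(v2));
  }

  // The same check on byte[] uses identity equals, so it fails for equal contents.
  static boolean bytesConsistentWithEquals(byte[] v1, byte[] v2) {
    return v1.equals(v2) == Arrays.equals(encodeBytes(v1), encodeBytes(v2));
  }

  // Mirrors structuralValueConsistentWithEquals for byte[].
  static boolean structuralValueConsistentWithEquals(byte[] v1, byte[] v2) {
    return structuralValue(v1).equals(structuralValue(v2))
        == Arrays.equals(encodeBytes(v1), encodeBytes(v2));
  }
}
```

For two distinct arrays `{1, 2}`, the plain `equals` check fails while the structural-value check passes, which is why both properties are tested separately.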
Make SDK-specific serialized blob really a blob
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/380b75e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/380b75e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/380b75e2
Branch: refs/heads/master
Commit: 380b75e2d0b55eb00cec0c030e5dddc796ec8af5
Parents: 5d6dafa
Author: Kenneth Knowles
Authored: Thu Feb 16 14:45:05 2017 -0800
Committer: Kenneth Knowles
Committed: Tue Feb 21 11:50:52 2017 -0800
--
sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/380b75e2/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index f5dc81d..2919580 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -682,7 +682,7 @@ message SdkFunctionSpec {
// (Required) The raw data of the function that the SDK knows how to
// deserialize, but need not be comprehensible to any other runner, SDK, or
// other entity.
- google.protobuf.Any data = 4;
+ bytes data = 4;
}
// TODO: transfer javadoc here
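With `data` declared as `bytes`, the payload is opaque to everything except the SDK that produced it. For a Java SDK, one plausible way to produce such a blob is plain Java serialization of a `Serializable` function. The sketch below assumes that approach and uses a hypothetical `AddOne` function and `OpaqueBlobs` helper; it does not describe how Beam itself encodes user functions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.IntUnaryOperator;

// Hypothetical user function; any Serializable object would do.
final class AddOne implements IntUnaryOperator, Serializable {
  private static final long serialVersionUID = 1L;

  @Override
  public int applyAsInt(int x) {
    return x + 1;
  }
}

final class OpaqueBlobs {
  // Produces raw bytes of the kind that could be stored in SdkFunctionSpec.data.
  static byte[] toBlob(Serializable fn) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(fn);
    }
    return bytes.toByteArray();
  }

  // Only the producing SDK knows how to turn the bytes back into a function.
  static Object fromBlob(byte[] blob) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
      return in.readObject();
    }
  }
}
```

Unlike `google.protobuf.Any`, a `bytes` field carries no type URL, so nothing outside the producing SDK is expected to interpret it.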
Repository: beam
Updated Branches:
refs/heads/master 3fcc82081 -> 154c54305
Add closing behavior to Runner API proto
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aac38d60
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aac38d60
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aac38d60
Branch: refs/heads/master
Commit: aac38d60c0f76db67095522ea013403642179d9d
Parents: 380b75e
Author: Kenneth Knowles
Authored: Thu Feb 16 20:26:39 2017 -0800
Committer: Kenneth Knowles
Committed: Tue Feb 21 11:50:52 2017 -0800
--
.../src/main/proto/beam_runner_api.proto | 17 -
1 file changed, 16 insertions(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/aac38d60/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 2919580..ce089f5 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -392,9 +392,12 @@ message WindowingStrategy {
// windowing strategy.
OutputTime output_time = 6;
+ // (Required) Indicate when output should be omitted upon window expiration.
+ ClosingBehavior closing_behavior = 7;
+
// (Required) The duration, in milliseconds, beyond the end of a window at
// which the window becomes droppable.
- int64 allowed_lateness = 7;
+ int64 allowed_lateness = 8;
}
// Whether or not a PCollection's WindowFn is non-merging, merging, or
@@ -428,6 +431,18 @@ enum AccumulationMode {
ACCUMULATING = 1;
}
+// Controls whether or not an aggregating transform should output data
+// when a window expires.
+enum ClosingBehavior {
+
+ // Emit output when a window expires, whether or not there has been
+ // any new data since the last output.
+ EMIT_ALWAYS = 0;
+
+ // Only emit output when new data has arrives since the last output
+ EMIT_IF_NONEMPTY = 1;
+}
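Read together with `allowed_lateness`, the new enum decides whether a final pane is produced when a window expires. A minimal sketch of that decision, assuming a window expires once the watermark is strictly past the window's end plus the allowed lateness (the exact boundary is an assumption); `WindowExpiry` and its parameters are hypothetical, not a runner API.

```java
// Mirrors the two values of the ClosingBehavior enum in the proto.
enum ClosingBehavior {
  EMIT_ALWAYS,
  EMIT_IF_NONEMPTY
}

final class WindowExpiry {
  // Assumed expiration rule: the watermark is strictly past the window end
  // plus the allowed lateness (all values in milliseconds).
  static boolean isExpired(long windowEndMillis, long allowedLatenessMillis, long watermarkMillis) {
    return watermarkMillis > windowEndMillis + allowedLatenessMillis;
  }

  // Decides whether to emit a final pane when the window expires.
  static boolean emitOnClose(ClosingBehavior behavior, boolean hasNewDataSinceLastOutput) {
    switch (behavior) {
      case EMIT_ALWAYS:
        return true;
      case EMIT_IF_NONEMPTY:
        return hasNewDataSinceLastOutput;
      default:
        throw new IllegalArgumentException("Unknown behavior: " + behavior);
    }
  }
}
```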
+
// When a number of windowed, timestamped inputs are aggregated, the timestamp
// for the resulting output.
enum OutputTime {
Repository: beam
Updated Branches:
refs/heads/master 98d8834af -> 49809d1d4
Correct Javadoc on accessing windows in DoFn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1c8d838
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1c8d838
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1c8d838
Branch: refs/heads/master
Commit: c1c8d8386fc035c65362b5ce36c20b38fe00f9a4
Parents: 0e6b379
Author: Ben Chambers
Authored: Wed Feb 1 15:08:13 2017 -0800
Committer: Kenneth Knowles
Committed: Mon Feb 13 20:44:04 2017 -0800
--
.../src/main/java/org/apache/beam/sdk/transforms/DoFn.java | 6 --
1 file changed, 4 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/c1c8d838/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index a161919..1ad05bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -532,8 +532,10 @@ public abstract class DoFn implements Serializable, HasDisplayD
* href="https://s.apache.org/splittable-do-fn">splittable {@link DoFn} subject to the
* separate requirements described below. Items below are assuming this is not a splittable
* {@link DoFn}.
- * If one of its arguments is {@link BoundedWindow}, this argument corresponds to the window
- * of the current element. If absent, a runner may perform additional optimizations.
+ * If one of its arguments is a subtype of {@link BoundedWindow} then it will
+ * be passed the window of the current element. When applied by {@link ParDo} the subtype
+ * of {@link BoundedWindow} must match the type of windows on the input {@link PCollection}.
+ * If the window is not accessed a runner may perform additional optimizations.
* It must return {@code void}.
*
*
Repository: beam
Updated Branches:
refs/heads/master cd6802bec -> 490ef8f09
Add proto definition for Runner API
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5ce3b43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5ce3b43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5ce3b43
Branch: refs/heads/master
Commit: a5ce3b4380377ebbcc75933f8f0f8c3faeefdcf4
Parents: 9ec22f1
Author: Kenneth Knowles
Authored: Tue Feb 7 15:25:32 2017 -0800
Committer: Kenneth Knowles
Committed: Thu Feb 9 14:45:48 2017 -0800
--
sdks/common/pom.xml | 1 +
sdks/common/runner-api/pom.xml | 91 +++
.../src/main/proto/beam_runner_api.proto| 638 +++
3 files changed, 730 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/a5ce3b43/sdks/common/pom.xml
--
diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml
index 8364d9a..55db181 100644
--- a/sdks/common/pom.xml
+++ b/sdks/common/pom.xml
@@ -34,5 +34,6 @@
<module>fn-api</module>
+<module>runner-api</module>
http://git-wip-us.apache.org/repos/asf/beam/blob/a5ce3b43/sdks/common/runner-api/pom.xml
--
diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml
new file mode 100644
index 000..8eaeb8e
--- /dev/null
+++ b/sdks/common/runner-api/pom.xml
@@ -0,0 +1,91 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+ 4.0.0
+
+ jar
+
+org.apache.beam
+beam-sdks-common-parent
+0.6.0-SNAPSHOT
+../pom.xml
+
+
+ beam-sdks-common-runner-api
+ Apache Beam :: SDKs :: Common :: Runner API
+ This artifact generates the stub bindings.
+
+
+
+
+src/main/resources
+true
+
+
+${project.build.directory}/original_sources_to_package
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-checkstyle-plugin
+
+ true
+
+
+
+
+
+org.codehaus.mojo
+findbugs-maven-plugin
+
+ true
+
+
+
+
+org.xolstice.maven.plugins
+protobuf-maven-plugin
+
+com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ grpc-java
+io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+
+
+
+
+ compile
+ compile-custom
+
+
+
+
+
+
+
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
http://git-wip-us.apache.org/repos/asf/beam/blob/a5ce3b43/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
new file mode 100644
index 000..195ce01
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Runner API, which is the runner-independent,
+ * SDK-independent definition of the Beam model.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.runner_api.v1;
+
+option java_package = "org.apache.beam.sdks.common.runner_api.v1";
+option java_outer_classname = "RunnerApi";
+
+import "google/protobuf/any.proto";
+import "google/protobuf/timestamp.proto";
+
+// A Pipeline is a hierarchical graph of PTransforms, linked
+// by PCollections.
+//
+// This is represented by a number of by-reference maps to nodes,
+// PCollections, SDK environments, UDF, etc., for
+// supporting compact
Repository: beam
Updated Branches:
refs/heads/master ed3ef11fa -> 2596d46a1
Add UsesMapState and UsesSetState JUnit categories
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/674bead8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/674bead8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/674bead8
Branch: refs/heads/master
Commit: 674bead8c346421ebfadc3e0b2e13eb79e0280aa
Parents: bea101a
Author: JingsongLi
Authored: Tue Feb 14 14:52:05 2017 +0800
Committer: Kenneth Knowles
Committed: Tue Feb 14 11:02:48 2017 -0800
--
.../apache/beam/sdk/testing/UsesMapState.java | 25
.../apache/beam/sdk/testing/UsesSetState.java | 25
2 files changed, 50 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/674bead8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
new file mode 100644
index 000..9bced41
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.util.state.MapState;
+
+/**
+ * Category tag for validation tests which utilize {@link MapState}.
+ */
+public interface UsesMapState {}
http://git-wip-us.apache.org/repos/asf/beam/blob/674bead8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
new file mode 100644
index 000..6fd74bd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.util.state.SetState;
+
+/**
+ * Category tag for validation tests which utilize {@link SetState}.
+ */
+public interface UsesSetState {}
Flesh out triggers in Runner API proto
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/661cd8d7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/661cd8d7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/661cd8d7
Branch: refs/heads/master
Commit: 661cd8d7407d5f414d5d94badacdeadb519107b7
Parents: 67854e6
Author: Kenneth Knowles
Authored: Sat Feb 11 17:32:21 2017 -0800
Committer: Kenneth Knowles
Committed: Tue Feb 14 14:55:49 2017 -0800
--
.../src/main/proto/beam_runner_api.proto| 109 ++-
1 file changed, 83 insertions(+), 26 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam/blob/661cd8d7/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
--
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 370b57c..91f1558 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -29,7 +29,6 @@ option java_package = "org.apache.beam.sdk.common.runner_api.v1";
option java_outer_classname = "RunnerApi";
import "google/protobuf/any.proto";
-import "google/protobuf/timestamp.proto";
// A Pipeline is a hierarchical graph of PTransforms, linked
// by PCollections.
@@ -402,6 +401,24 @@ enum OutputTime {
  EARLIEST_IN_PANE = 2;
}
+// The different time domains in the Beam model.
+enum TimeDomain {
+
+  // Event time is time from the perspective of the data
+  EVENT_TIME = 0;
+
+  // Processing time is time from the perspective of the
+  // execution of your pipeline
+  PROCESSING_TIME = 1;
+
+  // Synchronized processing time is the minimum of the
+  // processing time of all pending elements.
+  //
+  // The "processing time" of an element refers to
+  // the local processing time at which it was emitted
+  SYNCHRONIZED_PROCESSING_TIME = 2;
+}
+
// A small DSL for expressing when to emit new aggregations
// from a GroupByKey or CombinePerKey
//
@@ -439,27 +456,31 @@ message Trigger {
  }
  // After input arrives, ready when the specified delay has passed.
-  message AfterProcessingTimeDelay {
-    // (Required) The delay, in milliseconds.
-    int64 delay_millis = 1;
+  message AfterProcessingTime {
+
+    // (Required) The transforms to apply to an arriving element's timestamp,
+    // in order
+    repeated TimestampTransform timestamp_transforms = 1;
  }
-  // After input arrives, ready when the synchronized processing time
-  // progresses as far as the given delay.
-  message AfterSynchronizedProcessingTimeDelay {
-    // (Required) The delay, in milliseconds.
-    int64 delay_millis = 1;
+  // Ready whenever upstream processing time has all caught up with
+  // the arrival time of an input element
+  message AfterSynchronizedProcessingTime {
+  }
+
+  // The default trigger. Equivalent to Repeat { AfterEndOfWindow } but
+  // specially denoted to indicate the user did not alter the triggering.
+  message Default {
+  }
+
+  // Ready whenever the requisite number of input elements have arrived
+  message ElementCount {
+    int32 element_count = 1;
  }
  // Never ready. There will only be an ON_TIME output and a final
  // output at window expiration.
-  message Never { }
-
-  // Ready whenever the subtrigger is ready; resets state when the subtrigger
-  // completes.
-  message Repeat {
-    // (Require) Trigger that is run repeatedly.
-    Trigger subtrigger = 1;
+  message Never {
  }
  // Ready whenever either of its subtriggers are ready, but finishes output
@@ -473,9 +494,12 @@ message Trigger {
    Trigger finally = 2;
  }
-  // The default trigger. Equivalent to Repeat { AfterEndOfWindow } but
-  // specially denoted to indicate the user did not alter the triggering.
-  message Default { }
+  // Ready whenever the subtrigger is ready; resets state when the subtrigger
+  // completes.
+  message Repeat {
+    // (Require) Trigger that is run repeatedly.
+    Trigger subtrigger = 1;
+  }
  // The full disjoint union of possible triggers.
  oneof trigger {
@@ -483,12 +507,39 @@ message Trigger {
    AfterAny after_any = 2;
    AfterEach after_each = 3;
    AfterEndOfWindow after_end_of_widow = 4;
-    AfterProcessingTimeDelay after_processing_time_delay = 5;
-    AfterSynchronizedProcessingTimeDelay after_synchronized_processing_time_delay = 6;
-    Never never = 7;
-    Repeat repeat = 8;
-    OrFinally or_finally = 9;
-    Default default = 10;
+    AfterProcessingTime after_processing_time = 5;
+    AfterSynchronizedProcessingTime after_synchronized_processing_time = 6;
+    Default default = 7;
+    ElementCount element_count = 8;
+
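The new proto comments define ElementCount as "ready whenever the requisite number of input elements have arrived" and Repeat as "ready whenever the subtrigger is ready; resets state when the subtrigger completes". Below is a rough sketch of how those two compose, modeled for a single key and window. It assumes ElementCount finishes after it first fires, and it ignores windows, merging, timers, and the other trigger kinds. `TriggerSketch`, `TriggerState`, `ElementCount`, `Repeat`, and `firingPositions` are hypothetical names for illustration. They are not Beam's trigger implementation or the generated `RunnerApi` classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Simplified, single key-and-window model of two trigger shapes (hypothetical names). */
public class TriggerSketch {

  /** Mutable state of one trigger instance. */
  interface TriggerState {
    /** Records one arriving element; returns true if the trigger fires on it. */
    boolean onElement();

    /** True once the trigger will never fire again. */
    boolean isFinished();
  }

  /** ElementCount: fires once the requisite number of elements have arrived, then finishes. */
  static final class ElementCount implements TriggerState {
    private final int elementCount;
    private int seen;
    private boolean finished;

    ElementCount(int elementCount) {
      this.elementCount = elementCount;
    }

    @Override
    public boolean onElement() {
      if (finished) {
        return false;
      }
      seen++;
      finished = seen >= elementCount;
      return finished;
    }

    @Override
    public boolean isFinished() {
      return finished;
    }
  }

  /** Repeat: fires whenever its subtrigger fires; starts a fresh subtrigger when it finishes. */
  static final class Repeat implements TriggerState {
    private final Supplier<TriggerState> subtrigger;
    private TriggerState current;

    Repeat(Supplier<TriggerState> subtrigger) {
      this.subtrigger = subtrigger;
      this.current = subtrigger.get();
    }

    @Override
    public boolean onElement() {
      boolean fired = current.onElement();
      if (current.isFinished()) {
        current = subtrigger.get(); // reset the subtrigger's state
      }
      return fired;
    }

    @Override
    public boolean isFinished() {
      return false; // in this sketch a Repeat never finishes by itself
    }
  }

  /** Feeds numElements elements and returns the 1-based positions at which the trigger fired. */
  static List<Integer> firingPositions(TriggerState trigger, int numElements) {
    List<Integer> positions = new ArrayList<>();
    for (int i = 1; i <= numElements; i++) {
      if (trigger.onElement()) {
        positions.add(i);
      }
    }
    return positions;
  }

  public static void main(String[] args) {
    System.out.println(firingPositions(new ElementCount(3), 7)); // [3]
    System.out.println(firingPositions(new Repeat(() -> new ElementCount(3)), 7)); // [3, 6]
  }
}
```

Wrapping the count in Repeat turns a one-shot trigger into a periodic one. The subtrigger's state is discarded when it finishes, so counting restarts from zero.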
Repository: beam-site
Updated Branches:
refs/heads/asf-site 00e375003 -> f3c189568
Fix typo in stateful processing post
Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/47b1c5cd
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/47b1c5cd
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/47b1c5cd
Branch: refs/heads/asf-site
Commit: 47b1c5cdaef819bc82fb6ec069a3a5d1c3e9165c
Parents: 00e3750
Author: Kenneth Knowles
Authored: Tue Feb 14 21:13:48 2017 -0800
Committer: Kenneth Knowles
Committed: Tue Feb 14 21:13:48 2017 -0800
--
src/_posts/2017-02-13-stateful-processing.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/beam-site/blob/47b1c5cd/src/_posts/2017-02-13-stateful-processing.md
--
diff --git a/src/_posts/2017-02-13-stateful-processing.md b/src/_posts/2017-02-13-stateful-processing.md
index 28aee12..b00361a 100644
--- a/src/_posts/2017-02-13-stateful-processing.md
+++ b/src/_posts/2017-02-13-stateful-processing.md
@@ -25,7 +25,7 @@ what you might use it for, and what it looks like in code.
First, a quick recap: In Beam, a big data processing _pipeline_ is a directed,
acyclic graph of parallel operations called _`PTransforms`_ processing data
-from _`PCollections`_ I'll expand on that by walking through this illustration:
+from _`PCollections`_. I'll expand on that by walking through this illustration:
Regenerate site
Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/76021685
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/76021685
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/76021685
Branch: refs/heads/asf-site
Commit: 7602168530feee8aa6eedfd20f5549617b4545e6
Parents: 47b1c5c
Author: Kenneth Knowles
Authored: Tue Feb 14 21:29:15 2017 -0800
Committer: Kenneth Knowles
Committed: Tue Feb 14 21:29:15 2017 -0800
--
content/blog/2017/02/13/stateful-processing.html | 2 +-
content/feed.xml | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam-site/blob/76021685/content/blog/2017/02/13/stateful-processing.html
--
diff --git a/content/blog/2017/02/13/stateful-processing.html b/content/blog/2017/02/13/stateful-processing.html
index dcf077a..833b2fd 100644
--- a/content/blog/2017/02/13/stateful-processing.html
+++ b/content/blog/2017/02/13/stateful-processing.html
@@ -175,7 +175,7 @@ the current status in each runner.
First, a quick recap: In Beam, a big data processing pipeline is a directed,
acyclic graph of parallel operations called PTransforms processing data
-from PCollections I’ll expand on that by walking through this illustration:
+from PCollections. I’ll expand on that by walking through this illustration:
http://git-wip-us.apache.org/repos/asf/beam-site/blob/76021685/content/feed.xml
--
diff --git a/content/feed.xml b/content/feed.xml
index 10bccbd..726cdd0 100644
--- a/content/feed.xml
+++ b/content/feed.xml
@@ -29,7 +29,7 @@ the current status in each runner.</p>
<p>First, a quick recap: In Beam, a big data processing <em>pipeline</em> is a directed,
acyclic graph of parallel operations called <em><code class="highlighter-rouge">PTransforms</code></em> processing data
-from <em><code class="highlighter-rouge">PCollections</code></em> I’ll expand on that by walking through this illustration:</p>
+from <em><code class="highlighter-rouge">PCollections</code></em>. I’ll expand on that by walking through this illustration:</p>
<p><img class="center-block" src="/images/blog/stateful-processing/pipeline.png" alt="A Beam Pipeline - PTransforms are boxes - PCollections are arrows" width="300" /></p>
Touch up some punctuation in state blog post
Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/f928b50e
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/f928b50e
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/f928b50e
Branch: refs/heads/asf-site
Commit: f928b50e1594cba453f07aa608ab28afb6329aff
Parents: 8518baa
Author: Kenneth Knowles
Authored: Wed Feb 15 10:08:31 2017 -0800
Committer: Kenneth Knowles
Committed: Wed Feb 15 10:08:31 2017 -0800
--
src/_posts/2017-02-13-stateful-processing.md | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam-site/blob/f928b50e/src/_posts/2017-02-13-stateful-processing.md
--
diff --git a/src/_posts/2017-02-13-stateful-processing.md b/src/_posts/2017-02-13-stateful-processing.md
index b3e0c63..fbbe76b 100644
--- a/src/_posts/2017-02-13-stateful-processing.md
+++ b/src/_posts/2017-02-13-stateful-processing.md
@@ -238,11 +238,12 @@ key+window pairs, like this:
keys and windows are independent dimensions)
You can provide the opportunity for parallelism by making sure that table has
-enough columns, either via:
+enough columns. You might have many keys and many windows, or you might have
+many of just one or the other:
-- Many keys in few windows for example, a globally windowed stateful computation
+- Many keys in few windows, for example a globally windowed stateful computation
keyed by user ID.
-- Many windows over few keys for example, a fixed windowed stateful computation
+- Many windows over few keys, for example a fixed windowed stateful computation
over a global key.
Caveat: all Beam runners today parallelize only over the key.
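The post's table can be modeled with a short sketch that assumes each element of a stateful computation has already been assigned a key and a window. State lives in one cell per distinct key+window pair. A runner that parallelizes only over the key, as the caveat above describes, can spread the work across at most as many units as there are distinct keys. `StateParallelismSketch`, `Element`, `stateCells`, and `keyOnlyParallelism` are hypothetical names, and the record requires Java 16 or later.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model: each stateful element lands in exactly one key+window cell. */
public class StateParallelismSketch {

  /** An element that has already been assigned a key and a window. */
  record Element(String key, String window) {}

  /** Independent state cells: one per distinct key+window pair. */
  static int stateCells(List<Element> elements) {
    return new HashSet<>(elements).size(); // records have value-based equals/hashCode
  }

  /** Parallel units available to a runner that parallelizes only over the key. */
  static int keyOnlyParallelism(List<Element> elements) {
    Set<String> keys = new HashSet<>();
    for (Element e : elements) {
      keys.add(e.key());
    }
    return keys.size();
  }

  public static void main(String[] args) {
    // Many keys in few windows: a globally windowed computation keyed by user ID.
    List<Element> manyKeys = List.of(
        new Element("user1", "global"),
        new Element("user2", "global"),
        new Element("user3", "global"));
    // Many windows over few keys: fixed windows over a single global key.
    List<Element> manyWindows = List.of(
        new Element("all", "[10:00, 10:05)"),
        new Element("all", "[10:05, 10:10)"),
        new Element("all", "[10:10, 10:15)"));
    System.out.println("cells=" + stateCells(manyKeys) + ", keys=" + keyOnlyParallelism(manyKeys));
    // prints cells=3, keys=3
    System.out.println("cells=" + stateCells(manyWindows) + ", keys=" + keyOnlyParallelism(manyWindows));
    // prints cells=3, keys=1
  }
}
```

Both inputs produce three independent state cells. Under key-only parallelization, however, the fixed-windows-over-a-global-key case collapses to a single unit of parallelism.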
Repository: beam-site
Updated Branches:
refs/heads/asf-site f3c189568 -> f48e97f67
Fix some typos and small formatting issues.
Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/8518baa7
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/8518baa7
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/8518baa7
Branch: refs/heads/asf-site
Commit: 8518baa70e1ec0dc609dbd36889e246752bc995e
Parents: f3c1895
Author: Ismaël Mejía
Authored: Wed Feb 15 17:45:25 2017 +0100
Committer: Ismaël Mejía
Committed: Wed Feb 15 17:45:25 2017 +0100
--
src/_posts/2017-02-13-stateful-processing.md | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/beam-site/blob/8518baa7/src/_posts/2017-02-13-stateful-processing.md
--
diff --git a/src/_posts/2017-02-13-stateful-processing.md b/src/_posts/2017-02-13-stateful-processing.md
index b00361a..b3e0c63 100644
--- a/src/_posts/2017-02-13-stateful-processing.md
+++ b/src/_posts/2017-02-13-stateful-processing.md
@@ -196,7 +196,7 @@ want to write a transform that maps input to output like this:
+width="180">
The order of the elements A, B, C, D, E is arbitrary, hence their assigned
indices are arbitrary, but downstream transforms just need to be OK with this.
@@ -238,9 +238,13 @@ key+window pairs, like this:
keys and windows are independent dimensions)
You can provide the opportunity for parallelism by making sure that table has
-enough columns, either via many keys in few windows - for example, a globally
-windowed stateful computation keyed by user ID - or via many windows over few
-keys - for example, a fixed windowed stateful computation over a global key.
+enough columns, either via:
+
+- Many keys in few windows for example, a globally windowed stateful computation
+ keyed by user ID.
+- Many windows over few keys for example, a fixed windowed stateful computation
+ over a global key.
+
Caveat: all Beam runners today parallelize only over the key.
Most often your mental model of state can be focused on only a single column of
@@ -444,7 +448,7 @@ outputs from the `ParDo` that will be processed downstream.
If the
output, then you cannot use a `Filter` transform to reduce data volume downstream.
Stateful processing lets you address both the latency problem of side inputs
-and the cost problem of excessive uninterseting output. Here is the code, using
+and the cost problem of excessive uninteresting output. Here is the code, using
only features I have already introduced:
```java