Repository: incubator-beam
Updated Branches:
  refs/heads/master 861562239 -> 442435ed0


Remove InProcessBundle

InProcessBundle is an implementation detail of InProcessBundleFactory
and should be contained within it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3987a05b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3987a05b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3987a05b

Branch: refs/heads/master
Commit: 3987a05b83de46823efde2742d585cb5b7b98cc3
Parents: 8615622
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 18 12:55:26 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Tue Apr 19 08:49:39 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/inprocess/InProcessBundle.java  | 124 ----------------
 .../EncodabilityEnforcementFactoryTest.java     |  11 +-
 .../ImmutabilityEnforcementFactoryTest.java     |   8 +-
 .../inprocess/InMemoryWatermarkManagerTest.java |   2 +-
 .../runners/inprocess/InProcessBundleTest.java  | 145 -------------------
 .../InProcessEvaluationContextTest.java         |  26 +---
 .../inprocess/TransformExecutorTest.java        |   6 +-
 7 files changed, 23 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundle.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundle.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundle.java
deleted file mode 100644
index d3da9e1..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundle.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link UncommittedBundle} that buffers elements in memory.
- */
-public final class InProcessBundle<T> implements UncommittedBundle<T> {
-  private final PCollection<T> pcollection;
-  private final boolean keyed;
-  private final Object key;
-  private boolean committed = false;
-  private ImmutableList.Builder<WindowedValue<T>> elements;
-
-  /**
-   * Create a new {@link InProcessBundle} for the specified {@link 
PCollection} without a key.
-   */
-  public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) {
-    return new InProcessBundle<T>(pcollection, false, null);
-  }
-
-  /**
-   * Create a new {@link InProcessBundle} for the specified {@link 
PCollection} with the specified
-   * key.
-   *
-   * See {@link CommittedBundle#getKey()} and {@link 
CommittedBundle#isKeyed()} for more
-   * information.
-   */
-  public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, 
Object key) {
-    return new InProcessBundle<T>(pcollection, true, key);
-  }
-
-  private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object 
key) {
-    this.pcollection = pcollection;
-    this.keyed = keyed;
-    this.key = key;
-    this.elements = ImmutableList.builder();
-  }
-
-  @Override
-  public PCollection<T> getPCollection() {
-    return pcollection;
-  }
-
-  @Override
-  public InProcessBundle<T> add(WindowedValue<T> element) {
-    checkState(!committed, "Can't add element %s to committed bundle %s", 
element, this);
-    elements.add(element);
-    return this;
-  }
-
-  @Override
-  public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
-    checkState(!committed, "Can't commit already committed bundle %s", this);
-    committed = true;
-    final Iterable<WindowedValue<T>> committedElements = elements.build();
-    return new CommittedBundle<T>() {
-      @Override
-      @Nullable
-      public Object getKey() {
-        return key;
-      }
-
-      @Override
-      public boolean isKeyed() {
-        return keyed;
-      }
-
-      @Override
-      public Iterable<WindowedValue<T>> getElements() {
-        return committedElements;
-      }
-
-      @Override
-      public PCollection<T> getPCollection() {
-        return pcollection;
-      }
-
-      @Override
-      public Instant getSynchronizedProcessingOutputWatermark() {
-        return synchronizedCompletionTime;
-      }
-
-      @Override
-      public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .omitNullValues()
-            .add("pcollection", pcollection)
-            .add("key", key)
-            .add("elements", committedElements)
-            .toString();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
index 7720589..85c4322 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -50,6 +50,7 @@ import java.util.Collections;
 public class EncodabilityEnforcementFactoryTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   private EncodabilityEnforcementFactory factory = 
EncodabilityEnforcementFactory.create();
+  private BundleFactory bundleFactory = InProcessBundleFactory.create();
 
   @Test
   public void encodeFailsThrows() {
@@ -61,7 +62,7 @@ public class EncodabilityEnforcementFactoryTest {
 
     WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new 
Record());
     CommittedBundle<Record> input =
-        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+        
bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
     ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
 
     thrown.expect(UserCodeException.class);
@@ -80,7 +81,7 @@ public class EncodabilityEnforcementFactoryTest {
     WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new 
Record());
 
     CommittedBundle<Record> input =
-        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+        
bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
     ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
 
     thrown.expect(UserCodeException.class);
@@ -107,7 +108,7 @@ public class EncodabilityEnforcementFactoryTest {
             });
 
     CommittedBundle<Record> input =
-        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+        
bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
     ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
 
     thrown.expect(UserCodeException.class);
@@ -131,7 +132,7 @@ public class EncodabilityEnforcementFactoryTest {
     WindowedValue<Record> record = 
WindowedValue.<Record>valueInGlobalWindow(new Record());
 
     CommittedBundle<Record> input =
-        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+        
bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
     ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
 
     enforcement.beforeElement(record);
@@ -152,7 +153,7 @@ public class EncodabilityEnforcementFactoryTest {
     WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
 
     CommittedBundle<Integer> input =
-        InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now());
+        
bundleFactory.createRootBundle(unencodable).add(value).commit(Instant.now());
     ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
 
     enforcement.beforeElement(value);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
index 4520504..16633ed 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
@@ -46,12 +46,14 @@ import java.util.Collections;
 public class ImmutabilityEnforcementFactoryTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
   private transient ImmutabilityEnforcementFactory factory;
+  private transient BundleFactory bundleFactory;
   private transient PCollection<byte[]> pcollection;
   private transient AppliedPTransform<?, ?, ?> consumer;
 
   @Before
   public void setup() {
     factory = new ImmutabilityEnforcementFactory();
+    bundleFactory = InProcessBundleFactory.create();
     TestPipeline p = TestPipeline.create();
     pcollection =
         p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
@@ -71,7 +73,7 @@ public class ImmutabilityEnforcementFactoryTest implements 
Serializable {
   public void unchangedSucceeds() {
     WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
     CommittedBundle<byte[]> elements =
-        
InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+        
bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
 
     ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
     enforcement.beforeElement(element);
@@ -86,7 +88,7 @@ public class ImmutabilityEnforcementFactoryTest implements 
Serializable {
   public void mutatedDuringProcessElementThrows() {
     WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
     CommittedBundle<byte[]> elements =
-        
InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+        
bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
 
     ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
     enforcement.beforeElement(element);
@@ -107,7 +109,7 @@ public class ImmutabilityEnforcementFactoryTest implements 
Serializable {
 
     WindowedValue<byte[]> element = 
WindowedValue.valueInGlobalWindow("bar".getBytes());
     CommittedBundle<byte[]> elements =
-        
InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+        
bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
 
     ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, 
consumer);
     enforcement.beforeElement(element);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
index 077c0e7..736076c 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
@@ -798,7 +798,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
 
     Instant upstreamHold = new Instant(2048L);
     CommittedBundle<Integer> filteredBundle =
-        bundleFactory.createKeyedBundle(null, "key", 
filtered).commit(upstreamHold);
+        bundleFactory.createKeyedBundle(created, "key", 
filtered).commit(upstreamHold);
     manager.updateWatermarks(
         created,
         filtered.getProducingTransformInternal(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleTest.java
deleted file mode 100644
index 103ace5..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import 
org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * Tests for {@link InProcessBundle}.
- */
-@RunWith(JUnit4.class)
-public class InProcessBundleTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void unkeyedShouldCreateWithNullKey() {
-    PCollection<Integer> pcollection = 
TestPipeline.create().apply(Create.of(1));
-
-    InProcessBundle<Integer> inFlightBundle = 
InProcessBundle.unkeyed(pcollection);
-
-    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
-
-    assertThat(bundle.isKeyed(), is(false));
-    assertThat(bundle.getKey(), nullValue());
-  }
-
-  private void keyedCreateBundle(Object key) {
-    PCollection<Integer> pcollection = 
TestPipeline.create().apply(Create.of(1));
-
-    InProcessBundle<Integer> inFlightBundle = 
InProcessBundle.keyed(pcollection, key);
-
-    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
-    assertThat(bundle.isKeyed(), is(true));
-    assertThat(bundle.getKey(), equalTo(key));
-  }
-
-  @Test
-  public void keyedWithNullKeyShouldCreateKeyedBundle() {
-    keyedCreateBundle(null);
-  }
-
-  @Test
-  public void keyedWithKeyShouldCreateKeyedBundle() {
-    keyedCreateBundle(new Object());
-  }
-
-  private <T> void 
afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) 
{
-    PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());
-
-    InProcessBundle<T> bundle = InProcessBundle.unkeyed(pcollection);
-    Collection<Matcher<? super WindowedValue<T>>> expectations = new 
ArrayList<>();
-    for (WindowedValue<T> elem : elems) {
-      bundle.add(elem);
-      expectations.add(equalTo(elem));
-    }
-    Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
-        Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
-    assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher);
-  }
-
-  @Test
-  public void getElementsBeforeAddShouldReturnEmptyIterable() {
-    
afterCommitGetElementsShouldHaveAddedElements(Collections.<WindowedValue<Integer>>emptyList());
-  }
-
-  @Test
-  public void getElementsAfterAddShouldReturnAddedElements() {
-    WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
-    WindowedValue<Integer> secondValue =
-        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));
-
-    afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, 
secondValue));
-  }
-
-  @Test
-  public void addAfterCommitShouldThrowException() {
-    PCollection<Integer> pcollection = 
TestPipeline.create().apply(Create.<Integer>of());
-
-    InProcessBundle<Integer> bundle = InProcessBundle.unkeyed(pcollection);
-    bundle.add(WindowedValue.valueInGlobalWindow(1));
-    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
-    assertThat(firstCommit.getElements(), 
containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("3");
-    thrown.expectMessage("committed");
-
-    bundle.add(WindowedValue.valueInGlobalWindow(3));
-  }
-
-  @Test
-  public void commitAfterCommitShouldThrowException() {
-    PCollection<Integer> pcollection = 
TestPipeline.create().apply(Create.<Integer>of());
-
-    InProcessBundle<Integer> bundle = InProcessBundle.unkeyed(pcollection);
-    bundle.add(WindowedValue.valueInGlobalWindow(1));
-    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
-    assertThat(firstCommit.getElements(), 
containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("committed");
-
-    bundle.commit(Instant.now());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index 6736562..50b83fd 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -93,6 +93,8 @@ public class InProcessEvaluationContextTest {
   private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
   private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
 
+  private BundleFactory bundleFactory;
+
   @Before
   public void setup() {
     InProcessPipelineRunner runner =
@@ -110,6 +112,8 @@ public class InProcessEvaluationContextTest {
     rootTransforms = cVis.getRootTransforms();
     valueToConsumers = cVis.getValueToConsumers();
 
+    bundleFactory = InProcessBundleFactory.create();
+
     context =
         InProcessEvaluationContext.create(
             runner.getPipelineOptions(),
@@ -393,37 +397,23 @@ public class InProcessEvaluationContextTest {
   }
 
   @Test
-  public void createBundleUnkeyedResultUnkeyed() {
-    CommittedBundle<KV<String, Integer>> newBundle =
-        context
-            
.createBundle(InProcessBundle.unkeyed(created).commit(Instant.now()), 
downstream)
-            .commit(Instant.now());
-    assertThat(newBundle.isKeyed(), is(false));
-  }
-
-  @Test
   public void createBundleKeyedResultPropagatesKey() {
     CommittedBundle<KV<String, Integer>> newBundle =
         context
-            .createBundle(InProcessBundle.keyed(created, 
"foo").commit(Instant.now()), downstream)
+            .createBundle(
+                bundleFactory.createKeyedBundle(null, "foo", 
created).commit(Instant.now()),
+                downstream)
             .commit(Instant.now());
-    assertThat(newBundle.isKeyed(), is(true));
     assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
   }
 
   @Test
-  public void createRootBundleUnkeyed() {
-    
assertThat(context.createRootBundle(created).commit(Instant.now()).isKeyed(), 
is(false));
-  }
-
-  @Test
   public void createKeyedBundleKeyed() {
     CommittedBundle<KV<String, Integer>> keyedBundle =
         context
             .createKeyedBundle(
-                InProcessBundle.unkeyed(created).commit(Instant.now()), "foo", 
downstream)
+                bundleFactory.createRootBundle(created).commit(Instant.now()), 
"foo", downstream)
             .commit(Instant.now());
-    assertThat(keyedBundle.isKeyed(), is(true));
     assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
index d29ffac..d3d70e0 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
@@ -332,7 +332,7 @@ public class TransformExecutorTest {
     WindowedValue<String> fooElem = WindowedValue.valueInGlobalWindow("foo");
     WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar");
     CommittedBundle<String> inputBundle =
-        
InProcessBundle.unkeyed(created).add(fooElem).add(barElem).commit(Instant.now());
+        
bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now());
     when(
             registry.forApplication(
                 downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext))
@@ -393,7 +393,7 @@ public class TransformExecutorTest {
 
     WindowedValue<byte[]> fooBytes = 
WindowedValue.valueInGlobalWindow("foo".getBytes());
     CommittedBundle<byte[]> inputBundle =
-        InProcessBundle.unkeyed(pcBytes).add(fooBytes).commit(Instant.now());
+        
bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
     when(
             registry.forApplication(
                 pcBytes.getProducingTransformInternal(), inputBundle, 
evaluationContext))
@@ -452,7 +452,7 @@ public class TransformExecutorTest {
 
     WindowedValue<byte[]> fooBytes = 
WindowedValue.valueInGlobalWindow("foo".getBytes());
     CommittedBundle<byte[]> inputBundle =
-        InProcessBundle.unkeyed(pcBytes).add(fooBytes).commit(Instant.now());
+        
bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
     when(
             registry.forApplication(
                 pcBytes.getProducingTransformInternal(), inputBundle, 
evaluationContext))

Reply via email to