[jira] [Work logged] (BEAM-4745) SDF tests broken by innocent change due to Dataflow worker dependencies
[ https://issues.apache.org/jira/browse/BEAM-4745?focusedWorklogId=121822&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121822 ] ASF GitHub Bot logged work on BEAM-4745: Author: ASF GitHub Bot Created on: 11/Jul/18 07:48 Start Date: 11/Jul/18 07:48 Worklog Time Spent: 10m Work Description: iemejia closed pull request #5925: [BEAM-4745, BEAM-4016] Reintroduces Setup and TearDown on SplitRestrictionFn and PairWithRestrictionFn URL: https://github.com/apache/beam/pull/5925 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index b581eecf414..8d8da216afa 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -415,6 +415,7 @@ public String apply(T input) { @Setup public void setup() { invoker = DoFnInvokers.invokerFor(fn); + invoker.invokeSetup(); } @ProcessElement @@ -422,6 +423,12 @@ public void processElement(ProcessContext context) { context.output( KV.of(context.element(), invoker.invokeGetInitialRestriction(context.element(; } + +@Teardown +public void tearDown() { + invoker.invokeTeardown(); + invoker = null; +} } /** Splits the restriction using the given {@link SplitRestriction} method. 
*/ @@ -439,6 +446,7 @@ public void processElement(ProcessContext context) { @Setup public void setup() { invoker = DoFnInvokers.invokerFor(splittableFn); + invoker.invokeSetup(); } @ProcessElement @@ -459,5 +467,11 @@ public void outputWithTimestamp(RestrictionT part, Instant timestamp) { } }); } + +@Teardown +public void tearDown() { + invoker.invokeTeardown(); + invoker = null; +} } } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 5705ad48cbd..98535fa7436 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -36,7 +36,7 @@ processResources { filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [ 'dataflow.legacy_environment_major_version' : '7', 'dataflow.fnapi_environment_major_version' : '7', -'dataflow.container_version' : 'beam-master-20180619' +'dataflow.container_version' : 'beam-master-20180710' ] } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index df94b841f47..3aaabc2dd42 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -513,18 +513,19 @@ public void testLateData() throws Exception { private State state = State.BEFORE_SETUP; -@ProcessElement -public void processElement(ProcessContext c, OffsetRangeTracker tracker) { - assertEquals(State.INSIDE_BUNDLE, state); - assertTrue(tracker.tryClaim(0L)); - c.output(c.element()); -} - @GetInitialRestriction public OffsetRange getInitialRestriction(String value) { + assertEquals(State.OUTSIDE_BUNDLE, state); return new OffsetRange(0, 1); } +@SplitRestriction +public void splitRestriction( +String value, OffsetRange range, OutputReceiver receiver) { + assertEquals(State.OUTSIDE_BUNDLE, state); + 
receiver.output(range); +} + @Setup public void setUp() { assertEquals(State.BEFORE_SETUP, state); @@ -537,6 +538,13 @@ public void startBundle() { state = State.INSIDE_BUNDLE; } +@ProcessElement +public void processElement(ProcessContext c, OffsetRangeTracker tracker) { + assertEquals(State.INSIDE_BUNDLE, state); + assertTrue(tracker.tryClaim(0L)); + c.output(c.element()); +} + @FinishBundle public void finishBundle() { assertEquals(State.INSIDE_BUNDLE, state); @@ -553,12 +561,9 @@ public void tearDown() { @Test @Category({ValidatesRunner.class, UsesSplittableParDo.class}) public void testLifecycleMethods() throws Exception { - PCollection res = p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new S
[jira] [Work logged] (BEAM-4745) SDF tests broken by innocent change due to Dataflow worker dependencies
[ https://issues.apache.org/jira/browse/BEAM-4745?focusedWorklogId=121797&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121797 ] ASF GitHub Bot logged work on BEAM-4745: Author: ASF GitHub Bot Created on: 11/Jul/18 07:02 Start Date: 11/Jul/18 07:02 Worklog Time Spent: 10m Work Description: iemejia commented on issue #5925: [BEAM-4745, BEAM-4016] Reintroduces Setup and TearDown on SplitRestrictionFn and PairWithRestrictionFn URL: https://github.com/apache/beam/pull/5925#issuecomment-404066282 Run Java Precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 121797) Time Spent: 50m (was: 40m) > SDF tests broken by innocent change due to Dataflow worker dependencies > --- > > Key: BEAM-4745 > URL: https://issues.apache.org/jira/browse/BEAM-4745 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Eugene Kirpichov >Assignee: Eugene Kirpichov >Priority: Critical > Time Spent: 50m > Remaining Estimate: 0h > > https://github.com/apache/beam/pull/5894 broke SDF in Dataflow streaming > runner, using SDFs fails with the error below. > The reason is that Dataflow worker has a staged copy of some stuff including > runners-core-construction, and it comes before user code in the classpath. So > the pipeline includes a serialized SplittableParDo from master, but the > worker deserializes it using a stale class file. > This needs to be fixed on Dataflow side. Filing this JIRA just to track the > externally facing issue. > Meanwhile to stop the bleeding I'm going to revert the change, even though by > itself it's a correct change, but it's better to have SDFs not invoke > setup/teardown than to have them not work at all. 
> CC: [~iemejia] > java.lang.RuntimeException: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:192) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) > > com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > Caused by: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899) > > 
com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90) > > com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(
[jira] [Work logged] (BEAM-4745) SDF tests broken by innocent change due to Dataflow worker dependencies
[ https://issues.apache.org/jira/browse/BEAM-4745?focusedWorklogId=121740&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121740 ] ASF GitHub Bot logged work on BEAM-4745: Author: ASF GitHub Bot Created on: 11/Jul/18 01:47 Start Date: 11/Jul/18 01:47 Worklog Time Spent: 10m Work Description: jkff commented on issue #5925: [BEAM-4745, BEAM-4016] Reintroduces Setup and TearDown on SplitRestrictionFn and PairWithRestrictionFn URL: https://github.com/apache/beam/pull/5925#issuecomment-404018457 Run Dataflow ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 121740) Time Spent: 40m (was: 0.5h) > SDF tests broken by innocent change due to Dataflow worker dependencies > --- > > Key: BEAM-4745 > URL: https://issues.apache.org/jira/browse/BEAM-4745 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Eugene Kirpichov >Assignee: Eugene Kirpichov >Priority: Critical > Time Spent: 40m > Remaining Estimate: 0h > > https://github.com/apache/beam/pull/5894 broke SDF in Dataflow streaming > runner, using SDFs fails with the error below. > The reason is that Dataflow worker has a staged copy of some stuff including > runners-core-construction, and it comes before user code in the classpath. So > the pipeline includes a serialized SplittableParDo from master, but the > worker deserializes it using a stale class file. > This needs to be fixed on Dataflow side. Filing this JIRA just to track the > externally facing issue. > Meanwhile to stop the bleeding I'm going to revert the change, even though by > itself it's a correct change, but it's better to have SDFs not invoke > setup/teardown than to have them not work at all. 
> CC: [~iemejia] > java.lang.RuntimeException: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:192) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) > > com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > Caused by: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899) > > 
com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90) > > com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.type
[jira] [Work logged] (BEAM-4745) SDF tests broken by innocent change due to Dataflow worker dependencies
[ https://issues.apache.org/jira/browse/BEAM-4745?focusedWorklogId=121739&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121739 ] ASF GitHub Bot logged work on BEAM-4745: Author: ASF GitHub Bot Created on: 11/Jul/18 01:46 Start Date: 11/Jul/18 01:46 Worklog Time Spent: 10m Work Description: jkff opened a new pull request #5925: [BEAM-4745, BEAM-4016] Reintroduces Setup and TearDown on SplitRestrictionFn and PairWithRestrictionFn URL: https://github.com/apache/beam/pull/5925 This should be safe now, after a Dataflow worker release that is more correctly shaded and picks up fresh versions of this code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 121739) Time Spent: 0.5h (was: 20m) > SDF tests broken by innocent change due to Dataflow worker dependencies > --- > > Key: BEAM-4745 > URL: https://issues.apache.org/jira/browse/BEAM-4745 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Eugene Kirpichov >Assignee: Eugene Kirpichov >Priority: Critical > Time Spent: 0.5h > Remaining Estimate: 0h > > https://github.com/apache/beam/pull/5894 broke SDF in Dataflow streaming > runner, using SDFs fails with the error below. > The reason is that Dataflow worker has a staged copy of some stuff including > runners-core-construction, and it comes before user code in the classpath. So > the pipeline includes a serialized SplittableParDo from master, but the > worker deserializes it using a stale class file. > This needs to be fixed on Dataflow side. Filing this JIRA just to track the > externally facing issue. 
> Meanwhile to stop the bleeding I'm going to revert the change, even though by > itself it's a correct change, but it's better to have SDFs not invoke > setup/teardown than to have them not work at all. > CC: [~iemejia] > java.lang.RuntimeException: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:192) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) > > com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > Caused by: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214) > > 
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899) > > com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90) > > com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:8
[jira] [Work logged] (BEAM-4745) SDF tests broken by innocent change due to Dataflow worker dependencies
[ https://issues.apache.org/jira/browse/BEAM-4745?focusedWorklogId=120996&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120996 ] ASF GitHub Bot logged work on BEAM-4745: Author: ASF GitHub Bot Created on: 09/Jul/18 20:12 Start Date: 09/Jul/18 20:12 Worklog Time Spent: 10m Work Description: iemejia closed pull request #5905: [BEAM-4745] Revert "[BEAM-4016] Invoke Setup and TearDown on SplitRestrictionFn and PairWithRestrictionFn" URL: https://github.com/apache/beam/pull/5905 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index 8d8da216afa..b581eecf414 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -415,7 +415,6 @@ public String apply(T input) { @Setup public void setup() { invoker = DoFnInvokers.invokerFor(fn); - invoker.invokeSetup(); } @ProcessElement @@ -423,12 +422,6 @@ public void processElement(ProcessContext context) { context.output( KV.of(context.element(), invoker.invokeGetInitialRestriction(context.element(; } - -@Teardown -public void tearDown() { - invoker.invokeTeardown(); - invoker = null; -} } /** Splits the restriction using the given {@link SplitRestriction} method. 
*/ @@ -446,7 +439,6 @@ public void tearDown() { @Setup public void setup() { invoker = DoFnInvokers.invokerFor(splittableFn); - invoker.invokeSetup(); } @ProcessElement @@ -467,11 +459,5 @@ public void outputWithTimestamp(RestrictionT part, Instant timestamp) { } }); } - -@Teardown -public void tearDown() { - invoker.invokeTeardown(); - invoker = null; -} } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index fe33b1ab5b8..b7f0c10d046 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -513,19 +513,18 @@ public void testLateData() throws Exception { private State state = State.BEFORE_SETUP; +@ProcessElement +public void processElement(ProcessContext c, OffsetRangeTracker tracker) { + assertEquals(State.INSIDE_BUNDLE, state); + assertTrue(tracker.tryClaim(0L)); + c.output(c.element()); +} + @GetInitialRestriction public OffsetRange getInitialRestriction(String value) { - assertEquals(State.OUTSIDE_BUNDLE, state); return new OffsetRange(0, 1); } -@SplitRestriction -public void splitRestriction( -String value, OffsetRange range, OutputReceiver receiver) { - assertEquals(State.OUTSIDE_BUNDLE, state); - receiver.output(range); -} - @Setup public void setUp() { assertEquals(State.BEFORE_SETUP, state); @@ -538,13 +537,6 @@ public void startBundle() { state = State.INSIDE_BUNDLE; } -@ProcessElement -public void processElement(ProcessContext c, OffsetRangeTracker tracker) { - assertEquals(State.INSIDE_BUNDLE, state); - assertTrue(tracker.tryClaim(0L)); - c.output(c.element()); -} - @FinishBundle public void finishBundle() { assertEquals(State.INSIDE_BUNDLE, state); @@ -561,9 +553,12 @@ public void tearDown() { @Test @Category({ValidatesRunner.class, UsesSplittableParDo.class}) public void 
testLifecycleMethods() throws Exception { + PCollection res = p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle())); + PAssert.that(res).containsInAnyOrder("a", "b", "c"); + p.run(); } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 120996) Time Spent: 20m (was: 10m) > SDF tests broken by innocent change due to Dataflow worker dependencies >
[jira] [Work logged] (BEAM-4745) SDF tests broken by innocent change due to Dataflow worker dependencies
[ https://issues.apache.org/jira/browse/BEAM-4745?focusedWorklogId=120947&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120947 ] ASF GitHub Bot logged work on BEAM-4745: Author: ASF GitHub Bot Created on: 09/Jul/18 18:15 Start Date: 09/Jul/18 18:15 Worklog Time Spent: 10m Work Description: jkff opened a new pull request #5905: [BEAM-4745] Revert "[BEAM-4016] Invoke Setup and TearDown on SplitRestrictionFn and PairWithRestrictionFn" URL: https://github.com/apache/beam/pull/5905 Temporarily reverts #5894. See https://issues.apache.org/jira/browse/BEAM-4745 for reasoning. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 120947) Time Spent: 10m Remaining Estimate: 0h > SDF tests broken by innocent change due to Dataflow worker dependencies > --- > > Key: BEAM-4745 > URL: https://issues.apache.org/jira/browse/BEAM-4745 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Eugene Kirpichov >Assignee: Eugene Kirpichov >Priority: Critical > Time Spent: 10m > Remaining Estimate: 0h > > https://github.com/apache/beam/pull/5894 broke SDF in Dataflow streaming > runner, using SDFs fails with the error below. > The reason is that Dataflow worker has a staged copy of some stuff including > runners-core-construction, and it comes before user code in the classpath. So > the pipeline includes a serialized SplittableParDo from master, but the > worker deserializes it using a stale class file. > This needs to be fixed on Dataflow side. Filing this JIRA just to track the > externally facing issue.
> Meanwhile to stop the bleeding I'm going to revert the change, even though by > itself it's a correct change, but it's better to have SDFs not invoke > setup/teardown than to have them not work at all. > CC: [~iemejia] > java.lang.RuntimeException: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:192) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) > > com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) > > com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136) > > com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > Caused by: > com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214) > > 
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053) > > com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899) > > com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90) > > com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262) > > com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.j