Re: Accessing keyed state in portable timer callbacks

2018-11-02 Thread Lukasz Cwik
Yes, I was hoping to merge the logic within the
SplittableProcessElementsRunner back into the FnApiDoFnRunner since there
is some duplication and it is the original reason why FnApiStateAccessor
exists. Previously we just referenced the correct object directly within
the class based upon the current context
(StartBundle/FinishBundle/ProcessElement/OnTimer).

On Thu, Nov 1, 2018 at 2:43 AM Maximilian Michels  wrote:

> Hi Lukasz,
>
> Thanks for promptly fixing this [1]. I saw that the current element was
> not set correctly when timers are processed, but wanted to make sure any
> changes would be aligned with the harness processing model.
>
> I think I favor the currentElementOrTimer approach because it makes
> things more explicit, but the solution is fine for now.
>
> Thanks,
> Max
>
> [1] https://github.com/apache/beam/pull/6902
>
> On 31.10.18 19:09, Lukasz Cwik wrote:
> > I filed https://issues.apache.org/jira/browse/BEAM-5930.
> >
> > On Wed, Oct 31, 2018 at 10:22 AM Lukasz Cwik  > > wrote:
> >
> > That looks like a bug in the FnApiDoFnRunner.java
> >
> > The FnApiStateAccessor is given a callback to get the current
> > element and it is not handling the case where the current element is
> > a timer.
> >
> > callback:
> >
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L212
> > where the current "element" gets set:
> >
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L220
> > where the current "timer" gets set:
> >
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L237
> >
> > The easiest fix would be to have the callback return the first non
> > null from currentElement/currentTimer but longer term I think we'll
> > want a different solution. Alternatively, we could collapse
> > currentElement and currentTimer to be currentElementOrTimer which
> > would solve the accessor issue.
> >
> > On Wed, Oct 31, 2018 at 9:50 AM Maximilian Michels  > > wrote:
> >
> > Hi,
> >
> > I have a question regarding user state during timer callback in
> > the FnApiDoFnRunner (Java SDK Harness).
> >
> > I've started implementing Timers for the portable Flink Runner.
> > I can register a timer via the timer output collection and fire
> > the timer via the timer input of the SDK Harness. But when I try
> > to access state in the Timer callback, I get the exception below.
> >
> > Is this a bug or if not, how is the timer's key supposed to be
> > set? I assume that it should be set from the timer element which
> > contains the key.
> >
> > Thanks,
> > Max
> >
> >
> > Caused by: java.util.concurrent.ExecutionException:
> > java.lang.RuntimeException: Error received from SDK harness for
> > instruction 72: java.util.concurrent.ExecutionException:
> > java.lang.NullPointerException
> >  at
> >
>  java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >  at
> >
>  java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> >  at
> >
>  
> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
> >  at
> >
>  
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
> >  at
> >
>  
> org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
> >  at
> >
>  
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
> >  at
> >
>  
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
> >  at
> >
>  
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
> >  at
> >
>  
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >  at
> >
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >  at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.NullPointerException
> >  at
> >
>  
> org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
> >  at
> >
>  
> org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
> >  

Re: Accessing keyed state in portable timer callbacks

2018-11-01 Thread Maximilian Michels

Hi Lukasz,

Thanks for promptly fixing this [1]. I saw that the current element was 
not set correctly when timers are processed, but wanted to make sure any 
changes would be aligned with the harness processing model.


I think I favor the currentElementOrTimer approach because it makes 
things more explicit, but the solution is fine for now.


Thanks,
Max

[1] https://github.com/apache/beam/pull/6902

On 31.10.18 19:09, Lukasz Cwik wrote:

I filed https://issues.apache.org/jira/browse/BEAM-5930.

On Wed, Oct 31, 2018 at 10:22 AM Lukasz Cwik > wrote:


That looks like a bug in the FnApiDoFnRunner.java

The FnApiStateAccessor is given a callback to get the current
element and it is not handling the case where the current element is
a timer.

callback:

https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L212
where the current "element" gets set:

https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L220
where the current "timer" gets set:

https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L237

The easiest fix would be to have the callback return the first non
null from currentElement/currentTimer but longer term I think we'll
want a different solution. Alternatively, we could collapse
currentElement and currentTimer to be currentElementOrTimer which
would solve the accessor issue.

On Wed, Oct 31, 2018 at 9:50 AM Maximilian Michels mailto:m...@apache.org>> wrote:

Hi,

I have a question regarding user state during timer callback in
the FnApiDoFnRunner (Java SDK Harness).

I've started implementing Timers for the portable Flink Runner.
I can register a timer via the timer output collection and fire
the timer via the timer input of the SDK Harness. But when I try
to access state in the Timer callback, I get the exception below.

Is this a bug or if not, how is the timer's key supposed to be
set? I assume that it should be set from the timer element which
contains the key.

Thanks,
Max


Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for
instruction 72: java.util.concurrent.ExecutionException:
java.lang.NullPointerException
     at

java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
     at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
     at

org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
     at

org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
     at

org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
     at

org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
     at

org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
     at

org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
     at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
     at

org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
     at

org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
     at

org.apache.beam.fn.harness.state.FnApiStateAccessor.bindBag(FnApiStateAccessor.java:244)
     at

org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:487)
     at

org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:477)
     at

org.apache.beam.fn.harness.FnApiDoFnRunner$OnTimerContext.state(FnApiDoFnRunner.java:671)
     at
StateTest$5$OnTimerInvoker$expiry$ZXhwaXJ5.invokeOnTimer(Unknown
Source)
     at


Re: Accessing keyed state in portable timer callbacks

2018-10-31 Thread Lukasz Cwik
I filed https://issues.apache.org/jira/browse/BEAM-5930.

On Wed, Oct 31, 2018 at 10:22 AM Lukasz Cwik  wrote:

> That looks like a bug in the FnApiDoFnRunner.java
>
> The FnApiStateAccessor is given a callback to get the current element and
> it is not handling the case where the current element is a timer.
>
> callback:
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L212
> where the current "element" gets set:
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L220
> where the current "timer" gets set:
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L237
>
> The easiest fix would be to have the callback return the first non null
> from currentElement/currentTimer but longer term I think we'll want a
> different solution. Alternatively, we could collapse currentElement and
> currentTimer to be currentElementOrTimer which would solve the accessor
> issue.
>
> On Wed, Oct 31, 2018 at 9:50 AM Maximilian Michels  wrote:
>
>> Hi,
>>
>> I have a question regarding user state during timer callback in the
>> FnApiDoFnRunner (Java SDK Harness).
>>
>> I've started implementing Timers for the portable Flink Runner. I can
>> register a timer via the timer output collection and fire the timer via the
>> timer input of the SDK Harness. But when I try to access state in the Timer
>> callback, I get the exception below.
>>
>> Is this a bug or if not, how is the timer's key supposed to be set? I
>> assume that it should be set from the timer element which contains the key.
>>
>> Thanks,
>> Max
>>
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: Error received from SDK harness for instruction
>> 72: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>> at
>> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
>> at
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
>> at
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
>> at
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
>> at
>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
>> at
>> org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
>> at
>> org.apache.beam.fn.harness.state.FnApiStateAccessor.bindBag(FnApiStateAccessor.java:244)
>> at
>> org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:487)
>> at
>> org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:477)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner$OnTimerContext.state(FnApiDoFnRunner.java:671)
>> at
>> StateTest$5$OnTimerInvoker$expiry$ZXhwaXJ5.invokeOnTimer(Unknown Source)
>> at
>> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:187)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processTimer(FnApiDoFnRunner.java:244)
>> at
>> org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.lambda$createRunnerForPTransform$0(DoFnPTransformRunnerFactory.java:134)
>> at
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>> at
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>> at
>> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>> at
>> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
>> at
>> 

Re: Accessing keyed state in portable timer callbacks

2018-10-31 Thread Lukasz Cwik
That looks like a bug in the FnApiDoFnRunner.java

The FnApiStateAccessor is given a callback to get the current element and
it is not handling the case where the current element is a timer.

callback:
https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L212
where the current "element" gets set:
https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L220
where the current "timer" gets set:
https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L237

The easiest fix would be to have the callback return the first non null
from currentElement/currentTimer but longer term I think we'll want a
different solution. Alternatively, we could collapse currentElement and
currentTimer to be currentElementOrTimer which would solve the accessor
issue.

On Wed, Oct 31, 2018 at 9:50 AM Maximilian Michels  wrote:

> Hi,
>
> I have a question regarding user state during timer callback in the
> FnApiDoFnRunner (Java SDK Harness).
>
> I've started implementing Timers for the portable Flink Runner. I can
> register a timer via the timer output collection and fire the timer via the
> timer input of the SDK Harness. But when I try to access state in the Timer
> callback, I get the exception below.
>
> Is this a bug or if not, how is the timer's key supposed to be set? I
> assume that it should be set from the timer element which contains the key.
>
> Thanks,
> Max
>
>
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error received from SDK harness for instruction
> 72: java.util.concurrent.ExecutionException: java.lang.NullPointerException
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
> at
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
> at
> org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at
> org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
> at
> org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
> at
> org.apache.beam.fn.harness.state.FnApiStateAccessor.bindBag(FnApiStateAccessor.java:244)
> at
> org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:487)
> at
> org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:477)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$OnTimerContext.state(FnApiDoFnRunner.java:671)
> at
> StateTest$5$OnTimerInvoker$expiry$ZXhwaXJ5.invokeOnTimer(Unknown Source)
> at
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:187)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processTimer(FnApiDoFnRunner.java:244)
> at
> org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.lambda$createRunnerForPTransform$0(DoFnPTransformRunnerFactory.java:134)
> at
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
> at
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
> at
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
> at
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
> at
> org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver.onNext(ForwardingClientResponseObserver.java:50)
> at
> org.apache.beam.vendor.grpc.v1.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:407)
> at
> 

Accessing keyed state in portable timer callbacks

2018-10-31 Thread Maximilian Michels

Hi,

I have a question regarding user state during timer callback in the 
FnApiDoFnRunner (Java SDK Harness).


I've started implementing Timers for the portable Flink Runner. I can 
register a timer via the timer output collection and fire the timer via 
the timer input of the SDK Harness. But when I try to access state in 
the Timer callback, I get the exception below.


Is this a bug or if not, how is the timer's key supposed to be set? I 
assume that it should be set from the timer element which contains the key.


Thanks,
Max


Caused by: java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Error received from SDK harness for 
instruction 72: java.util.concurrent.ExecutionException: 
java.lang.NullPointerException
    at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at 
org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
    at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
    at 
org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
    at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
    at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
    at 
org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at 
org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
    at 
org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
    at 
org.apache.beam.fn.harness.state.FnApiStateAccessor.bindBag(FnApiStateAccessor.java:244)
    at 
org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:487)
    at 
org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:477)
    at 
org.apache.beam.fn.harness.FnApiDoFnRunner$OnTimerContext.state(FnApiDoFnRunner.java:671)
    at 
StateTest$5$OnTimerInvoker$expiry$ZXhwaXJ5.invokeOnTimer(Unknown Source)
    at 
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:187)
    at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processTimer(FnApiDoFnRunner.java:244)
    at 
org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.lambda$createRunnerForPTransform$0(DoFnPTransformRunnerFactory.java:134)
    at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
    at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
    at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
    at 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
    at 
org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver.onNext(ForwardingClientResponseObserver.java:50)
    at 
org.apache.beam.vendor.grpc.v1.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:407)
    at 
org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
    at 
org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
    at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:519)
    at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at 
org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)