Thanks, I'll check it out. For testing, I moved inputAdaptor.adapt() into a separate DoFn, and that new DoFn threw the same exception. So I suspect the cause is inputAdaptor.adapt().
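Sketch, offered only as a hypothesis because RowAdaptor's code is not in the thread: if adapt() returns objects that share a mutable buffer across calls, a result that was already output changes when the next element is parsed. If OutputType carries a raw Array[Byte] (for example the extracted body), that change does not show up in toString, because an array prints as an identity string like [B@1b6d3586 rather than its contents. That would fit the observation further down that "DoFnOutputA" and "DoFnOutputB" look the same while their Base64 encodings differ. The Scala below does not use Beam; Parsed, ReusingAdaptor, CopyingAdaptor and AliasingDemo are hypothetical names.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

// Hypothetical output type holding a raw byte array (e.g. a parsed body).
final case class Parsed(body: Array[Byte])

// Hypothetical adaptor that reuses ONE internal buffer for every call,
// so every Parsed it returns aliases the same array.
final class ReusingAdaptor(capacity: Int) {
  private val buffer = new Array[Byte](capacity)

  def adapt(payload: Array[Byte]): Parsed = {
    java.util.Arrays.fill(buffer, 0.toByte)
    System.arraycopy(payload, 0, buffer, 0, payload.length)
    Parsed(buffer)
  }
}

// Hypothetical adaptor that gives each result its own copy of the bytes.
final class CopyingAdaptor {
  def adapt(payload: Array[Byte]): Parsed = Parsed(payload.clone())
}

object AliasingDemo {
  private def encoded(p: Parsed): String = Base64.getEncoder.encodeToString(p.body)

  /** Adapts every input in order and reports, for the FIRST result, whether its
    * toString and its Base64-encoded bytes are unchanged after the rest ran. */
  def firstResultStable(adapt: Array[Byte] => Parsed, inputs: Seq[String]): (Boolean, Boolean) = {
    val first = adapt(inputs.head.getBytes(UTF_8))
    val shownBefore = first.toString
    val encodedBefore = encoded(first)
    inputs.tail.foreach(in => adapt(in.getBytes(UTF_8)))
    (first.toString == shownBefore, encoded(first) == encodedBefore)
  }
}
```

With `val reusing = new ReusingAdaptor(16)`, `AliasingDemo.firstResultStable(p => reusing.adapt(p), Seq("event-1", "event-2"))` returns `(true, false)`: same toString, different bytes. A CopyingAdaptor returns `(true, true)`. If adapt() really does reuse state, copying the bytes (`payload.clone()` or `java.util.Arrays.copyOf`) before building each output is the corresponding fix in this model.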
On Fri, May 6, 2022 at 11:45 AM Reuven Lax <[email protected]> wrote:
> I meant to say .equals(), not compareTo.
>
> On Fri, May 6, 2022 at 11:44 AM Reuven Lax <[email protected]> wrote:
>
>> Unfortunately I'm not very familiar with Scio. However, this could also be
>> caused by an object that doesn't properly implement the compareTo method,
>> or by a coder that doesn't return such an object from structuralValue.
>>
>> On Fri, May 6, 2022 at 11:26 AM Yuri Jin <[email protected]> wrote:
>>
>>> Reuven, thanks for the reply.
>>>
>>> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
>>> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
>>> ByteArrayCoder.of)" coder.
>>> I can't paste the code for the DoFn due to company policy, but here's the
>>> structure:
>>>
>>> //////////////////////////////////////
>>> Pipeline.scala
>>> -------------------------------
>>> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
>>>
>>> //////////////////////////////////////
>>> ParsePayloadDoFn.scala
>>> -------------------------------
>>> class ParsePayloadDoFn[InputType](
>>>     inputAdaptor: RowAdaptor[InputType],
>>>     ...
>>>     deadLetterTag: TupleTag[KV[KeyType, ValueType]])
>>>   extends DoFn[InputType, OutputType] {
>>>
>>>   @Setup
>>>   def setup(): Unit =
>>>     inputAdaptor.setup()
>>>
>>>   @ProcessElement
>>>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
>>>     Try {
>>>       val result: Result = inputAdaptor.adapt(c.element()) // parse payload
>>>       ...
>>>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
>>>         c.output(deadLetterTag, _)
>>>       val outputPayloadFn: OutputType => Unit = c.output
>>>
>>>       result.protocol match {
>>>         case error: ErrorType =>
>>>           deadLetterMessageFn(KV.of(..., ...))
>>>         case payload: Payload =>
>>>           payload.events.zipWithIndex.foreach {
>>>             case (failure: ParsingFailure, _) =>
>>>               deadLetterMessageFn(KV.of(..., ...))
>>>             case (message: Message, index: Int) =>
>>>               // extract body from Message
>>>               val body = ...
>>>               // make a GET http call and compose output
>>>               val output = ...
>>>
>>>               outputPayloadFn(
>>>                 OutputType(
>>>                   output,
>>>                   ...
>>>                   payload.header,
>>>                   body,
>>>                   index
>>>                 )
>>>               )
>>>           }
>>>       }
>>>     } match {
>>>       case Failure(exception) =>
>>>         error(
>>>           s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\n" +
>>>             s"Stack trace: ${ExceptionUtils.getStackTrace(exception)}"
>>>         )
>>>       case Success(_) => ()
>>>     }
>>> }
>>> //////////////////////////////////////
>>>
>>> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
>>>
>>> Thank you,
>>> Yuri Jin
>>>
>>>
>>> On Thu, May 5, 2022 at 10:36 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> What is the type of the input? Do you have a custom coder? Are you
>>>> able to paste the code for your DoFn?
>>>>
>>>> In answer to your question: the Direct runner tests for this because it
>>>> is a testing runner. This error scenario can cause random, unexpected
>>>> behavior in production runners, which is why the testing runner tries to
>>>> detect it explicitly.
>>>>
>>>> Reuven
>>>>
>>>> On Thu, May 5, 2022 at 8:52 PM Yuri Jin <[email protected]> wrote:
>>>>
>>>>> Hi Beam users,
>>>>>
>>>>> We have a DoFn that reads data from Kafka and parses a byte-array
>>>>> payload. It works fine with the Dataflow runner, but throws
>>>>> IllegalMutationException with the Direct runner. The DoFn does not
>>>>> directly modify the input value, so I am guessing that the output
>>>>> differs when there are multiple input values.
>>>>>
>>>>> The detailed error is as follows.
>>>>>
>>>>> Exception in thread "main"
>>>>> org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse
>>>>> Payload mutated value "DoFnOutputA" after it was output (new value was
>>>>> "DoFnOutputB"). Values must not be mutated in any way after being output.
>>>>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>>>>>     at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>>>>>     at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>>>>>     at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>>>>>     at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>>>>>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was "Base64EncodedA", now "Base64EncodedB".
>>>>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>>>>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>>>>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>>>>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>>>>>     ... 10 more
>>>>>
>>>>> * "DoFnOutputA" and "DoFnOutputB" are identical, but "Base64EncodedA"
>>>>> differs from "Base64EncodedB".
>>>>>
>>>>> I was wondering if you could give me some advice on the following
>>>>> questions.
>>>>>
>>>>> 1. How can we find the problematic part? I wrote some unit tests, but I
>>>>> couldn't reproduce the error.
>>>>> 2. Have you experienced the same error and solved it?
>>>>> 3. Only the Direct runner enforces immutability for DoFns. Is it safe to
>>>>> use the "enforceImmutability=false" option?
>>>>>
>>>>>
>>>>> Any comments would be appreciated.
>>>>>
>>>>> Thanks,
>>>>> Yuri Jin
>>>>>
>>>>
>>>
>>> --
>>> Yuri Jin
>>> Senior Software Developer, Data Platform
>>> [email protected]
>>> (+1) 778-858-3585
>>> unity.com
>>>
>>
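To illustrate Reuven's explanation, and why a single-element unit test may not reproduce the error, here is a simplified, Beam-free model of the check that appears in the stack trace (ImmutabilityEnforcingBundle.commit calling CodedValueMutationDetector): each value is encoded when it is output, re-encoded when the bundle commits, and any byte difference is reported. Beam's real detector also compares the coder's structuralValue, which is where Reuven's equals() remark applies, so treat this as an illustration only. SimpleCoder, CheckingBundle and BundleDemo are hypothetical and are not Beam APIs.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import scala.collection.mutable.ArrayBuffer

// Hypothetical minimal coder abstraction; NOT Beam's Coder API.
trait SimpleCoder[T] {
  def encode(value: T): Array[Byte]
}

// Simplified model of an immutability-checking bundle: encode each value when
// it is output, re-encode every value at commit, and fail if any bytes changed.
final class CheckingBundle[T](coder: SimpleCoder[T]) {
  private val outputs = ArrayBuffer.empty[(T, Array[Byte])]

  def output(value: T): Unit = outputs += ((value, coder.encode(value)))

  def commit(): List[T] = {
    val b64 = Base64.getEncoder
    for ((value, before) <- outputs) {
      val after = coder.encode(value)
      if (!java.util.Arrays.equals(before, after))
        throw new IllegalStateException(
          s"Value mutated illegally. Encoding was ${b64.encodeToString(before)}, " +
            s"now ${b64.encodeToString(after)}.")
    }
    outputs.map(_._1).toList
  }
}

object BundleDemo {
  // Hypothetical coder: join the strings with commas and encode as UTF-8.
  val listCoder: SimpleCoder[ArrayBuffer[String]] =
    (value: ArrayBuffer[String]) => value.mkString(",").getBytes(UTF_8)

  // Processes all inputs in ONE bundle, then commits it.
  def runBundle(
      process: String => ArrayBuffer[String],
      inputs: Seq[String]): Either[String, List[String]] = {
    val bundle = new CheckingBundle(listCoder)
    inputs.foreach(in => bundle.output(process(in)))
    try Right(bundle.commit().map(_.mkString(",")))
    catch { case e: IllegalStateException => Left(e.getMessage) }
  }

  // Buggy pattern: appends to, and re-outputs, one shared mutable buffer.
  def sharedBufferFn(): String => ArrayBuffer[String] = {
    val shared = ArrayBuffer.empty[String]
    in => { shared += in; shared }
  }

  // Fixed pattern: a fresh collection for every element.
  val freshBufferFn: String => ArrayBuffer[String] = in => ArrayBuffer(in)
}
```

With sharedBufferFn, `runBundle(BundleDemo.sharedBufferFn(), Seq("a"))` returns `Right(List("a"))`, but `Seq("a", "b")` returns a `Left`, because the first output is only modified when the second element is processed, before commit. freshBufferFn passes with both inputs. So a test that pushes several elements through the same DoFn instance, and inspects the outputs only afterwards, is closer to what the Direct runner does. Given Reuven's point that this kind of mutation can cause unexpected behavior on production runners, enforceImmutability=false would hide the symptom rather than remove it.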
