Thanks, I'll check it out.

I split inputAdaptor.adapt() out into a separate DoFn for testing, and the
new DoFn threw the same exception. So I guess the cause is
inputAdaptor.adapt().
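
One way this could happen is sketched below. It assumes that adapt() returns
data backed by a buffer it reuses between calls, which is only a guess and not
something confirmed in this thread. ReusingAdaptor, encode and
firstOutputUnchanged are hypothetical names and none of this is Beam or Scio
API. It only mimics the idea of the check: snapshot the encoding when a value
is output, re-encode it later, and compare the bytes.

```scala
import java.util.Arrays

// Hypothetical stand-in for an adaptor that reuses one internal buffer across calls.
final class ReusingAdaptor {
  private val buffer = new Array[Byte](4)

  def adapt(input: Array[Byte]): Array[Byte] = {
    System.arraycopy(input, 0, buffer, 0, buffer.length)
    buffer // the same array instance is returned on every call
  }
}

object MutationSketch {
  // Stand-in for a coder: the "encoding" here is just a copy of the bytes.
  def encode(value: Array[Byte]): Array[Byte] = value.clone()

  // True if the first output still encodes to the same bytes
  // after a second element has been processed.
  def firstOutputUnchanged(copyOnOutput: Boolean): Boolean = {
    val adaptor = new ReusingAdaptor
    val raw = adaptor.adapt(Array[Byte](1, 2, 3, 4))
    val first = if (copyOnOutput) raw.clone() else raw
    val snapshot = encode(first)             // encoding taken at output time
    adaptor.adapt(Array[Byte](9, 9, 9, 9))   // next element overwrites the buffer
    Arrays.equals(snapshot, encode(first))   // re-encode and compare
  }

  def main(args: Array[String]): Unit = {
    println(firstOutputUnchanged(copyOnOutput = false)) // false
    println(firstOutputUnchanged(copyOnOutput = true))  // true
  }
}
```

If something like this is going on, it would also fit the two values printing
the same while their encodings differ, because a JVM array's toString() shows
its identity and not its contents. In this sketch, copying the array before
output is enough to make the comparison pass.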

On Fri, May 6, 2022 at 11:45 AM Reuven Lax <[email protected]> wrote:

> I meant to say .equals() not compareTo.
>
> On Fri, May 6, 2022 at 11:44 AM Reuven Lax <[email protected]> wrote:
>
>> Unfortunately I'm not very familiar with Scio. However this could also be
>> caused by an object that either doesn't properly implement the compareTo
>> method or the coder doesn't return such an object in structuralValue.
>>
>> On Fri, May 6, 2022 at 11:26 AM Yuri Jin <[email protected]> wrote:
>>
>>> Reuven, thanks for the reply.
>>>
>>> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
>>> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
>>> ByteArrayCoder.of)" coder.
>>> I can't paste the code for DoFn due to company policy, but here's the
>>> structure:
>>>
>>> //////////////////////////////////////
>>> Pipeline.scala
>>> -------------------------------
>>> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
>>>
>>> //////////////////////////////////////
>>> ParsePayloadDoFn.scala
>>> -------------------------------
>>> class ParsePayloadDoFn[InputType](
>>>   inputAdaptor: RowAdaptor[InputType],
>>>   ...
>>>   deadLetterTag: TupleTag[KV[KeyType, ValueType]]) extends
>>> DoFn[InputType, OutputType] {
>>>
>>>   @Setup
>>>   def setup(): Unit =
>>>     inputAdaptor.setup()
>>>
>>>   @ProcessElement
>>>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext):
>>> Unit =
>>>     Try {
>>>       val result: Result = inputAdaptor.adapt(c.element()) // parse
>>> payload
>>>       ...
>>>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
>>> c.output(deadLetterTag, _)
>>>       val outputPayloadFn: OutputType => Unit = c.output
>>>
>>>       result.protocol match {
>>>         case error: ErrorType =>
>>>           deadLetterMessageFn(KV.of(..., ...))
>>>         case payload: Payload =>
>>>           payload.events.zipWithIndex.foreach {
>>>             case (failure: ParsingFailure, _) =>
>>>               deadLetterMessageFn(KV.of(..., ...))
>>>             case (message: Message, index: Int) =>
>>>               // extract body from Message
>>>               val body = ...
>>>               // make a GET http call and compose output
>>>               val output = ...
>>>
>>>               outputPayloadFn(
>>>                 OutputType(
>>>                   output,
>>>                   ...
>>>                   payload.header,
>>>                   body,
>>>                   index
>>>                 )
>>>               )
>>>           }
>>>       }
>>>     } match {
>>>       case Failure(exception) =>
>>>         error(
>>>           s"ParsePayloadDoFn - unhandled exception:
>>> ${exception.getMessage}\nStack trace:
>>> ${ExceptionUtils.getStackTrace(exception)}"
>>>         )
>>>       case Success(_) => ()
>>>     }
>>> }
>>> //////////////////////////////////////
>>>
>>> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
>>>
>>> Thank you,
>>> Yuri Jin
>>>
>>>
>>> On Thu, May 5, 2022 at 10:36 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> What is the type of the input - do you have a custom coder? Are you
>>>> able to paste the code for your DoFn?
>>>>
>>>> In answer to your question - Direct runner tests for this, because it
>>>> is a testing runner. This error scenario can cause random unexpected
>>>> behavior in production runners, which is why the testing runner tries to
>>>> explicitly detect it.
>>>>
>>>> Reuven
>>>>
>>>> On Thu, May 5, 2022 at 8:52 PM Yuri Jin <[email protected]> wrote:
>>>>
>>>>> Hi Beam users,
>>>>>
>>>>> We have a DoFn that reads data from Kafka and parses a byte array
>>>>> payload. It works fine with the Dataflow runner, but throws
>>>>> IllegalMutationException with the direct runner. It does not directly modify
>>>>> the input value. Therefore, I am guessing that the output is different
>>>>> when there are multiple input values.
>>>>>
>>>>> The detailed error is as follows.
>>>>>
>>>>> Exception in thread "main"
>>>>> org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse Payload
>>>>> mutated value "DoFnOutputA" after it was output (new value was
>>>>> "DoFnOutputB"). Values must not be mutated in any way after being output.
>>>>>         at
>>>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>>>>>         at
>>>>> org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>>>>>         at
>>>>> org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>>>>>         at
>>>>> org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>>>>>         at
>>>>> org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>>>>>         at
>>>>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value
>>>>> "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was
>>>>> "Base64EncodedA", now "Base64EncodedB".
>>>>>         at
>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>>>>>         at
>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>>>>>         at
>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>>>>>         at
>>>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>>>>>         ... 10 more
>>>>>
>>>>> * "DoFnOutputA" and "DoFnOutputB" are the same, but "Base64EncodedA"
>>>>> is different from "Base64EncodedB".
>>>>>
>>>>> I was wondering if you could give me some advice on the following
>>>>> questions.
>>>>>
>>>>> 1. How can we find the problematic part? I wrote some unit tests, but I
>>>>> couldn't reproduce the error.
>>>>> 2. Have you experienced the same error and solved it?
>>>>> 3. Only Direct runner enforces immutability for DoFns. Is it safe to
>>>>> use the "enforceImmutability=false" option?
>>>>>
>>>>>
>>>>> Any comments would be appreciated.
>>>>>
>>>>> Thanks,
>>>>> Yuri Jin
>>>>>
>>>>
>>>
>>> --
>>> Yuri Jin
>>> Senior Software Developer, Data Platform
>>> [email protected]
>>> (+1) 778-858-3585
>>> unity.com
>>>
>>

-- 
Yuri Jin
Senior Software Developer, Data Platform
[email protected]
(+1) 778-858-3585
unity.com
