Reuven, thanks for the reply.
The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the
"KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of),
ByteArrayCoder.of)" coder.
I can't paste the DoFn code due to company policy, but here is its structure:
//////////////////////////////////////
Pipeline.scala
-------------------------------
type InputType = KafkaRecord[Array[Byte], Array[Byte]]
//////////////////////////////////////
ParsePayloadDoFn.scala
-------------------------------
class ParsePayloadDoFn[InputType](
    inputAdaptor: RowAdaptor[InputType],
    ...
    deadLetterTag: TupleTag[KV[KeyType, ValueType]]
) extends DoFn[InputType, OutputType] {
  @Setup
  def setup(): Unit =
    inputAdaptor.setup()

  @ProcessElement
  def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
    Try {
      val result: Result = inputAdaptor.adapt(c.element()) // parse payload
      ...
      val deadLetterMessageFn: KV[KeyType, ValueType] => Unit =
        c.output(deadLetterTag, _)
      val outputPayloadFn: OutputType => Unit = c.output(_)
      result.protocol match {
        case error: ErrorType =>
          deadLetterMessageFn(KV.of(..., ...))
        case payload: Payload =>
          // zipWithIndex yields (event, index) pairs, so every case matches a pair
          payload.events.zipWithIndex.foreach {
            case (failure: ParsingFailure, index: Int) =>
              deadLetterMessageFn(KV.of(..., ...))
            case (message: Message, index: Int) =>
              // extract body from Message
              val body = ...
              // make a GET http call and compose output
              val output = ...
              outputPayloadFn(
                OutputType(
                  output,
                  ...
                  payload.header,
                  body,
                  index
                )
              )
          }
      }
    } match {
      case Failure(exception) =>
        error(
          s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\n" +
            s"Stack trace: ${ExceptionUtils.getStackTrace(exception)}"
        )
      case Success(_) => ()
    }
}
//////////////////////////////////////
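In the structure above, "zipWithIndex" turns each event into an (event, index) pair, so each case has to destructure that pair; a pattern on the event type alone would not match. A minimal, self-contained sketch of that dispatch, assuming a hypothetical sealed "Event" hierarchy with "Message" and "ParsingFailure" cases (the real types are elided above) and plain callbacks standing in for the main and dead-letter outputs:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the event types elided in the DoFn structure.
sealed trait Event
final case class Message(body: String) extends Event
final case class ParsingFailure(reason: String) extends Event

// Sends each indexed event to the main output or to the dead-letter output.
// zipWithIndex yields (Event, Int) pairs, so every case destructures the pair.
def route(
    events: Seq[Event],
    output: (String, Int) => Unit,
    deadLetter: (String, Int) => Unit
): Unit =
  events.zipWithIndex.foreach {
    case (ParsingFailure(reason), index) => deadLetter(reason, index)
    case (Message(body), index)          => output(body, index)
  }

// Usage: collect what each callback receives.
val outputs = ArrayBuffer.empty[(String, Int)]
val deadLetters = ArrayBuffer.empty[(String, Int)]
route(
  Seq(Message("a"), ParsingFailure("bad json"), Message("b")),
  (body, index) => outputs += ((body, index)),
  (reason, index) => deadLetters += ((reason, index))
)
println(outputs)     // ArrayBuffer((a,0), (b,2))
println(deadLetters) // ArrayBuffer((bad json,1))
```

Because "Event" is sealed, the compiler can also check that the match covers every event type.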
For reference, we are using Scio v0.11.5 and Beam v2.36.0.
Thank you,
Yuri Jin
On Thu, May 5, 2022 at 10:36 PM Reuven Lax <[email protected]> wrote:
> What is the type of the input? Do you have a custom coder? Are you able
> to paste the code for your DoFn?
>
> In answer to your question: the Direct runner checks for this because it
> is a testing runner. This kind of mutation can cause random, unexpected
> behavior on production runners, which is why the testing runner explicitly
> tries to detect it.
>
> Reuven
>
> On Thu, May 5, 2022 at 8:52 PM Yuri Jin <[email protected]> wrote:
>
>> Hi Beam users,
>>
>> We have a DoFn that reads data from Kafka and parses a byte-array
>> payload. It works fine with the Dataflow runner but throws an
>> IllegalMutationException with the Direct runner. The DoFn does not
>> directly modify the input value, so I am guessing that the output differs
>> when there are multiple input values.
>>
>> The detailed error is as follows.
>>
>> Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse Payload mutated value "DoFnOutputA" after it was output (new value was "DoFnOutputB"). Values must not be mutated in any way after being output.
>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
>>     at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:228)
>>     at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:160)
>>     at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
>>     at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
>>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value "DoFnOutputA" mutated illegally, new value was "DoFnOutputB". Encoding was "Base64EncodedA", now "Base64EncodedB".
>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
>>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
>>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
>>     ... 10 more
>>
>> * The printed values "DoFnOutputA" and "DoFnOutputB" are identical, but
>> "Base64EncodedA" differs from "Base64EncodedB".
>>
>> I was wondering if you could give me some advice on the following
>> questions.
>>
>> 1. How can we find the problematic part? I wrote some unit tests, but I
>> couldn't reproduce the error.
>> 2. Have you encountered the same error, and if so, how did you solve it?
>> 3. Only the Direct runner enforces immutability for DoFns. Is it safe to
>> use the "enforceImmutability=false" option?
>>
>>
>> Any comments would be appreciated.
>>
>> Thanks,
>> Yuri Jin
>>
>
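Going by the stack trace (MutationDetectors$CodedValueMutationDetector, "Encoding was ..., now ..."), the Direct runner's check appears to work roughly like this: encode each output with its coder when it is emitted, encode it again when the bundle is committed, and fail if the bytes differ. The self-contained Scala sketch below imitates that check; "Output", "encode" and "MutationCheck" are hypothetical stand-ins, not Beam APIs. It also shows one way the printed values can be identical while the encodings differ: a case class prints an Array[Byte] field as its identity hash ([B@...), which stays the same when the array's contents are overwritten, for example by a parser that reuses a buffer.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{Arrays, Base64}

// Hypothetical output type: like OutputType in the DoFn, it carries a raw byte body.
final case class Output(header: String, body: Array[Byte], index: Int)

// Hypothetical stand-in for a coder: a deterministic byte encoding of an Output.
def encode(o: Output): Array[Byte] = {
  val header = o.header.getBytes(UTF_8)
  val buf = ByteBuffer.allocate(4 + header.length + 4 + o.body.length + 4)
  buf.putInt(header.length).put(header)
  buf.putInt(o.body.length).put(o.body)
  buf.putInt(o.index)
  buf.array()
}

// Simplified mutation check: snapshot the encoding when the value is output,
// re-encode when the bundle is committed, and report a mismatch.
final class MutationCheck(value: Output) {
  private val atOutput = encode(value)

  def verify(): Either[String, Unit] = {
    val atCommit = encode(value)
    if (Arrays.equals(atOutput, atCommit)) Right(())
    else {
      val b64 = Base64.getEncoder
      Left(s"Value $value mutated illegally. Encoding was " +
        s"${b64.encodeToString(atOutput)}, now ${b64.encodeToString(atCommit)}")
    }
  }
}

// Hypothetical parser scratch buffer that is reused for every event.
val scratch = "first".getBytes(UTF_8)

// Unsafe: the output shares the scratch buffer.
val shared = Output("hdr", scratch, 0)
val sharedCheck = new MutationCheck(shared)
val printedBefore = shared.toString
"other".getBytes(UTF_8).copyToArray(scratch) // the next event overwrites the buffer
println(shared.toString == printedBefore) // true: the array prints as [B@<hash>
println(sharedCheck.verify().isLeft)       // true: the encodings differ

// Safe: copy the bytes before output, so later writes cannot reach the value.
val copied = Output("hdr", scratch.clone(), 1)
val copiedCheck = new MutationCheck(copied)
"third".getBytes(UTF_8).copyToArray(scratch)
println(copiedCheck.verify().isRight)      // true
```

In this sketch, cloning the array (or building a fresh object) before each output is what keeps later writes away from values that have already been emitted.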
--
Yuri Jin
Senior Software Developer, Data Platform
[email protected]
(+1) 778-858-3585
unity.com