The

    Pipeline p = TestPipeline.create();
    p.getCoderRegistry().registerCoderForClass(KafkaRecord.class,
        KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

approach works; I had just forgotten to generate fake headers (I passed null
to the constructor). :)

On Wed, Dec 9, 2020 at 5:42 PM Kaymak, Tobias <[email protected]>
wrote:

> According to the documentation [0], Create.of() works only for "Standard"
> types, but shouldn't it in theory also work for non-standard types when a
> Coder is specified?
>
> I want to test a DoFn that receives KafkaRecord<String, String> as an
> input:
>
>     KafkaRecord input =
>         new KafkaRecord<String, String>(
>             topic, partition, offset, timestamp, kafkaTimestampType, null, kv);
>     KafkaRecordCoder kafkaRecordCoder =
>         KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
>     PCollection<KafkaRecord<String, String>> records =
>         p.apply(Create.of(input).withCoder(kafkaRecordCoder));
>
> But that fails with
>
> java.lang.IllegalArgumentException: Unable to infer a coder and no Coder
> was specified. Please set a coder by invoking Create.withCoder() explicitly
>  or a schema by invoking Create.withSchema().
>
> [..]
> Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Unable
> to provide a Coder for org.apache.beam.sdk.io.kafka.KafkaRecord.
>   Building a Coder using a registered CoderProvider failed.
>
> However, when I register a Coder for KafkaRecord on that TestPipeline's
> registry:
>
>     Pipeline p = TestPipeline.create();
>     p.getCoderRegistry().registerCoderForClass(KafkaRecord.class,
>         KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
>
> I get the following NPE:
>
> java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toIterable(KafkaRecordCoder.java:98)
>   at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:65)
>   at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
>   at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>   at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
>   at org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:408)
>   at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:365)
>   at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:272)
>   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
>   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
>   at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
>   at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:171)
>   (...)
>
> And when I try to set the Coder like:
>
>     p.apply(
>         Create.of(input).withCoder(kafkaRecordCoder));
>
> My IDE says:
> java: incompatible types: org.apache.beam.sdk.values.POutput cannot be converted to
> org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.io.kafka.KafkaRecord<java.lang.String,java.lang.String>>
>
> What am I missing?
>
> [0] https://beam.apache.org/documentation/pipelines/test-your-pipeline/
>
