Sounds like a bug. Can you please create a JIRA with instructions for reproducing the issue ?
Thanks, Cham On Thu, May 24, 2018 at 9:09 AM Edward Pricer <[email protected]> wrote: > Hi - hoping for a helping hand > > When trivially reading from an ActiveMQ queue, I eventually get a > java.lang.NoSuchMethodException: javax.jms.Message.<init>() exception. > > The queue is populated out-of-process rapidly with a text message. > > Exception sometimes appears immediately, sometimes not for some time. > Faster queue writes appears to exacerbate the problem. > > Beam 2.4.0, Java, DirectRunner > > Apparently the internals of DirectRunner are trying to clone a > JmsCheckpointMark by > encoding and decoding with a generic AvroCoder, but that's failing > because part of the payload doesn't have a default constructor (in > fact, it's trying to instantiate an interface). Do I need to be using > JmsIO differently, is > this a limitation of the DirectRunner, or is this actually a bug? > > Here's the test-case code. I don't think the publisher side is > relevant, but let me know if it is. > > Pipeline p = > Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create()); > > // read from the queue > ConnectionFactory factory = new > ActiveMQConnectionFactory("tcp://localhost:61616"); > > PCollection<String> inputStrings = p.apply("Read from queue", > JmsIO.<String>readMessage() .withConnectionFactory(factory) > .withQueue("somequeue") .withCoder(StringUtf8Coder.of()) > .withMessageMapper((JmsIO.MessageMapper<String>) message -> > ((TextMessage) message).getText())); > > // decode PCollection<String> asStrings = inputStrings.apply("Decode > Message", ParDo.of(new DoFn<String, String>() { @ProcessElement public > void processElement(ProcessContext context) { > System.out.println(context.element()); > context.output(context.element()); } })); p.run(); > > > Full stack trace: > > Exception in thread "main" java.lang.RuntimeException: > java.lang.NoSuchMethodException: javax.jms.Message.<init>() > at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219) > at > org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) > at > org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318) > at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170) > at > org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122) > at > org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105) > at > org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99) > at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148) > at > org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194) > at > org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124) > at > org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161) > at > org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > > Thanks! >
