Hello,

What is the Schema Access Strategy set to in your AvroRecordSetWriter?

Since you said you have an embedded schema, I would expect your
reader's Schema Access Strategy to be set to "Embedded" and then the
writer would be set to "Inherit from Reader" or "Embedded". Maybe you
have something different selected?
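
For reference, the combination I would normally expect (the exact
property value names can vary a bit between NiFi versions, so treat
this as a rough sketch) is:

AvroReader
    Schema Access Strategy = Use Embedded Avro Schema

AvroRecordSetWriter
    Schema Access Strategy = Inherit Record Schema
    (or an embedded-schema option if you want the writer to re-embed it)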

The error is because whatever schema the writer is using has a field
named "p_AUX Code", and the space in that name is not a legal character
under Avro's naming rules (names may only contain letters, digits, and
underscores).

You can see in the stack trace that the exception comes from Avro's
Schema class, in a method called validateName, which contains this check:

if (!(Character.isLetterOrDigit(c) || c == '_'))
  throw new SchemaParseException("Illegal character in: "+name);
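
If it helps, here is a minimal, self-contained sketch (the schema text
below is made up for illustration, not taken from your data) showing
Avro's parser rejecting a field name that contains a space and accepting
the underscore variant:

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class AvroNameCheck {
  public static void main(String[] args) {
    // A name with a space, like "p_AUX Code", breaks Avro's rules
    // (only letters, digits, and '_' are allowed).
    String bad = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
        + "[{\"name\":\"p_AUX Code\",\"type\":\"string\"}]}";
    try {
      new Schema.Parser().parse(bad);
    } catch (SchemaParseException e) {
      // Prints: Illegal character in: p_AUX Code
      System.out.println("Rejected: " + e.getMessage());
    }

    // Replacing the space with an underscore gives a valid name.
    String good = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
        + "[{\"name\":\"p_AUX_Code\",\"type\":\"string\"}]}";
    Schema parsed = new Schema.Parser().parse(good);
    System.out.println("Parsed field: " + parsed.getField("p_AUX_Code").name());
  }
}

So one likely way forward is to rename that field upstream (for example
to p_AUX_Code) wherever the schema is produced, since the writer
re-compiles the schema (compileAvroSchema in the trace) and runs the
same validation.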

Thanks,

Bryan

On Mon, May 7, 2018 at 3:37 AM, Agarwal, Kirti (MIND)
<[email protected]> wrote:
> Hi,
>
> We have produced some data in Kafka in Avro format with an embedded schema.
>
> We are able to consume this data using the Storm topology code at our end.
> We also tested the same data with StreamSets, and it is parsed successfully.
>
> But when we try to read the same data through “ConsumeKafkaRecord_0_10” in
> Apache NiFi, it gives the error below:
>
> Failed to properly receive messages from Kafka. Will roll back session and
> any un-committed offsets from Kafka.:
> org.apache.nifi.processor.exception.ProcessException: Could not determine
> the Avro Schema to use for writing the content
>
> Please help, and let me know if more details are required.
>
> The complete stack trace is below:
>
> 2018-05-04 07:11:19,796 ERROR [Timer-Driven Process Thread-1] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=a68b13d0-1001-1163-c2e2-c460278f7145] Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.: org.apache.nifi.processor.exception.ProcessException: Could not determine the Avro Schema to use for writing the content
> org.apache.nifi.processor.exception.ProcessException: Could not determine the Avro Schema to use for writing the content
>     at org.apache.nifi.avro.AvroRecordSetWriter.createWriter(AvroRecordSetWriter.java:109)
>     at sun.reflect.GeneratedMethodAccessor436.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
>     at com.sun.proxy.$Proxy164.createWriter(Unknown Source)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:510)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$2(ConsumerLease.java:322)
>     at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:309)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:170)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:328)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1147)
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:175)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.nifi.schema.access.SchemaNotFoundException: Failed to compile Avro Schema
>     at org.apache.nifi.avro.AvroRecordSetWriter.createWriter(AvroRecordSetWriter.java:100)
>     ... 23 common frames omitted
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: p_AUX Code
>     at org.apache.avro.Schema.validateName(Schema.java:1151)
>     at org.apache.avro.Schema.access$200(Schema.java:81)
>     at org.apache.avro.Schema$Field.<init>(Schema.java:403)
>     at org.apache.avro.Schema.parse(Schema.java:1281)
>     at org.apache.avro.Schema$Parser.parse(Schema.java:1032)
>     at org.apache.avro.Schema$Parser.parse(Schema.java:1020)
>     at org.apache.nifi.avro.AvroRecordSetWriter.compileAvroSchema(AvroRecordSetWriter.java:131)
>     at org.apache.nifi.avro.AvroRecordSetWriter.createWriter(AvroRecordSetWriter.java:92)
>     ... 23 common frames omitted
>
> Regards,
>
> Kirti Agarwal
