Hello,

You don’t need to use a properties file. In your AvroRecordReader, just
change the Schema Name property from ${schema.name} to the actual name of
the schema you want to use from the schema registry.

The only consequence is that the record reader can then be used with just
one schema, rather than dynamically determining the schema from a flow file
attribute. It can't do that in this case because there is no incoming flow
file to ConsumeKafkaRecord.
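
To make the difference concrete, here is a rough Python sketch of the idea. It is only a toy model, not NiFi's actual Expression Language or registry code: REGISTRY, resolve_schema_name, and lookup_schema are hypothetical names, and registering the schema under the name INV_ADJ is an assumption based on the record name in the avsc below.

```python
import re

# Hypothetical in-memory stand-in for a schema registry: schema name -> schema text.
# The name "INV_ADJ" is an assumption taken from the record name in the thread.
REGISTRY = {"INV_ADJ": '{"type":"record","name":"INV_ADJ","fields":[]}'}

# Matches a property value that is a single attribute reference, e.g. ${schema.name}.
_ATTR_REF = re.compile(r"^\$\{([^}]+)\}$")


def resolve_schema_name(property_value, attributes):
    """Return the schema name for a 'Schema Name' property value.

    A literal value is returned unchanged. A ${...} reference is looked up in
    the flow file attributes, which are empty when there is no incoming flow file.
    """
    match = _ATTR_REF.match(property_value)
    if match is None:
        return property_value  # literal name, no attributes needed
    name = attributes.get(match.group(1))
    if not name:
        raise LookupError(
            "attribute %r is not available, so no schema name can be resolved"
            % match.group(1)
        )
    return name


def lookup_schema(property_value, attributes):
    """Resolve the schema name, then fetch the schema text from the toy registry."""
    return REGISTRY[resolve_schema_name(property_value, attributes)]
```

With no attributes available, lookup_schema("INV_ADJ", {}) finds the schema, while lookup_schema("${schema.name}", {}) raises LookupError. That is the situation the reader is in when ConsumeKafkaRecord has no incoming flow file.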

-Bryan


On Thu, Mar 22, 2018 at 2:41 AM Colin Williams <
colin.williams.seat...@gmail.com> wrote:

> Hi Joe,
>
> Thanks for the suggestion. I started by using the ConsumeKafkaRecord0_10.
> But I had read the only way to configure the schema.name was via a
> properties file, which I read also required a restart of NiFi.
> http://apache-nifi-users-list.2361937.n4.nabble.com/Nifi-1-3-0-Problem-with-schema-name-and-ConsumeKafkaRecord-0-10-processor-td2256.html
> That's why I moved away from using ConsumeKafkaRecord and to the regular
> consumer. I didn't want to create a properties file and couldn't see how to
> set schema.name otherwise.
>
> Regarding the error information: I saw the error displayed on the
> ConsumeKafka processor in the UI. I will look for log files and, if
> necessary, configure logging after configuring ConsumeKafkaRecord via a
> properties file tomorrow.
>
> Best,
>
> Colin Williams
>
> On Wed, Mar 21, 2018 at 6:45 PM, Joe Witt <joe.w...@gmail.com> wrote:
>
>> Colin,
>>
>> You're using the ConsumeKafka processors.  Given that this is avro
>> data for which you have a schema/etc.. I strongly recommend you use
>> ConsumeKafkaRecord0_10...
>>
>> In that you get to specify the record reader/writer you'll need.  You
>> will also see dramatically higher performance.
>>
>> Let's get you reliably reading records from Kafka and then move on to
>> other details such as LookupRecord, etc.
>>
>> I suspect we'll need to see the actual error information you're getting
>> to be of much help.
>>
>> Thanks
>>
>>
>>
>> On Wed, Mar 21, 2018 at 9:33 PM, Colin Williams
>> <colin.williams.seat...@gmail.com> wrote:
>> > Hi Joe,
>> >
>> > I don't believe the Avro schema is included; I expect the messages are
>> > just the data portion... I think that's why I need to use the avsc file
>> > mentioned above...
>> >
>> > On Wed, Mar 21, 2018 at 6:19 PM, Joe Witt <joe.w...@gmail.com> wrote:
>> >>
>> >> Can you share a template of your process group?
>> >>
>> >> Do the messages in Kafka have the Avro schema included in them or are
>> >> they just the data portion of the record?
>> >>
>> >> On Wed, Mar 21, 2018 at 9:16 PM, Colin Williams
>> >> <colin.williams.seat...@gmail.com> wrote:
>> >> > I have an avro avsc file for a table with a definition like:
>> >> >
>> >> >
>> >> >
>> {"type":"record","name":"INV_ADJ","namespace":"NSP_SCH","fields":[{"name":"table","type":"string"},{"name":"op_type","type":"string"},{"name":"op_ts","type":"string"},{"name":"current_ts","type":"string"},{"name":"pos","type":"string"},{"name":"primary_keys","type":{"type":"array","items":"string"}},{"name":"tokens","type":{"type":"map","values":"string"},"default":{}},{"name":"before","type":["null",{"type":"record","name":"columns","fields":[{"name":"ITEM","type":["null","string"],"default":null},{"name":"ITEM_isMissing","type":"boolean"},{"name":"INV_STATUS","type":["null","long"],"default":null},{"name":"INV_STATUS_isMissing","type":"boolean"},{"name":"LOC_TYPE","type":["null","string"],"default":null},{"name":"LOC_TYPE_isMissing","type":"boolean"},{"name":"LOCATION","type":["null","long"],"default":null},{"name":"LOCATION_isMissing","type":"boolean"},{"name":"ADJ_QTY","type":["null","double"],"default":null},{"name":"ADJ_QTY_isMissing","type":"boolean"},{"name":"REASON","type":["null","long"],"default":null},{"name":"REASON_isMissing","type":"boolean"},{"name":"ADJ_DATE","type":["null","string"],"default":null},{"name":"ADJ_DATE_isMissing","type":"boolean"},{"name":"PREV_QTY","type":["null","double"],"default":null},{"name":"PREV_QTY_isMissing","type":"boolean"},{"name":"USER_ID","type":["null","string"],"default":null},{"name":"USER_ID_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT","type":["null","double"],"default":null},{"name":"ADJ_WEIGHT_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT_UOM","type":["null","string"],"default":null},{"name":"ADJ_WEIGHT_UOM_isMissing","type":"boolean"},{"name":"CREATE_ID","type":["null","string"],"default":null},{"name":"CREATE_ID_isMissing","type":"boolean"},{"name":"CREATE_DATETIME","type":["null","string"],"default":null},{"name":"CREATE_DATETIME_isMissing","type":"boolean"}]}],"default":null},{"name":"after","type":["null","columns"],"default":null}]}
>> >> >
>> >> > I have a Kafka topic which should contain Avro records using the above
>> >> > definition.
>> >> >
>> >> > I've configured the Avro registry, reader, and writer with the above
>> >> > definition. When I try using my NiFi workflow I get exceptions like
>> >> > "invalid int encoding", and no data seems to be processed.
>> >> >
>> >> > What am I doing wrong?
>> >
>> >
>>
>
