Pulling API Endpoints into Kafka Topics in Avro
I'm in the process of creating an ingest workflow that will pull a number of API endpoints into Kafka topics on an hourly basis. I'd like to convert them from JSON to Avro when I bring them in. I have, however, run into a few problems that I haven't been able to figure out and haven't turned anything up through searches. This seems like it would be a fairly common use case for NiFi, so I figured I'd ask around to see what others are doing in these cases.

The first problem I'm running into is that some of the endpoints return objects of the form:

{
  "metricsPerAgent": {
    "6453": {
      "connectedEngagements": 3,
      "nonInteractiveTotalHandlingTime": 0
    },
    "6454": {
      "connectedEngagements": 1,
      "nonInteractiveTotalHandlingTime": 0
    }
  }
}

I'm using an UpdateAttribute processor to add a schema that I get from running the object through the InferAvroSchema processor, then routing the flowfile into a ConvertJSONToAvro processor. There, unfortunately, I'm getting an error because the ConvertJSONToAvro processor doesn't accept numeric field names. What do people normally do in cases like these?

Thanks!
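For context on why the conversion fails: the Avro specification requires record field names to start with a letter or underscore, so keys like "6453" can never be record fields. A minimal stdlib-only Python sketch (no NiFi involved; the function name is mine) that flags such keys:

```python
import re

# Per the Avro spec, a name must start with [A-Za-z_] and continue
# with [A-Za-z0-9_]. Keys like "6453" fail this check.
AVRO_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def invalid_avro_field_names(obj):
    """Return the keys of a JSON object that are not legal Avro field names."""
    return [key for key in obj if not AVRO_NAME.match(key)]

sample = {
    "6453": {"connectedEngagements": 3, "nonInteractiveTotalHandlingTime": 0},
    "6454": {"connectedEngagements": 1, "nonInteractiveTotalHandlingTime": 0},
}
print(invalid_avro_field_names(sample))  # → ['6453', '6454']
```

This is exactly the situation the map-type answer below resolves: map keys, unlike field names, are arbitrary strings.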
Re: Pulling API Endpoints into Kafka Topics in Avro
Ah, that worked great! I hadn't known about the Avro map type. Thanks!

On Tue, Mar 28, 2017 at 11:51 AM, James Wing <jvw...@gmail.com> wrote:

> Steve,
>
> The inferred schemas can be helpful to get you started, but I recommend
> providing your own Avro schema based on your knowledge of what should be
> guaranteed to downstream systems. If you want to pass untyped data, you
> can't really beat JSON. Avro schema isn't so bad, honest.
>
> As for the numeric key issue, I think your snippet above suggests that
> the keys are not fixed in each sample? It might be covered by using an
> Avro "map" type rather than a "record":
>
> {
>   "type": "record",
>   "name": "testrecord",
>   "fields": [
>     {
>       "name": "metricsPerAgent",
>       "type": {
>         "type": "map",
>         "values": {
>           "type": "record",
>           "name": "agentMetrics",
>           "fields": [
>             { "name": "connectedEngagements", "type": "long" },
>             { "name": "nonInteractiveTotalHandlingTime", "type": "long" }
>           ]
>         }
>       }
>     }
>   ]
> }
>
> Thanks,
>
> James
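To illustrate why the map-type schema fits, here's a hedged, stdlib-only Python sketch (the function name is mine) that consumes metricsPerAgent the way the schema above models it, as a map from agent ID to a fixed metrics record:

```python
import json

# With a map schema, the numeric keys become Avro map keys (which are
# always strings), not record field names, so they don't have to be
# valid Avro identifiers and the set of keys can vary per sample.
payload = json.loads("""
{
  "metricsPerAgent": {
    "6453": {"connectedEngagements": 3, "nonInteractiveTotalHandlingTime": 0},
    "6454": {"connectedEngagements": 1, "nonInteractiveTotalHandlingTime": 0}
  }
}
""")

def total_engagements(doc):
    """Sum connectedEngagements across all agents in the map."""
    return sum(m["connectedEngagements"] for m in doc["metricsPerAgent"].values())

print(total_engagements(payload))  # → 4
```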
Merging Small Files
Hello,

I'm pulling data from API endpoints every five minutes and putting it into HDFS. This, however, is giving me quite a few small files: 288 files per day times however many endpoints I am reading.

My current approach for handling them is to load the small files into a staging directory under each of the endpoint directories. I then have ListHDFS and FetchHDFS processors pulling them back into NiFi so that I can merge them based on size. This way the files stay in HDFS while waiting to be merged, so they can be queried at any time. When they get close to an HDFS block size, I merge them into an archive directory and delete the small files that were merged.

My biggest problem with this is that I have to pull the files into NiFi, where they might sit for extended periods waiting to be merged. This causes problems that I think are related to those brought up in NIFI-3376, where my content repository grows unbounded and fills up my disk. I was wondering what other patterns people are using for this sort of thing.

Thanks!
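The merge-when-near-a-block-size step described above can be sketched outside NiFi. A hypothetical stdlib-only Python version of the batching logic (the 128 MB default HDFS block size and the function name are assumptions for illustration):

```python
# Group (name, size) pairs into merge batches that approach, but do not
# exceed, a target size -- here the default 128 MB HDFS block size.
BLOCK_SIZE = 128 * 1024 * 1024

def plan_merges(files, target=BLOCK_SIZE):
    """files: iterable of (name, size_bytes) pairs. Returns a list of
    batches, each a list of file names to merge into one archive file."""
    batches, current, current_size = [], [], 0
    # Largest-first packing keeps batches close to the target.
    for name, size in sorted(files, key=lambda f: f[1], reverse=True):
        if current and current_size + size > target:
            batches.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        batches.append(current)
    return batches

print(plan_merges([("a", 60), ("b", 50), ("c", 30)], target=100))
```

This doesn't address the content-repository growth itself, but it shows the batching decision can be made from a ListHDFS-style listing (names and sizes) alone, before any content is fetched.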
HDFS Environments
Hello,

How would I handle environment separation in HDFS? My initial thought was to use a directory structure like /data///, but I'm running into problems reading the files back out of HDFS (for example, when merging small files into larger files). The ListHDFS processor doesn't allow input connections, so I can't specify the environment with an attribute.

Would something like this require me to use two instances of NiFi and some sort of environment system variable lookup in EL? Is it even common practice to encode the environment information in the directory structure, or do people generally have an HDFS instance per environment instead? Sorry if this question extends somewhat outside the scope of NiFi.

Thanks!
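One pattern worth sketching (an illustration, not NiFi-specific advice): resolve the environment once per host from a setting, and build every HDFS path from it, which is roughly what a per-instance variable plus `${env}` in Expression Language would give a ListHDFS Directory property. The variable name NIFI_ENV and the /data/<env>/<source> layout below are assumptions of mine, not taken from the thread:

```python
import os

def hdfs_data_path(source, env=None):
    """Build an environment-scoped HDFS path like /data/<env>/<source>.
    The NIFI_ENV variable and the layout are hypothetical examples."""
    env = env or os.environ.get("NIFI_ENV", "dev")
    return f"/data/{env}/{source}"

print(hdfs_data_path("agent_metrics", env="prod"))  # → /data/prod/agent_metrics
```

The design point is that the environment is fixed per deployment rather than per flowfile, which sidesteps ListHDFS's lack of input connections.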
Wrapping a JSON string
Hello,

I'm ingesting some JSON data that I'd like to wrap in a json_string field as a string type. I tried using a JsonPathReader with a dynamic property 'json_string' and a value of $, but I seem to be getting back a string version of the JSON:

{"partition_date":"2018-01-01T00:00:00.000Z","json_string":"{@timestamp=2018-01-01T00:00:00.000Z, id=1, name=John}"}

I was wondering if there is a way I could do this and preserve the raw JSON format?

Thanks,
Steve
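The output shown above looks like a Java Map's toString ({@timestamp=..., id=1, ...}) rather than JSON. For contrast, here's a stdlib-only Python sketch of the intended result, where the raw JSON text is embedded verbatim as a string field (json.dumps handles the quote escaping):

```python
import json

raw = '{"@timestamp": "2018-01-01T00:00:00.000Z", "id": 1, "name": "John"}'

# Embed the raw JSON text as a string field; serializing escapes the
# inner quotes, so the wrapper is still valid JSON.
wrapped = json.dumps({
    "partition_date": "2018-01-01T00:00:00.000Z",
    "json_string": raw,
})

# Round trip: the inner document is still parseable JSON, not a toString.
inner = json.loads(json.loads(wrapped)["json_string"])
print(inner["id"])  # → 1
```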
Re: Wrapping a JSON string
I was hoping to use the record processors, otherwise I would definitely use Jolt. I was eventually able to get what I was after using a ScriptedReader. I do like 'payload' better than 'json_string', though. :)

Thanks,
Steve

On Mon, Sep 3, 2018 at 3:09 AM DEHAY Aurelien wrote:

> Hello.
>
> You will have to remove the " in the wrapped JSON and process it like a
> standard string, so I would use the text processors instead of a JSON one.
>
> We did something different on our side: we use escapeJson to encapsulate
> the JSON payload, escaped (" => \" mainly) in a payload string:
>
> - Extract the value with EvaluateJsonPath, with a payload attribute as $
> - Update the attribute with payload = ${payload:escapeJson()}
> - And a Jolt processor with a default operation like:
>
> [
>   {
>     "operation": "shift",
>     "spec": {
>       "PLANT": "DE_PLANT"
>     }
>   },
>   {
>     "operation": "default",
>     "spec": {
>       "PAYLOAD": "${payload}"
>     }
>   }
> ]
>
> Aurélien DEHAY
> Big Data Architect
> aurelien.de...@faurecia.com
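For anyone reading along, the shift-plus-default flow quoted above can be mimicked in plain Python to see what it produces. The PLANT/PAYLOAD names come from Aurélien's example; the function name and sample values are mine, and serializing with json.dumps stands in for ${payload:escapeJson()}:

```python
import json

def jolt_like_wrap(record):
    """Rough Python equivalent of the flow above: move PLANT to DE_PLANT
    (the "shift"), and default in the whole record, JSON-escaped, as a
    PAYLOAD string (the escapeJson + "default" steps)."""
    payload = json.dumps(record)  # escapes inner quotes, like escapeJson()
    return {"DE_PLANT": record["PLANT"], "PAYLOAD": payload}

out = jolt_like_wrap({"PLANT": "DE01", "units": 42})
print(out["DE_PLANT"])  # → DE01
```

The key property is the same as in the Jolt version: PAYLOAD is an ordinary string field, so the wrapper stays valid JSON while the original document survives a round trip.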