Pulling API Endpoints into Kafka Topics in Avro

2017-03-28 Thread Steve Champagne
I'm in the process of creating an ingest workflow that will pull a number of
API endpoints into Kafka topics on an hourly basis. I'd like to convert them
from JSON to Avro when I bring them in. I have, however, run into a few
problems that I haven't been able to figure out and haven't turned anything
up through searches. This seems like a fairly common use case for NiFi, so I
figured I'd ask around to see what others are doing in these cases.

The first problem that I'm running into is that some of the endpoints have
objects of the form:

{
  "metricsPerAgent": {
    "6453": {
      "connectedEngagements": 3,
      "nonInteractiveTotalHandlingTime": 0
    },
    "6454": {
      "connectedEngagements": 1,
      "nonInteractiveTotalHandlingTime": 0
    }
  }
}

I'm using an UpdateAttribute processor to add a schema that I got by running
a sample object through the InferAvroSchema processor, and then sending the
flowfile to a ConvertJSONToAvro processor. There, unfortunately,
ConvertJSONToAvro throws an error because it doesn't accept numeric field
names. What do people normally do in cases like these?
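For reference, the error comes from Avro's naming rules: record field names must start with a letter or underscore and contain only letters, digits, and underscores, so a key like "6453" can never be a record field name. A stdlib-only sketch of that check (the function name is illustrative, not a NiFi or Avro API):

```python
import re

# Avro spec: names must match [A-Za-z_][A-Za-z0-9_]* -- so a key such as
# "6453" is rejected when it is treated as a record field name.
AVRO_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def is_valid_avro_name(name):
    """Return True if `name` is a legal Avro record field name."""
    return bool(AVRO_NAME.match(name))

print(is_valid_avro_name("connectedEngagements"))  # True
print(is_valid_avro_name("6453"))                  # False
```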

Thanks!


Re: Pulling API Endpoints into Kafka Topics in Avro

2017-03-28 Thread Steve Champagne
Ah, that worked great! I hadn't known about the Avro map type. Thanks! 
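To illustrate why the map type fits here: map keys may be any string, including "6453", while only the per-agent record fields need legal Avro names. A minimal stdlib sketch of that shape check (not a real Avro encoder; field names are taken from the schema in this thread):

```python
import json

# Structural check mirroring the map-of-records schema: arbitrary string
# keys under "metricsPerAgent", each mapping to a fixed pair of long fields.
REQUIRED_FIELDS = {"connectedEngagements": int,
                   "nonInteractiveTotalHandlingTime": int}

def matches_map_schema(doc):
    agents = doc.get("metricsPerAgent")
    if not isinstance(agents, dict):
        return False
    for key, record in agents.items():  # map keys may be any string, e.g. "6453"
        if not (isinstance(record, dict)
                and set(record) == set(REQUIRED_FIELDS)
                and all(isinstance(record[f], t)
                        for f, t in REQUIRED_FIELDS.items())):
            return False
    return True

sample = json.loads('{"metricsPerAgent": {"6453": '
                    '{"connectedEngagements": 3, '
                    '"nonInteractiveTotalHandlingTime": 0}}}')
print(matches_map_schema(sample))  # True
```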

On Tue, Mar 28, 2017 at 11:51 AM, James Wing <jvw...@gmail.com> wrote:

> Steve,
>
> The inferred schemas can be helpful to get you started, but I recommend
> providing your own Avro schema based on your knowledge of what should be
> guaranteed to downstream systems.  If you want to pass untyped data, you
> can't really beat JSON.  Avro schema isn't so bad, honest.
>
> As part of the numeric key issue, I think your snippet above suggests that
> the keys are not fixed in each sample?  It might be covered by using an
> Avro "map" type rather than a "record":
>
> {
>   "type": "record",
>   "name": "testrecord",
>   "fields": [
>     {
>       "name": "metricsPerAgent",
>       "type": {
>         "type": "map",
>         "values": {
>           "type": "record",
>           "name": "agentMetrics",
>           "fields": [
>             {
>               "name": "connectedEngagements",
>               "type": "long"
>             },
>             {
>               "name": "nonInteractiveTotalHandlingTime",
>               "type": "long"
>             }
>           ]
>         }
>       }
>     }
>   ]
> }
>
> Thanks,
>
> James


Merging Small Files

2017-08-06 Thread Steve Champagne
Hello,

I'm pulling data from API endpoints every five minutes and putting it into
HDFS. This, however, leaves me with quite a few small files: 288 files per
day times however many endpoints I'm reading. My current approach for
handling them is to load the small files into a staging directory under
each of the endpoint directories. I then have ListHDFS and FetchHDFS
processors pulling them back into NiFi so that I can merge them based on
size. This way the files stay in HDFS while they wait to be merged, so they
can be queried at any time. When they get close to an HDFS block size, I
merge them into an archive directory and delete the small files that were
merged.
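The merge-by-size step described above can be sketched as a toy batching plan (the 128 MB block size and the file names are assumptions for illustration; in NiFi the size thresholds on MergeContent do the real work):

```python
# Group small files until the batch approaches an HDFS block size.
# BLOCK_SIZE is an assumption -- use your cluster's dfs.blocksize.
BLOCK_SIZE = 128 * 1024 * 1024

def plan_merges(files):
    """files: list of (name, size) tuples -> list of batches to merge."""
    batches, current, current_size = [], [], 0
    for name, size in files:
        if current and current_size + size > BLOCK_SIZE:
            batches.append(current)          # flush a near-block-size batch
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        batches.append(current)              # last (possibly undersized) batch
    return batches

# Seven hypothetical 40 MB files -> batches of 3, 3, and 1
files = [("part-%03d.avro" % i, 40 * 1024 * 1024) for i in range(7)]
print(plan_merges(files))
```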

My biggest problem with this is that I have to pull the files into NiFi,
where they might sit for extended periods waiting to be merged. I think
this is related to the issue described in NIFI-3376, where my content
repository grows unbounded and fills up my disk.

I was wondering what other patterns people are using for this sort of stuff.

Thanks!


HDFS Environments

2017-08-17 Thread Steve Champagne
Hello,

How would I handle environment separation in HDFS? My initial thought was
to use a directory structure like /data/<environment>/<endpoint>/, but I'm
running into problems with reading the files back out of HDFS (for example,
merging small files into larger files). The ListHDFS processor doesn't
allow incoming connections, so I can't specify the environment with an
attribute. Would something like this require me to use two instances of
NiFi and some sort of environment system variable lookup in EL? Is it even
common practice to encode the environment information in the directory
structure, or do people generally have an HDFS instance per environment
instead? Sorry if this question extends somewhat beyond the scope of NiFi.
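The environment-variable-lookup idea can be sketched outside NiFi like this (the ENV variable name and the /data/<environment>/<endpoint>/ layout are assumptions; in NiFi Expression Language the equivalent would be referencing a per-deployment variable in the directory property):

```python
import os

# Build an environment-scoped HDFS path from a deployment-level variable.
# "ENV" and the layout are hypothetical, not a NiFi API.
def hdfs_dir(endpoint, env=None):
    env = env or os.environ.get("ENV", "dev")  # fall back to "dev"
    return "/data/%s/%s/" % (env, endpoint)

print(hdfs_dir("agent_metrics", env="prod"))  # /data/prod/agent_metrics/
```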

Thanks!


Wrapping a JSON string

2018-08-27 Thread Steve Champagne
Hello,

I'm ingesting some JSON data that I'd like to wrap in a json_string field
as a string type. I tried using a JsonPathReader with a dynamic property
'json_string' and a value of $, but what I get back is a stringified map
representation rather than the raw JSON:

{"partition_date":"2018-01-01T00:00:00.000Z","json_string":"{@timestamp=2018-01-01T00:00:00.000Z,
id=1, name=John}"}

I was wondering if there was a way that I could do this and preserve the
raw JSON format?
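For comparison, the intended result can be sketched in plain Python (the field names and values are the example ones from above; json.dumps handles the quote escaping that keeps the embedded JSON parseable):

```python
import json

# Keep the original JSON text verbatim inside a string field.
raw = '{"@timestamp": "2018-01-01T00:00:00.000Z", "id": 1, "name": "John"}'

wrapped = {
    "partition_date": "2018-01-01T00:00:00.000Z",
    "json_string": raw,  # stored as a string, not re-parsed into a map
}
out = json.dumps(wrapped)

# Round trip: the embedded string parses back to the original object.
assert json.loads(json.loads(out)["json_string"]) == json.loads(raw)
print(out)
```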

Thanks,
Steve


Re: Wrapping a JSON string

2018-09-04 Thread Steve Champagne
I was hoping to use the record processors, otherwise I would definitely use
Jolt. I was eventually able to get what I was after using a ScriptedReader.
I do like 'payload' better than 'json_string', though. :)

Thanks,
Steve

On Mon, Sep 3, 2018 at 3:09 AM DEHAY Aurelien wrote:

> Hello.
>
> You will have to remove the " characters in the wrapped JSON and process
> it like a standard string, so I would use the text processors instead of
> a JSON one.
>
> We did something different on our side: we use escapeJson to encapsulate
> the JSON payload, with the quotes escaped (" => \") in a payload string:
>
> Extract the value with EvaluateJsonPath, with a payload attribute set to $
> Update the attribute with payload = ${payload:escapeJson()}
> And a Jolt processor with a default operation like:
>
> [
>   {
>     "operation": "shift",
>     "spec": {
>       "PLANT": "DE_PLANT"
>     }
>   },
>   {
>     "operation": "default",
>     "spec": {
>       "PAYLOAD": "${payload}"
>     }
>   }
> ]
>
>
> *Aurélien DEHAY *Big Data Architect
> +33 616 815 441
>
> aurelien.de...@faurecia.com
>
> 2 rue Hennape - 92735 Nanterre Cedex – France