Laurens,

For debugging and tracking purposes, you might want to look at the fragment
attributes created by SplitJson instead of generating a UUID to add to each
split.

The "fragment.identifier" attribute contains a UUID shared among all
fragments of the same split, "fragment.index" indicates the index of the
current split within the set, and "fragment.count" gives the total number of
splits created by SplitJson.
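
As a rough illustration only (not taken from a real flow), here is a small
Python sketch of how those three attributes could be combined into a unique,
sortable name per split. The function name and the attribute dictionary are
hypothetical; in NiFi itself you would more likely reference the attributes
from UpdateAttribute with Expression Language, for example
${fragment.identifier} and ${fragment.index}.

```python
def fragment_filename(attributes: dict) -> str:
    """Build a unique name for one split from SplitJson-style attributes.

    `attributes` is a hypothetical stand-in for a FlowFile's attribute map;
    all values are strings, as FlowFile attributes are.
    """
    base = attributes["filename"]
    identifier = attributes["fragment.identifier"]  # shared by all splits
    index = int(attributes["fragment.index"])       # position in the set
    count = int(attributes["fragment.count"])       # total number of splits

    # Zero-pad the index to the width of the count so names sort correctly.
    width = len(str(count))
    return f"{base}.{identifier}.{index:0{width}d}-of-{count}"


if __name__ == "__main__":
    example = {
        "filename": "trail",
        "fragment.identifier": "abc",
        "fragment.index": "3",
        "fragment.count": "120",
    }
    print(fragment_filename(example))
```

Because the identifier is shared across the set, you can also group the
splits from one source file back together later without generating and
tracking a separate UUID per split.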

Regards,
Joe S

On Mon, Jul 24, 2017 at 11:58 AM, Laurens Vets <[email protected]> wrote:

> Hi Jeff,
>
> Did you by any chance find your flow? I want to compare yours against
> mine if possible.
>
> Basically, what I came up with is:
>
> 1. ListS3.
>
> 2. RouteOnAttribute. Route only when the filename contains "_CloudTrail_".
> I don't care about the digests (I think).
>
> 3. FetchS3Object.
>
> 4. UpdateAttribute: Remove S3 Path From Filename. I only care about the
> actual filename, not the included S3 path.
>
> 5. Gunzip the object.
>
> 6. SplitJson.
>
> 7. UpdateAttribute to remove the .json extension.
>
> 8. UpdateAttribute to construct a unique filename. This adds a UUID to the
> filename; otherwise, a bunch of the split JSON files have the same name.
>
> 9. PublishKafka.
>
> Side note: I don't think the editing of the filename is all that
> important, it just looked clean :)
>
> I think I have a working CloudTrail flow on my other computer...  I'll try
> to fire that up today and see what I get.  I used 1.3.0 the last time I
> looked at CloudTrail data.
>
> On Thu, Jul 20, 2017 at 4:56 PM Laurens Vets <[email protected]> wrote:
>
>> Please see inline for my answers and some additional information.
>>
>> > It sounds like you are doing the right troubleshooting steps.  A few
>> > more ideas off the top of my head:
>> >
>> > * When you tested with the s3 cli, did you use the same credentials,
>> > from the same machine NiFi is running on?  The CloudTrail events are
>> > written by AWS, so the ownership and permissions might be tricky.
>>
>> Same credentials, not the same machine.
>>
>> > * As an experiment, try creating one or more new directory/objects as
>> > the NiFi user and configuring ListS3's prefix to target only these new
>> > objects (you might want to copy/paste ListS3 or be sure to wipe out the
>> > state later).
>>
>> I'll try this as well.
>>
>> > * You are sure the prefix is blank?  You might try setting it to
>> > "AWSLogs/" for a while to see if it's different.
>>
>> Tried with a blank prefix, with "/" and "AWSLogs" now, no change. Or
>> should I wait a while first?
>> If I set the prefix to a directory containing actual log objects
>> (*.json.gz files), ListS3 is able to list them almost immediately. The
>> prefix used is "AWSLogs/<aws_id>/CloudTrail/ap-northeast-1/2017/07/03/"
>> in this case.
>> It seems ListS3 doesn't recurse?
>>
>> > * Do you have CloudTrail set up to record S3 data events, or can you
>> > set this up?  This is usually very tedious, but sometimes there is no
>> > substitute.
>>
>> I'll double-check. I believe I set this up.
>>
>> Kind regards,
>> Laurens
>>
>> > On Thu, Jul 20, 2017 at 11:56 AM, Joe Witt <[email protected]> wrote:
>> >
>> >> Looking at the code, it suggests the two cases where it would come up
>> >> with nothing for listing (when there are items to list) are if there is
>> >> state already tracking lastModified of a previously pulled object or a
>> >> previously pulled object with the same key.  Since you're not even
>> >> getting to the point where state is being persisted it suggests it
>> >> really is getting nothing back on the listing request.
>> >>
>> >> Just in looking at the docs I wonder if you'll need to explicitly set
>> >> the prefix value to something like '/'?
>> >>
>> >> JeffStorck/JamesWing: Any ideas?
>> >>
>> >> We should update the code to provide debug information when listed
>> >> objects are skipped.
>> >>
>> >> Thanks
>> >> Joe
>> >>
>> >> On Thu, Jul 20, 2017 at 2:44 PM, Laurens Vets <[email protected]>
>> >> wrote:
>> >>> I enabled DEBUG logging and I see the following:
>> >>>
>> >>>
>> >>> 2017-07-20 11:39:08,670 DEBUG [StandardProcessScheduler Thread-1]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Using aws credentials
>> >>> for
>> >>> creating client
>> >>> 2017-07-20 11:39:08,670 INFO [StandardProcessScheduler Thread-1]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Creating client with
>> >>> AWS
>> >>> credentials
>> >>> 2017-07-20 11:39:08,672 INFO [StandardProcessScheduler Thread-1]
>> >>> o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] to run with 1 threads
>> >>> 2017-07-20 11:39:08,674 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER
>> >>> State:
>> >>> StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:09,089 INFO [Flow Service Tasks Thread-2]
>> >>> o.a.nifi.controller.StandardFlowService Saved flow controller
>> >>> org.apache.nifi.controller.FlowController@7c10f421 // Another save
>> >>> pending =
>> >>> false
>> >>> 2017-07-20 11:39:09,249 INFO [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed
>> >>> S3
>> >>> bucket BUCKETNAME in 575 millis
>> >>> 2017-07-20 11:39:09,249 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3
>> >>> bucket
>> >>> BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:09,249 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield
>> >>> its
>> >>> resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:10,246 INFO [Write-Ahead Local State Provider
>> >>> Maintenance]
>> >>> org.wali.MinimalLockingWriteAheadLog
>> >>> org.wali.MinimalLockingWriteAheadLog@2480acc3 checkpointed with 0
>> >>> Records
>> >>> and 0 Swap Files in 9 milliseconds (Stop-the-world time = 1
>> >>> milliseconds,
>> >>> Clear Edit Logs time = 0 millis), max Transaction ID -1
>> >>> 2017-07-20 11:39:10,250 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER
>> >>> State:
>> >>> StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:10,288 INFO [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed
>> >>> S3
>> >>> bucket BUCKETNAME in 37 millis
>> >>> 2017-07-20 11:39:10,288 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3
>> >>> bucket
>> >>> BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:10,288 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield
>> >>> its
>> >>> resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:10,558 INFO [pool-8-thread-1]
>> >>> o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of
>> >>> FlowFile
>> >>> Repository
>> >>> 2017-07-20 11:39:10,633 INFO [pool-8-thread-1]
>> >>> org.wali.MinimalLockingWriteAheadLog
>> >>> org.wali.MinimalLockingWriteAheadLog@1773faf8 checkpointed with 0
>> >>> Records
>> >>> and 0 Swap Files in 74 milliseconds (Stop-the-world time = 34
>> >>> milliseconds,
>> >>> Clear Edit Logs time = 30 millis), max Transaction ID -1
>> >>> 2017-07-20 11:39:10,633 INFO [pool-8-thread-1]
>> >>> o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed
>> >>> FlowFile
>> >>> Repository with 0 records in 75 milliseconds
>> >>> 2017-07-20 11:39:11,289 DEBUG [Timer-Driven Process Thread-10]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER
>> >>> State:
>> >>> StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:11,328 INFO [Timer-Driven Process Thread-10]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed
>> >>> S3
>> >>> bucket BUCKETNAME in 39 millis
>> >>> 2017-07-20 11:39:11,328 DEBUG [Timer-Driven Process Thread-10]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3
>> >>> bucket
>> >>> BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:11,328 DEBUG [Timer-Driven Process Thread-10]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield
>> >>> its
>> >>> resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:12,329 DEBUG [Timer-Driven Process Thread-2]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER
>> >>> State:
>> >>> StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:12,376 INFO [Timer-Driven Process Thread-2]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed
>> >>> S3
>> >>> bucket BUCKETNAME in 46 millis
>> >>> 2017-07-20 11:39:12,376 DEBUG [Timer-Driven Process Thread-2]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3
>> >>> bucket
>> >>> BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:12,376 DEBUG [Timer-Driven Process Thread-2]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield
>> >>> its
>> >>> resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:13,377 DEBUG [Timer-Driven Process Thread-2]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER
>> >>> State:
>> >>> StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:13,411 INFO [Timer-Driven Process Thread-2]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed
>> >>> S3
>> >>> bucket BUCKETNAME in 34 millis
>> >>> 2017-07-20 11:39:13,411 DEBUG [Timer-Driven Process Thread-2]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3
>> >>> bucket
>> >>> BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:13,412 DEBUG [Timer-Driven Process Thread-2]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield
>> >>> its
>> >>> resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:14,413 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER
>> >>> State:
>> >>> StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:14,449 INFO [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed
>> >>> S3
>> >>> bucket BUCKETNAME in 36 millis
>> >>> 2017-07-20 11:39:14,450 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3
>> >>> bucket
>> >>> BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:14,450 DEBUG [Timer-Driven Process Thread-4]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield
>> >>> its
>> >>> resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:15,451 DEBUG [Timer-Driven Process Thread-8]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER
>> >>> State:
>> >>> StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:15,506 INFO [Timer-Driven Process Thread-8]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed
>> >>> S3
>> >>> bucket BUCKETNAME in 54 millis
>> >>> 2017-07-20 11:39:15,506 DEBUG [Timer-Driven Process Thread-8]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3
>> >>> bucket
>> >>> BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:15,506 DEBUG [Timer-Driven Process Thread-8]
>> >>> org.apache.nifi.processors.aws.s3.ListS3
>> >>> ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield
>> >>> its
>> >>> resources; will not be scheduled to run again for 1000 milliseconds
>> >>>
>> >>> My S3 log structure is:
>> >>>
>> >>> BUCKETNAME/AWSLogs/ARN/CloudTrail-Digest/ap-northeast-1/2017/07/03/869964652807_CloudTrail-Digest_ap-northeast-1_cloudtrail-orca_us-west-2_20170703T192938Z.json.gz
>> >>>
>> >>> Any idea why it would not recurse into the BUCKETNAME?
>> >>>
>> >>> On 2017-07-20 09:31, Laurens Vets wrote:
>> >>>
>> >>> There's no state currently, ie state is empty.
>> >>>
>> >>> I would think that when there's no state, ListS3 would start from the
>> >>> beginning?
>> >>>
>> >>> FYI, the only items I've filled in in the ListS3 processor are:
>> >>>
>> >>> - Bucket: Our bucketname.
>> >>>
>> >>> - Region: Apparently I have to choose one, this is set to us-west-2
>> >>>
>> >>> - Access Key: <set>
>> >>>
>> >>> - Secret Key: <set>
>> >>>
>> >>> I'm pretty sure the above settings are correct because when I do "aws
>> >>> s3 ls
>> >>> s3://<bucketname>" with the above keys, I do get output.
>> >>>
>> >>> On 2017-07-20 09:18, Pierre Villard wrote:
>> >>>
>> >>> Can you check what's the current state of the processor? (right click
>> >>> / view
>> >>> state)
>> >>> Are you sure there is data to retrieve more recent than what is
>> >>> currently in
>> >>> the processor's state?
>> >>>
>> >>> Pierre
>> >>>
>> >>> 2017-07-20 18:16 GMT+02:00 Laurens Vets <[email protected]>:
>> >>>>
>> >>>> I'm running 1.3.0 at the moment... I'm tempted to go back to 1.2.0
>> >>>> as I
>> >>>> remember I got something working with S3.
>> >>>>
>> >>>> Can I just downgrade?
>> >>>>
>> >>>> On 2017-07-20 09:12, Adam Lamar wrote:
>> >>>>
>> >>>> Hi Laurens,
>> >>>>
>> >>>> What NiFi version are you running? There was an issue where ListS3
>> >>>> would
>> >>>> spin like that on buckets with many files, but it was fixed in
>> >>>> version 1.1.0
>> >>>> IIRC.
>> >>>>
>> >>>> Hope that helps,
>> >>>> Adam
>> >>>>
>> >>>>
>> >>>> On Thu, Jul 20, 2017 at 10:05 AM, Laurens Vets <[email protected]>
>> >>>> wrote:
>> >>>>>
>> >>>>> Hello,
>> >>>>>
>> >>>>> I'm trying to ingest AWS CloudTrail logs with NiFi. I think I
>> >>>>> configured
>> >>>>> ListS3 correctly, but it has been running for hours & hours without
>> >>>>> showing
>> >>>>> anything (except for the # of tasks).
>> >>>>>
>> >>>>> How long does it take before I should see _any_
>> >>>>> output/state/something in
>> >>>>> the ListS3 processor?
>> >>>>
>> >>>>
>> >>>
>> >>>
>
>
>
