Hi Jeff,

Did you by any chance find your flow? I'd like to compare yours against mine if possible.

Basically, what I came up with is:

1. ListS3.
2. RouteOnAttribute. Route only when the filename contains "_CloudTrail_". I don't care about the digests (I think).
3. FetchS3Object.
4. UpdateAttribute: Remove S3 Path From Filename. I only care about the actual filename, not the included S3 path.
5. Gunzip the object.
6. SplitJson.
7. UpdateAttribute to remove the .json extension.
8. UpdateAttribute to construct a unique filename. This adds a UUID to the filename; otherwise, a bunch of the split JSON files end up with the same name.
9. PublishKafka.

Side note: I don't think the editing of the filename is all that important, it just looked clean :)

> I think I have a working Cloudtrail flow on my other computer... I'll try to
> fire that up today and see what I get. I used 1.3.0 the last time I looked
> at Cloudtrail data.
>
> On Thu, Jul 20, 2017 at 4:56 PM Laurens Vets <[email protected]> wrote:
>
>> Please see inline for my answers and some additional information.
>>
>>> It sounds like you are doing the right troubleshooting steps. A few
>>> more ideas off the top of my head:
>>>
>>> * When you tested with the s3 cli, did you use the same credentials,
>>> from the same machine NiFi is running on? The CloudTrail events are
>>> written by AWS, so the ownership and permissions might be tricky.
>>
>> Same credentials, not the same machine.
>>
>>> * As an experiment, try creating one or more new directory/objects as
>>> the NiFi user and configuring ListS3's prefix to target only these new
>>> objects (you might want to copy/paste ListS3 or be sure to wipe out the
>>> state later).
>>
>> I'll try this as well.
>>
>>> * You are sure the prefix is blank? You might try setting it to
>>> "AWSLogs/" for a while to see if it's different.
>>
>> Tried with a blank prefix, with "/" and "AWSLogs" now, no change. Or
>> should I wait a while first?
>> If I set the prefix to a directory containing actual log objects
>> (*.json.gz files), ListS3 is able to list them almost immediately. The
>> prefix used is "AWSLogs/<aws_id>/CloudTrail/ap-northeast-1/2017/07/03/"
>> in this case.
>> It seems ListS3 doesn't recurse?
>>
>>> * Do you have CloudTrail set up to record S3 data events, or can you
>>> set this up? This is usually very tedious, but sometimes there is no
>>> substitute.
>>
>> I'll double-check. I believe I set this up.
>>
>> Kind regards,
>> Laurens
>>
>>> On Thu, Jul 20, 2017 at 11:56 AM, Joe Witt <[email protected]> wrote:
>>>
>>>> Looking at the code it suggests the two cases where it would come up
>>>> with nothing for listing (when there are items to list) is if there is
>>>> state already tracking lastModified of a previously pulled object or
>>>> previously pulled object with the same key. Since you're not even
>>>> getting to the point where state is being persisted it suggests it
>>>> really is getting nothing back on the listing request.
>>>>
>>>> Just in looking at the docs I wonder if you'll need to explicitly set
>>>> the prefix value to something like '/'?
>>>>
>>>> JeffStorck/JamesWing: Any ideas?
>>>>
>>>> We should update the code to provide debug information when listed
>>>> objects are skipped.
>>>>
>>>> Thanks
>>>> Joe
>>>>
>>>> On Thu, Jul 20, 2017 at 2:44 PM, Laurens Vets <[email protected]>
>>>> wrote:
>>>>> I enabled DEBUG logging and I see the following:
>>>>>
>>>>>
>>>>> 2017-07-20 11:39:08,670 DEBUG [StandardProcessScheduler Thread-1] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Using aws credentials for creating client
>>>>> 2017-07-20 11:39:08,670 INFO [StandardProcessScheduler Thread-1] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Creating client with AWS credentials
>>>>> 2017-07-20 11:39:08,672 INFO [StandardProcessScheduler Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled ListS3[id=6119854d-015d-1000-341f-b294838980af] to run with 1 threads
>>>>> 2017-07-20 11:39:08,674 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>>>>> 2017-07-20 11:39:09,089 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@7c10f421 // Another save pending = false
>>>>> 2017-07-20 11:39:09,249 INFO [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 575 millis
>>>>> 2017-07-20 11:39:09,249 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>>>>> 2017-07-20 11:39:09,249 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>>>>> 2017-07-20 11:39:10,246 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@2480acc3 checkpointed with 0 Records and 0 Swap Files in 9 milliseconds (Stop-the-world time = 1 milliseconds, Clear Edit Logs time = 0 millis), max Transaction ID -1
>>>>> 2017-07-20 11:39:10,250 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>>>>> 2017-07-20 11:39:10,288 INFO [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 37 millis
>>>>> 2017-07-20 11:39:10,288 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>>>>> 2017-07-20 11:39:10,288 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>>>>> 2017-07-20 11:39:10,558 INFO [pool-8-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
>>>>> 2017-07-20 11:39:10,633 INFO [pool-8-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@1773faf8 checkpointed with 0 Records and 0 Swap Files in 74 milliseconds (Stop-the-world time = 34 milliseconds, Clear Edit Logs time = 30 millis), max Transaction ID -1
>>>>> 2017-07-20 11:39:10,633 INFO [pool-8-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 75 milliseconds
>>>>> 2017-07-20 11:39:11,289 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>>>>> 2017-07-20 11:39:11,328 INFO [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 39 millis
>>>>> 2017-07-20 11:39:11,328 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>>>>> 2017-07-20 11:39:11,328 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>>>>> 2017-07-20 11:39:12,329 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>>>>> 2017-07-20 11:39:12,376 INFO [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 46 millis
>>>>> 2017-07-20 11:39:12,376 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>>>>> 2017-07-20 11:39:12,376 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>>>>> 2017-07-20 11:39:13,377 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>>>>> 2017-07-20 11:39:13,411 INFO [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 34 millis
>>>>> 2017-07-20 11:39:13,411 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>>>>> 2017-07-20 11:39:13,412 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>>>>> 2017-07-20 11:39:14,413 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>>>>> 2017-07-20 11:39:14,449 INFO [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 36 millis
>>>>> 2017-07-20 11:39:14,450 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>>>>> 2017-07-20 11:39:14,450 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>>>>> 2017-07-20 11:39:15,451 DEBUG [Timer-Driven Process Thread-8] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>>>>> 2017-07-20 11:39:15,506 INFO [Timer-Driven Process Thread-8] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 54 millis
>>>>> 2017-07-20 11:39:15,506 DEBUG [Timer-Driven Process Thread-8] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>>>>> 2017-07-20 11:39:15,506 DEBUG [Timer-Driven Process Thread-8] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>>>>>
>>>>> My S3 log structure is:
>>>>>
>>>>> BUCKETNAME/AWSLogs/ARN/CloudTrail-Digest/ap-northeast-1/2017/07/03/869964652807_CloudTrail-Digest_ap-northeast-1_cloudtrail-orca_us-west-2_20170703T192938Z.json.gz
>>>>>
>>>>> Any idea why it would not recurse into the BUCKETNAME?
>>>>>
>>>>> On 2017-07-20 09:31, Laurens Vets wrote:
>>>>>
>>>>> There's no state currently, i.e. state is empty.
>>>>>
>>>>> I would think that when there's no state, ListS3 would start from the
>>>>> beginning?
>>>>>
>>>>> FYI, the only items I've filled in in the ListS3 processor are:
>>>>>
>>>>> - Bucket: Our bucketname.
>>>>>
>>>>> - Region: Apparently I have to choose one, this is set to us-west-2
>>>>>
>>>>> - Access Key: <set>
>>>>>
>>>>> - Secret Key: <set>
>>>>>
>>>>> I'm pretty sure the above settings are correct because when I do
>>>>> "aws s3 ls s3://<bucketname>" with the above keys, I do get output.
>>>>>
>>>>> On 2017-07-20 09:18, Pierre Villard wrote:
>>>>>
>>>>> Can you check what's the current state of the processor? (right click
>>>>> / view state)
>>>>> Are you sure there is data to retrieve more recent than what is
>>>>> currently in the processor's state?
>>>>>
>>>>> Pierre
>>>>>
>>>>> 2017-07-20 18:16 GMT+02:00 Laurens Vets <[email protected]>:
>>>>>>
>>>>>> I'm running 1.3.0 at the moment... I'm tempted to go back to 1.2.0
>>>>>> as I remember I got something working with S3.
>>>>>>
>>>>>> Can I just downgrade?
>>>>>>
>>>>>> On 2017-07-20 09:12, Adam Lamar wrote:
>>>>>>
>>>>>> Hi Laurens,
>>>>>>
>>>>>> What NiFi version are you running? There was an issue where ListS3
>>>>>> would spin like that on buckets with many files, but it was fixed in
>>>>>> version 1.1.0 IIRC.
>>>>>>
>>>>>> Hope that helps,
>>>>>> Adam
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 20, 2017 at 10:05 AM, Laurens Vets <[email protected]>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm trying to ingest AWS CloudTrail logs with NiFi. I think I
>>>>>>> configured ListS3 correctly, but it has been running for hours &
>>>>>>> hours without showing anything (except for the # of tasks).
>>>>>>>
>>>>>>> How long does it take before I should see _any_
>>>>>>> output/state/something in the ListS3 processor?
>>>>>>
>>>>>>
>>>>>
>>>>>
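
P.S. In case it helps when comparing flows, here is a rough Python sketch of what steps 2 and 4 through 8 of the flow at the top of this message do to a single object. It is not NiFi code and not how the processors are implemented; the function names are made up for illustration, and it assumes each CloudTrail log file is a gzipped JSON document with a top-level "Records" array and that SplitJson is pointed at that array.

```python
import gzip
import json
import posixpath
import uuid


def is_cloudtrail_log(key):
    """Step 2: keep only keys containing "_CloudTrail_" (this skips the digests)."""
    return "_CloudTrail_" in key


def base_name(key):
    """Steps 4 and 7: drop the S3 path, then the .gz and .json extensions."""
    name = posixpath.basename(key)
    for suffix in (".gz", ".json"):
        if name.endswith(suffix):
            name = name[: -len(suffix)]
    return name


def split_records(gz_bytes):
    """Steps 5 and 6: gunzip the object and split the "Records" array (assumed layout)."""
    document = json.loads(gzip.decompress(gz_bytes).decode("utf-8"))
    return document.get("Records", [])


def to_messages(key, gz_bytes):
    """Step 8: pair every record with a unique name (base name plus a UUID)."""
    if not is_cloudtrail_log(key):
        return []
    name = base_name(key)
    return [
        ("{}_{}".format(name, uuid.uuid4()), json.dumps(record))
        for record in split_records(gz_bytes)
    ]
```

Calling to_messages() with an object key and the raw object bytes returns one (filename, JSON string) pair per event, which is roughly what would be handed to PublishKafka in step 9. A digest key such as the CloudTrail-Digest one quoted above yields an empty list, because "_CloudTrail-Digest_" does not contain "_CloudTrail_".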
