Laurens,

For debugging and tracking purposes, you might want to look at the fragment attributes created by SplitJson instead of generating a UUID to add to each split.
The "fragment.identifier" attribute contains a UUID shared among all fragments, "fragment.index" indicates the index of the current split within the set, and "fragment.count" gives the total number of splits created by SplitJson.

Regards,
Joe S

On Mon, Jul 24, 2017 at 11:58 AM, Laurens Vets <[email protected]> wrote:

> Hi Jeff,
>
> Did you by any chance find your flow? I want to compare yours against
> mine if possible.
>
> Basically, what I came up with is:
>
> 1. ListS3.
>
> 2. RouteOnAttribute. Route only when the filename contains "_CloudTrail_".
> I don't care about the digests (I think).
>
> 3. FetchS3Object.
>
> 4. UpdateAttribute: Remove S3 Path From Filename. I only care about the
> actual filename, not the included S3 path.
>
> 5. Gunzip the object.
>
> 6. SplitJson.
>
> 7. UpdateAttribute to remove the .json extension.
>
> 8. UpdateAttribute to construct a unique filename. This adds a UUID to the
> filename; otherwise, a bunch of the split JSON files have the same name.
>
> 9. PublishKafka.
>
> Side note: I don't think the editing of the filename is all that
> important, it just looked clean :)
>
> I think I have a working Cloudtrail flow on my other computer... I'll try
> to fire that up today and see what I get. I used 1.3.0 the last time I
> looked at Cloudtrail data.
>
> On Thu, Jul 20, 2017 at 4:56 PM Laurens Vets <[email protected]> wrote:
>
>> Please see inline for my answers and some additional information.
>>
>> > It sounds like you are doing the right troubleshooting steps. A few
>> > more ideas off the top of my head:
>> >
>> > * When you tested with the s3 cli, did you use the same credentials,
>> > from the same machine NiFi is running on? The CloudTrail events are
>> > written by AWS, so the ownership and permissions might be tricky.
>>
>> Same credentials, not the same machine.
>>
>> > * As an experiment, try creating one or more new directory/objects as
>> > the NiFi user and configuring ListS3's prefix to target only these new
>> > objects (you might want to copy/paste ListS3 or be sure to wipe out the
>> > state later).
>>
>> I'll try this as well.
>>
>> > * You are sure the prefix is blank? You might try setting it to
>> > "AWSLogs/" for a while to see if it's different.
>>
>> Tried with a blank prefix, with "/" and "AWSLogs" now, no change. Or
>> should I wait a while first?
>> If I set the prefix to a directory containing actual log objects
>> (*.json.gz files), ListS3 is able to list them almost immediately. The
>> prefix used is "AWSLogs/<aws_id>/CloudTrail/ap-northeast-1/2017/07/03/"
>> in this case.
>> It seems ListS3 doesn't recurse?
>>
>> > * Do you have CloudTrail set up to record S3 data events, or can you
>> > set this up? This is usually very tedious, but sometimes there is no
>> > substitute.
>>
>> I'll double-check. I believe I set this up.
>>
>> Kind regards,
>> Laurens
>>
>> > On Thu, Jul 20, 2017 at 11:56 AM, Joe Witt <[email protected]> wrote:
>> >
>> >> Looking at the code it suggests the two cases where it would come up
>> >> with nothing for listing (when there are items to list) is if there is
>> >> state already tracking lastModified of a previously pulled object or
>> >> previously pulled object with the same key. Since you're not even
>> >> getting to the point where state is being persisted it suggests it
>> >> really is getting nothing back on the listing request.
>> >>
>> >> Just in looking at the docs I wonder if you'll need to explicitly set
>> >> the prefix value to something like '/'?
>> >>
>> >> JeffStorck/JamesWing: Any ideas?
>> >>
>> >> We should update the code to provide debug information when listed
>> >> objects are skipped.
>> >>
>> >> Thanks
>> >> Joe
>> >>
>> >> On Thu, Jul 20, 2017 at 2:44 PM, Laurens Vets <[email protected]> wrote:
>> >>> I enabled DEBUG logging and I see the following:
>> >>>
>> >>>
>> >>> 2017-07-20 11:39:08,670 DEBUG [StandardProcessScheduler Thread-1] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Using aws credentials for creating client
>> >>> 2017-07-20 11:39:08,670 INFO [StandardProcessScheduler Thread-1] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Creating client with AWS credentials
>> >>> 2017-07-20 11:39:08,672 INFO [StandardProcessScheduler Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled ListS3[id=6119854d-015d-1000-341f-b294838980af] to run with 1 threads
>> >>> 2017-07-20 11:39:08,674 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:09,089 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@7c10f421 // Another save pending = false
>> >>> 2017-07-20 11:39:09,249 INFO [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 575 millis
>> >>> 2017-07-20 11:39:09,249 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:09,249 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:10,246 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@2480acc3 checkpointed with 0 Records and 0 Swap Files in 9 milliseconds (Stop-the-world time = 1 milliseconds, Clear Edit Logs time = 0 millis), max Transaction ID -1
>> >>> 2017-07-20 11:39:10,250 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:10,288 INFO [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 37 millis
>> >>> 2017-07-20 11:39:10,288 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:10,288 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:10,558 INFO [pool-8-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
>> >>> 2017-07-20 11:39:10,633 INFO [pool-8-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@1773faf8 checkpointed with 0 Records and 0 Swap Files in 74 milliseconds (Stop-the-world time = 34 milliseconds, Clear Edit Logs time = 30 millis), max Transaction ID -1
>> >>> 2017-07-20 11:39:10,633 INFO [pool-8-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 75 milliseconds
>> >>> 2017-07-20 11:39:11,289 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:11,328 INFO [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 39 millis
>> >>> 2017-07-20 11:39:11,328 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:11,328 DEBUG [Timer-Driven Process Thread-10] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:12,329 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:12,376 INFO [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 46 millis
>> >>> 2017-07-20 11:39:12,376 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:12,376 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:13,377 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:13,411 INFO [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 34 millis
>> >>> 2017-07-20 11:39:13,411 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:13,412 DEBUG [Timer-Driven Process Thread-2] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:14,413 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:14,449 INFO [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 36 millis
>> >>> 2017-07-20 11:39:14,450 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:14,450 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>> >>> 2017-07-20 11:39:15,451 DEBUG [Timer-Driven Process Thread-8] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Returning CLUSTER State: StandardStateMap[version=-1, values={}]
>> >>> 2017-07-20 11:39:15,506 INFO [Timer-Driven Process Thread-8] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] Successfully listed S3 bucket BUCKETNAME in 54 millis
>> >>> 2017-07-20 11:39:15,506 DEBUG [Timer-Driven Process Thread-8] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] No new objects in S3 bucket BUCKETNAME to list. Yielding.
>> >>> 2017-07-20 11:39:15,506 DEBUG [Timer-Driven Process Thread-8] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=6119854d-015d-1000-341f-b294838980af] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds
>> >>>
>> >>> My S3 log structure is:
>> >>>
>> >>> BUCKETNAME/AWSLogs/ARN/CloudTrail-Digest/ap-northeast-1/2017/07/03/869964652807_CloudTrail-Digest_ap-northeast-1_cloudtrail-orca_us-west-2_20170703T192938Z.json.gz
>> >>>
>> >>> Any idea why it would not recurse into the BUCKETNAME?
>> >>>
>> >>> On 2017-07-20 09:31, Laurens Vets wrote:
>> >>>
>> >>> There's no state currently, ie state is empty.
>> >>>
>> >>> I would think that when there's no state, ListS3 would start from the
>> >>> beginning?
>> >>>
>> >>> FYI, the only items I've filled in in the ListS3 processor are:
>> >>>
>> >>> - Bucket: Our bucketname.
>> >>>
>> >>> - Region: Apparently I have to choose one, this is set to us-west-2
>> >>>
>> >>> - Access Key: <set>
>> >>>
>> >>> - Secret Key: <set>
>> >>>
>> >>> I'm pretty sure the above settings are correct because when I do
>> >>> "aws s3 ls s3://<bucketname>" with the above keys, I do get output.
>> >>>
>> >>> On 2017-07-20 09:18, Pierre Villard wrote:
>> >>>
>> >>> Can you check what's the current state of the processor? (right click
>> >>> / view state)
>> >>> Are you sure there is data to retrieve more recent than what is
>> >>> currently in the processor's state?
>> >>>
>> >>> Pierre
>> >>>
>> >>> 2017-07-20 18:16 GMT+02:00 Laurens Vets <[email protected]>:
>> >>>>
>> >>>> I'm running 1.3.0 at the moment... I'm tempted to go back to 1.2.0
>> >>>> as I remember I got something working with S3.
>> >>>>
>> >>>> Can I just downgrade?
>> >>>>
>> >>>> On 2017-07-20 09:12, Adam Lamar wrote:
>> >>>>
>> >>>> Hi Laurens,
>> >>>>
>> >>>> What NiFi version are you running? There was an issue where ListS3
>> >>>> would spin like that on buckets with many files, but it was fixed in
>> >>>> version 1.1.0 IIRC.
>> >>>>
>> >>>> Hope that helps,
>> >>>> Adam
>> >>>>
>> >>>>
>> >>>> On Thu, Jul 20, 2017 at 10:05 AM, Laurens Vets <[email protected]> wrote:
>> >>>>>
>> >>>>> Hello,
>> >>>>>
>> >>>>> I'm trying to ingest AWS CloudTrail logs with NiFi. I think I
>> >>>>> configured ListS3 correctly, but it has been running for hours &
>> >>>>> hours without showing anything (except for the # of tasks).
>> >>>>>
>> >>>>> How long does it take before I should see _any_
>> >>>>> output/state/something in the ListS3 processor?
>> >>>>
>> >>>>
>> >>>
>> >>>
>
>
>
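
Following up on the fragment-attribute suggestion at the top of this thread: below is a minimal sketch, in Python rather than NiFi configuration, of how a unique per-split filename could be derived from those attributes instead of a freshly generated UUID. The function name unique_split_filename and the output naming pattern are made up for illustration. Only the attribute names ("filename", "fragment.identifier", "fragment.index", "fragment.count") are taken from the discussion above. In an actual flow the same idea would presumably live in an UpdateAttribute property.

```python
def unique_split_filename(attributes: dict) -> str:
    """Build a per-split filename from SplitJson-style fragment attributes.

    Hypothetical helper: 'attributes' stands in for a FlowFile's attribute
    map, where every value is a string.
    """
    base = attributes["filename"]
    # Drop a trailing ".json" so it is not repeated in the result.
    if base.endswith(".json"):
        base = base[: -len(".json")]

    frag_id = attributes["fragment.identifier"]   # shared by all splits
    index = int(attributes["fragment.index"])     # position of this split
    count = int(attributes["fragment.count"])     # total number of splits

    # Zero-pad the index to the width of the count so names sort naturally.
    width = len(str(count))
    return f"{base}_{frag_id}_{index:0{width}d}.json"


if __name__ == "__main__":
    example = {
        "filename": "example_CloudTrail_file.json",
        "fragment.identifier": "abc",
        "fragment.index": "3",
        "fragment.count": "120",
    }
    print(unique_split_filename(example))
```

Because the identifier is the same for every split of one source file, the splits stay grouped by name, and the index plus count make it possible to spot a missing split downstream.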
