[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...
Github user milanchandna commented on the issue: https://github.com/apache/nifi/pull/2158 @jzonthemtn @pvillard31 Please review. ---
[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...
Github user milanchandna commented on the issue: https://github.com/apache/nifi/pull/2158 Could I please get some feedback here? Thanks. ---
[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...
Github user milanchandna commented on the issue: https://github.com/apache/nifi/pull/2158 @joewitt, @jzonthemtn @pvillard31 - Hey, did you guys get a chance to look at this? ---
[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...
Github user milanchandna commented on the issue: https://github.com/apache/nifi/pull/2180 Yes, I reviewed it, and the changes look good. But I am a new contributor myself, so IMO you should wait for an expert review before getting this merged. ---
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r143422216 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java --- @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.adls; + +import com.microsoft.azure.datalake.store.ADLStoreClient; +import com.microsoft.azure.datalake.store.DirectoryEntry; +import com.microsoft.azure.datalake.store.DirectoryEntryType; +import org.apache.nifi.annotation.behavior.*; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.adls.ADLSConstants.ACCOUNT_NAME; +import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS; + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"hadoop", "ADLS", "get", "list", "ingest", "ingress", "source", "filesystem"}) --- End diff -- done. ---
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r143404820 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java --- @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.adls; + +import com.microsoft.azure.datalake.store.ADLStoreClient; +import com.microsoft.azure.datalake.store.DirectoryEntry; +import com.microsoft.azure.datalake.store.DirectoryEntryType; +import org.apache.nifi.annotation.behavior.*; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.adls.ADLSConstants.ACCOUNT_NAME; +import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS; + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"hadoop", "ADLS", "get", "list", "ingest", "ingress", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files from ADLS. For each file that is " ++ "listed in ADLS, this processor creates a FlowFile that represents the ADLS file to be fetched in conjunction with FetchADLSFile. This Processor is " ++ "designed to run on Primary Node only in a cluster. 
If the primary node changes, the new Primary Node will pick up where the previous node left " ++ "off without duplicating all of the data.") +@WritesAttributes({ +@WritesAttribute(attribute="filename", description="The name of the file that was read from ADLS"), +@WritesAttribute(attribute="path", description="The path is set to the path of the file on ADLS"), +@WritesAttribute(attribute="adls.owner", description="The user that owns the file"), +@WritesAttribute(attribute="adls.group", description="The group that owns the file"), +@WritesAttribute(attribute="adls.lastModified", description="The timestamp of when the file was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), +@WritesAttribute(attribute="adls.length", description="The number of bytes in the file"), +@WritesAttribute(attribute="adls.expirytime", description="The number of bytes in the file"), +@WritesAttribute(attribute="adls.permissions", description="The permissions for the file in ADLS. This is formatted as 3 characters for the owner, " ++ "3 for the group, and 3 for other users. For example rw-rw-r--") +}) +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of ADLS files, the latest timestamp of all the files transferred is stored. " ++ "This allows the Processor to list only files that have been added or modified afte
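The @Stateful description above says ListADLSFile stores the latest timestamp of all the files it has listed, so later runs list only newer files. A minimal JDK-only sketch of that bookkeeping, under stated assumptions: the Entry and ListingResult types are hypothetical stand-ins (not the ADLS SDK's DirectoryEntry), and persisting the returned timestamp to cluster-scoped state is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalListingSketch {

    // Hypothetical stand-in for a remote directory entry; not the ADLS SDK type.
    record Entry(String path, long lastModifiedMillis) {}

    // Entries to emit in this run, plus the timestamp the caller would persist as state.
    record ListingResult(List<Entry> toEmit, long newLatestTimestamp) {}

    // Keeps only entries modified strictly after the stored timestamp and tracks the
    // maximum timestamp seen, which becomes the state for the next run.
    static ListingResult listNew(List<Entry> listing, long storedLatestTimestamp) {
        List<Entry> toEmit = new ArrayList<>();
        long latest = storedLatestTimestamp;
        for (Entry e : listing) {
            if (e.lastModifiedMillis() > storedLatestTimestamp) {
                toEmit.add(e);
                latest = Math.max(latest, e.lastModifiedMillis());
            }
        }
        return new ListingResult(toEmit, latest);
    }

    public static void main(String[] args) {
        List<Entry> listing = List.of(
                new Entry("/data/a.csv", 1_000L),
                new Entry("/data/b.csv", 2_000L),
                new Entry("/data/c.csv", 3_000L));
        ListingResult first = listNew(listing, -1L);
        ListingResult second = listNew(listing, first.newLatestTimestamp());
        System.out.println(first.toEmit().size() + " " + second.toEmit().size()); // prints "3 0"
    }
}
```

A strict `>` comparison skips a file that shows up later with the same timestamp as the stored one; a production listing has to handle that edge case, which this sketch ignores.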
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r143404785 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java --- @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.adls; + +import com.microsoft.azure.datalake.store.ADLStoreClient; +import com.microsoft.azure.datalake.store.DirectoryEntry; +import com.microsoft.azure.datalake.store.DirectoryEntryType; +import org.apache.nifi.annotation.behavior.*; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.adls.ADLSConstants.ACCOUNT_NAME; +import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS; + +@TriggerSerially +@TriggerWhenEmpty +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"hadoop", "ADLS", "get", "list", "ingest", "ingress", "source", "filesystem"}) --- End diff -- done. ---
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r143404854 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java --- @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.adls; + +import com.microsoft.azure.datalake.store.ADLFileOutputStream; +import com.microsoft.azure.datalake.store.ADLStoreClient; +import com.microsoft.azure.datalake.store.IfExists; +import com.microsoft.azure.datalake.store.acl.AclEntry; +import org.apache.nifi.annotation.behavior.*; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.nifi.processors.adls.ADLSConstants.*; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hadoop", "ADLS", "put", "copy", "filesystem", "restricted"}) --- End diff -- done. ---
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r143404758 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ADLSConstants.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adls; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + + +public class ADLSConstants { + +public static final int CHUNK_SIZE_IN_BYTES = 400; + +public static final PropertyDescriptor PATH_NAME = new PropertyDescriptor.Builder() +.name("Path") +.description("Path for file in Azure Data Lake, e.g. /adlshome/") +.required(true) +.expressionLanguageSupported(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor RETRY_ON_FAIL = new PropertyDescriptor.Builder() +.name("Overwrite policy") +.description("How many times to retry if read fails per chunk read, defaults to 3") +.required(true) +//TODO add Integer validator --- End diff -- property wasn't being used anymore, removed it. ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r142060479 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoAggregation.java --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.bson.conversions.Bson; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({"mongo", "aggregation", "aggregate"}) +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.") +public class GetMongoAggregation extends AbstractMongoProcessor { + +private final static Set relationships; +private final static List propertyDescriptors; +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build(); --- End diff -- Can the REL_SUCCESS from AbstractMongoProcessor be used here instead? ---
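On the question of reusing REL_SUCCESS: a non-private static field declared in a superclass is visible to subclasses in the same package, so GetMongoAggregation could reference the inherited constant rather than declaring its own (a same-named field in the subclass would hide the parent's). A JDK-only sketch, assuming AbstractMongoProcessor exposes a REL_SUCCESS to its subclasses; Rel, AbstractBase and Aggregation are hypothetical stand-ins for NiFi's Relationship and the two Mongo classes.

```java
import java.util.Set;

public class InheritedRelationshipSketch {

    // Hypothetical stand-in for NiFi's Relationship, reduced to a name.
    record Rel(String name) {}

    // Plays the role of AbstractMongoProcessor: the shared relationship is declared once here.
    abstract static class AbstractBase {
        static final Rel REL_SUCCESS = new Rel("success");
    }

    // Plays the role of GetMongoAggregation: it reuses the inherited constant instead of
    // declaring its own REL_SUCCESS, which would hide the base-class field.
    static class Aggregation extends AbstractBase {
        Set<Rel> getRelationships() {
            return Set.of(REL_SUCCESS);
        }
    }

    public static void main(String[] args) {
        Rel used = new Aggregation().getRelationships().iterator().next();
        System.out.println(used == AbstractBase.REL_SUCCESS); // prints "true"
    }
}
```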
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r142060366 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -102,8 +107,26 @@ .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) .build(); +static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() +.name("results-per-flowfile") +.displayName("Results Per FlowFile") +.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("Batch Size") --- End diff -- Generally it's preferred to add a displayName and use a name without whitespace, like "batch-size", as RESULTS_PER_FLOWFILE does above. ---
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r142060410 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -102,8 +107,26 @@ .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) .build(); +static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() +.name("results-per-flowfile") +.displayName("Results Per FlowFile") +.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("Batch Size") +.description("The number of elements returned from the server in one batch") +.required(false) --- End diff -- Do you want to add a default value of 1? That would let you get rid of the null check in the onTrigger method. ---
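With `.defaultValue("1")` on BATCH_SIZE, an unset property resolves to its default, so `context.getProperty(BATCH_SIZE).asInteger()` yields 1 instead of null and onTrigger no longer needs the null check. The JDK-only sketch below models the property lookup with a plain Map; the "batch-size" key and both helper methods are hypothetical.

```java
import java.util.Map;

public class DefaultValueSketch {

    // Without a default: an unset optional property comes back as null,
    // so every caller needs a null check before using it.
    static Integer batchSizeWithoutDefault(Map<String, String> props) {
        String raw = props.get("batch-size");
        return raw == null ? null : Integer.valueOf(raw);
    }

    // With a default of "1": the lookup always yields a value, so the null check goes away.
    static int batchSizeWithDefault(Map<String, String> props) {
        return Integer.parseInt(props.getOrDefault("batch-size", "1"));
    }

    public static void main(String[] args) {
        Map<String, String> unset = Map.of();
        System.out.println(batchSizeWithoutDefault(unset)); // prints "null"
        System.out.println(batchSizeWithDefault(unset)); // prints "1"
        System.out.println(batchSizeWithDefault(Map.of("batch-size", "50"))); // prints "50"
    }
}
```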
[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2180#discussion_r142060476 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java --- @@ -221,4 +244,17 @@ protected WriteConcern getWriteConcern(final ProcessContext context) { } return writeConcern; } + +protected void writeBatch(String payload, ProcessContext context, ProcessSession session) { +FlowFile flowFile = session.create(); +flowFile = session.write(flowFile, new OutputStreamCallback() { +@Override +public void process(OutputStream out) throws IOException { +out.write(payload.getBytes("UTF-8")); --- End diff -- The session.importFrom method can save you from the callback: the String payload can be converted to a ByteArrayInputStream and passed to importFrom. ---
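A minimal sketch of the conversion the comment describes: the String payload is wrapped in a ByteArrayInputStream, which could then be passed to `ProcessSession.importFrom(InputStream, FlowFile)` in place of the OutputStreamCallback. Only the JDK part is runnable here; the NiFi call appears as a comment, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class PayloadStreamSketch {

    // Wraps the JSON payload in an in-memory stream. StandardCharsets.UTF_8 also avoids
    // the checked UnsupportedEncodingException that getBytes("UTF-8") declares.
    static InputStream toStream(String payload) {
        return new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
    }

    // In the processor the stream would be handed to NiFi, roughly:
    //   FlowFile flowFile = session.importFrom(toStream(payload), session.create());
    // It is shown only as a comment because this sketch does not depend on nifi-api.

    public static void main(String[] args) throws IOException {
        try (InputStream in = toStream("[{\"_id\":1}]")) {
            String roundTrip = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            System.out.println(roundTrip); // prints [{"_id":1}]
        }
    }
}
```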
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r142037960 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java --- @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.adls; + +import com.microsoft.azure.datalake.store.ADLFileOutputStream; +import com.microsoft.azure.datalake.store.ADLStoreClient; +import com.microsoft.azure.datalake.store.IfExists; +import com.microsoft.azure.datalake.store.acl.AclEntry; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.nifi.processors.adls.ADLSConstants.ERR_ACL_ENTRY; +import static org.apache.nifi.processors.adls.ADLSConstants.ERR_FLOWFILE_CORE_ATTR_FILENAME; +import static org.apache.nifi.processors.adls.ADLSConstants.REL_FAILURE; +import static 
org.apache.nifi.processors.adls.ADLSConstants.FILE_NAME_ATTRIBUTE; +import static org.apache.nifi.processors.adls.ADLSConstants.CHUNK_SIZE_IN_BYTES; +import static org.apache.nifi.processors.adls.ADLSConstants.ADLS_FILE_PATH_ATTRIBUTE; +import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"azure", "hadoop", "ADLS", "put", "egress", "copy", "filesystem", "restricted"}) +@CapabilityDescription("Write FlowFile data to Azure Data Lake Store (ADLS)") +@SeeAlso({ListADLSFile.class, FetchADLSFile.class}) +@ReadsAttributes({ +@ReadsAttribute(attribute = "filename", description = "The name of the file written to ADLS comes from the value of this attribute."), +}) +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The name of the file written to ADLS is stored in this attribute."), +@WritesAttribute(attribute = "absolute.adls.path", description = "The absolute path to the file on ADLS is stored in this attribute.") +}) +@Restricted("Provides operator the ability to write to any file that NiFi has access to in ADLS.") +public class PutADLSFile extends ADLSAbstractProcessor { + +private static final String PART_FILE_EXTENSION = ".nifipart"; +private static final String REPLACE_RESOLUTION = "replace"; +private static final String FAIL_RES
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r140503820 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/adls/TestPutADLSFile.java --- @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.adls; + +import com.microsoft.azure.datalake.store.ADLStoreClient; +import com.microsoft.azure.datalake.store.ADLStoreOptions; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.QueueDispatcher; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + + +public class TestPutADLSFile { + +private MockWebServer server; +private ADLStoreClient client; +private Processor processor; +private TestRunner runner; +Map<String, String> fileAttributes; + +@Before +public void init() throws IOException, InitializationException { +server = new MockWebServer(); +QueueDispatcher dispatcher = new QueueDispatcher(); +dispatcher.setFailFast(new MockResponse().setResponseCode(400)); +server.setDispatcher(dispatcher); +server.start(); +String accountFQDN = server.getHostName() + ":" + server.getPort(); +String dummyToken = "testDummyAadToken"; + +client = ADLStoreClient.createClient(accountFQDN, dummyToken); +client.setOptions(new ADLStoreOptions().setInsecureTransport()); + +processor = new PutADLSWithMockClient(client); + +runner = TestRunners.newTestRunner(processor); + +runner.setProperty(ADLSConstants.ACCOUNT_NAME, accountFQDN); +runner.setProperty(ADLSConstants.CLIENT_ID, "foobar"); +runner.setProperty(ADLSConstants.CLIENT_SECRET, "foobar"); +runner.setProperty(ADLSConstants.AUTH_TOKEN_ENDPOINT, "foobar"); +runner.setProperty(PutADLSFile.DIRECTORY, "/sample/"); 
+runner.setProperty(PutADLSFile.CONFLICT_RESOLUTION, PutADLSFile.FAIL_RESOLUTION_AV); +runner.removeProperty(PutADLSFile.UMASK); +runner.removeProperty(PutADLSFile.ACL); + +fileAttributes = new HashMap<>(); +fileAttributes.put("filename", "sample.txt"); +fileAttributes.put("path", "/root/"); +} + +@Test +public void testPutConflictFail() throws IOException, InterruptedException { + +//for create call +server.enqueue(new MockResponse().setResponseCode(200)); +//for append call(internal call while writing to stream) +server.enqueue(new MockResponse().setResponseCode(200)); +//for rename call(since its fail conflict) +server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"boolean\":true}")); +//for delete call(removing temp file) +server.enqueue(new MockResponse().setResponseCode(200)); + +//String bodySimpleFileContent = new String(Files.readAllBytes(Paths.get(bodySimpleFileName))); --- End diff -- yes, removed. ---
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r140503615 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java --- @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.adls; + +import com.microsoft.azure.datalake.store.ADLFileOutputStream; +import com.microsoft.azure.datalake.store.ADLStoreClient; +import com.microsoft.azure.datalake.store.IfExists; +import com.microsoft.azure.datalake.store.acl.AclEntry; +import org.apache.nifi.annotation.behavior.*; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.nifi.processors.adls.ADLSConstants.*; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hadoop", "ADLS", "put", "copy", "filesystem", "restricted"}) +@CapabilityDescription("Write FlowFile data to Azure Data Lake Store (ADLS)") +@ReadsAttributes({ +@ReadsAttribute(attribute = "filename", description = "The name of the file written to ADLS comes from the value of this attribute."), +@ReadsAttribute(attribute = "filepath", description = "The relative path to the file on ADLS comes from the value of this attribute.") +}) +@WritesAttributes({ +@WritesAttribute(attribute = "filename", description = "The name of the file written to ADLS 
is stored in this attribute."), +@WritesAttribute(attribute = "absolute.adls.path", description = "The absolute path to the file on ADLS is stored in this attribute.") +}) +@Restricted("Provides operator the ability to write to any file that NiFi has access to in ADLS.") +public class PutADLSFile extends ADLSAbstractProcessor { + +private static final String PART_FILE_EXTENSION = ".nifipart"; +private static final String REPLACE_RESOLUTION = "replace"; +private static final String FAIL_RESOLUTION = "fail"; +private static final String APPEND_RESOLUTION = "append"; + +protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION, +REPLACE_RESOLUTION, "Replaces the existing file if any."); +protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION, +"Penalizes the flow file and routes it to failure."); +protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION, +"Appends to the existing file if any, creates a new file otherwise."); + +// properties +protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
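The `PutADLSFile` diff defines three conflict-resolution values ("replace", "fail", "append") and a `.nifipart` part-file extension, which suggests a write-to-part-file-then-rename pattern. The sketch below illustrates that reading using only `java.nio.file` against the local filesystem, which stands in for ADLS. It is an assumption about the intended behavior, not the processor's code, and it does not call the ADLS SDK (`ADLStoreClient`). The class `PartFileWriteSketch`, its `ConflictResolution` enum and its `write` method are hypothetical. The diff does not show whether the real processor appends directly or goes through the part file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class PartFileWriteSketch {

    // Hypothetical enum mirroring the diff's "replace", "fail" and "append" resolution values.
    public enum ConflictResolution { REPLACE, FAIL, APPEND }

    // Same suffix as PART_FILE_EXTENSION in the diff.
    static final String PART_FILE_EXTENSION = ".nifipart";

    /**
     * Writes data to dir/filename on the local filesystem (standing in for ADLS).
     * Returns false when the FAIL policy rejects the write because the file already exists.
     */
    public static boolean write(Path dir, String filename, byte[] data, ConflictResolution policy)
            throws IOException {
        Path target = dir.resolve(filename);
        boolean exists = Files.exists(target);
        if (exists && policy == ConflictResolution.FAIL) {
            return false; // the processor would penalize the FlowFile and route it to failure
        }
        if (exists && policy == ConflictResolution.APPEND) {
            Files.write(target, data, StandardOpenOption.APPEND); // append to the existing file
            return true;
        }
        // REPLACE, or no existing file (APPEND then creates one): write under the part name,
        // then rename, so partially written data carries the .nifipart suffix, not the target name.
        Path part = dir.resolve(filename + PART_FILE_EXTENSION);
        Files.write(part, data);
        Files.move(part, target, StandardCopyOption.REPLACE_EXISTING);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("adls-sketch");
        write(dir, "a.txt", "one".getBytes(StandardCharsets.UTF_8), ConflictResolution.REPLACE);
        write(dir, "a.txt", "two".getBytes(StandardCharsets.UTF_8), ConflictResolution.APPEND);
        boolean accepted = write(dir, "a.txt", "three".getBytes(StandardCharsets.UTF_8), ConflictResolution.FAIL);
        String content = new String(Files.readAllBytes(dir.resolve("a.txt")), StandardCharsets.UTF_8);
        System.out.println(content + " " + accepted); // prints "onetwo false"
    }
}
```

Staging under a separate name keeps a reader that lists the directory from picking up a half-written file under its final name. The FAIL branch only reports the conflict; penalizing and routing the FlowFile is left to the caller.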
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r140503030 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ADLSConstants.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adls; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + + +public class ADLSConstants { + +public static final int CHUNK_SIZE_IN_BYTES = 400; + +public static final PropertyDescriptor PATH_NAME = new PropertyDescriptor.Builder() +.name("Path") +.description("Path for file in Azure Data Lake, e.g. 
/adlshome/") +.required(true) +.expressionLanguageSupported(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor RETRY_ON_FAIL = new PropertyDescriptor.Builder() +.name("Overwrite policy") +.description("How many times to retry if read fails per chunk read, defaults to 3") +.required(true) +//TODO add Integer validator +.defaultValue("3") +.build(); + +public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() +.name("Account FQDN") +.description("Azure account fully qualified domain name eg: accountname.azuredatalakestore.net") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder() +.name("Client ID") +.description("Azure client ID") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor CLIENT_SECRET = new PropertyDescriptor.Builder() +.name("Client secret") +.description("Azure client secret") +.required(true) +.sensitive(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor AUTH_TOKEN_ENDPOINT = new PropertyDescriptor.Builder() +.name("Auth token endpoint") +.description("Azure client secret") --- End diff -- Removed it, wasn't being used. ---
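The `RETRY_ON_FAIL` description ("How many times to retry if read fails per chunk read, defaults to 3") together with `CHUNK_SIZE_IN_BYTES` implies a chunked read loop in which only the failing chunk is retried. A minimal plain-Java sketch of that idea follows. `ChunkSource` is a hypothetical stand-in for the remote file, not part of the ADLS SDK, and treating the setting as retries after the first attempt (so 3 means up to 4 attempts per chunk) is an assumption. `parseRetries` shows one hypothetical way to cover the `//TODO add Integer validator` by rejecting non-integer and negative values.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ChunkedReadWithRetrySketch {

    // Value taken from ADLSConstants in the diff.
    static final int CHUNK_SIZE_IN_BYTES = 400;

    /** Hypothetical stand-in for a remote file; a read may fail transiently. */
    interface ChunkSource {
        long length();

        /** Copies up to len bytes starting at offset into buf and returns the count copied. */
        int read(long offset, byte[] buf, int len) throws IOException;
    }

    /** Parses the retry setting; NumberFormatException for non-integers, IAE for negatives. */
    static int parseRetries(String value) {
        int retries = Integer.parseInt(value.trim());
        if (retries < 0) {
            throw new IllegalArgumentException("retry count must be >= 0: " + value);
        }
        return retries;
    }

    /** Reads the whole source chunk by chunk, retrying only the chunk that failed. */
    static byte[] readAll(ChunkSource source, int maxRetries) throws IOException {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[CHUNK_SIZE_IN_BYTES];
        long offset = 0;
        while (offset < source.length()) {
            int want = (int) Math.min(CHUNK_SIZE_IN_BYTES, source.length() - offset);
            int n = readChunkWithRetry(source, offset, buf, want, maxRetries);
            if (n <= 0) {
                throw new IOException("no progress at offset " + offset); // avoid looping forever
            }
            out.write(buf, 0, n);
            offset += n;
        }
        return out.toByteArray();
    }

    /** One initial attempt plus up to maxRetries retries for a single chunk. */
    static int readChunkWithRetry(ChunkSource source, long offset, byte[] buf, int len, int maxRetries)
            throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return source.read(offset, buf, len);
            } catch (IOException e) {
                last = e; // chunks already copied to the output are kept
            }
        }
        throw last;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[1000];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i;
        }
        // Fails the first attempt at every offset, so each of the three chunks is retried once.
        ChunkSource flaky = new ChunkSource() {
            private final Set<Long> failedOnce = new HashSet<>();

            @Override
            public long length() {
                return data.length;
            }

            @Override
            public int read(long offset, byte[] buf, int len) throws IOException {
                if (failedOnce.add(offset)) {
                    throw new IOException("transient failure at offset " + offset);
                }
                System.arraycopy(data, (int) offset, buf, 0, len);
                return len;
            }
        };
        byte[] copy = readAll(flaky, parseRetries("3"));
        System.out.println(Arrays.equals(data, copy)); // prints "true"
    }
}
```

Retrying per chunk rather than per file means a transient failure late in a large file does not force the earlier chunks to be downloaded again.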
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
Github user milanchandna commented on a diff in the pull request: https://github.com/apache/nifi/pull/2158#discussion_r140502906 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ADLSConstants.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adls; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + + +public class ADLSConstants { + +public static final int CHUNK_SIZE_IN_BYTES = 400; + +public static final PropertyDescriptor PATH_NAME = new PropertyDescriptor.Builder() +.name("Path") --- End diff -- Used name and displayName properly wherever applicable. ---
[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...
Github user milanchandna commented on the issue: https://github.com/apache/nifi/pull/2158 Hey @pvillard31, thanks for the comments; looking into them. Currently, when I run the full build, I get failures in TestMinimalLockingWriteAheadLog: testRecoverFileThatHasTrailingNULBytesAndTruncation(org.wali.TestMinimalLockingWriteAheadLog). Because of this, I am not able to reliably detect failures in my own module. Any suggestions for skipping the above failures? -Milan. ---
[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...
Github user milanchandna commented on the issue: https://github.com/apache/nifi/pull/2158 Removed the test resource files (as there were license issues) and resolved their dependencies. There were also environment-specific characters in the test cases; I got rid of them. Please check now. ---
[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...
Github user milanchandna commented on the issue: https://github.com/apache/nifi/pull/2158 Thanks @joewitt. These test resource files contain Lorem Ipsum text. Anyway, I will create my own if required, but I don't want to delete them, as they are required for an important test case. Thanks. ---
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...
GitHub user milanchandna opened a pull request: https://github.com/apache/nifi/pull/2158

NIFI-4360 Adding support for ADLS Processors. Feature includes List, Get, Put processors.

Until now, ADLS interaction was possible using the HDFS processors. Now users can ingress and egress their data directly using the ADLS processors. This is much simpler and removes one layer to go through. It will also help users who are not familiar with Hadoop configurations.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  - There were failures in other modules: testRecoverFileThatHasTrailingNULBytesAndTruncation(org.wali.TestMinimalLockingWriteAheadLog)
- [x] Have you written or updated unit tests to verify your changes?
- [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/milanchandna/nifi NIFI-4360
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2158.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2158
commit 4f449c55c99868bd8b96bc370186503f4928c11f
Author: Milan Chandna <milan.chan...@gmail.com>
Date: 2017-09-18T07:00:17Z
NIFI-4360 Adding support for ADLS Processors. Feature includes List, Get, Put processors.
---