[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...

2018-03-21 Thread milanchandna
Github user milanchandna commented on the issue:

https://github.com/apache/nifi/pull/2158
  
@jzonthemtn @pvillard31 Please review.


---


[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...

2018-03-21 Thread milanchandna
Github user milanchandna commented on the issue:

https://github.com/apache/nifi/pull/2158
  
Can I please get some feedback here? Thanks.


---


[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...

2017-11-15 Thread milanchandna
Github user milanchandna commented on the issue:

https://github.com/apache/nifi/pull/2158
  
@joewitt, @jzonthemtn @pvillard31 - Hey, did you guys get a chance to look at this?


---


[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

2017-10-30 Thread milanchandna
Github user milanchandna commented on the issue:

https://github.com/apache/nifi/pull/2180
  
Yes, I reviewed it and the changes look good. But I am a fresh contributor myself, so IMO you should wait for an expert review before this gets merged.


---


[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-10-09 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r143422216
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java
 ---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.DirectoryEntry;
+import com.microsoft.azure.datalake.store.DirectoryEntryType;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.adls.ADLSConstants.ACCOUNT_NAME;
+import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"hadoop", "ADLS", "get", "list", "ingest", "ingress", "source", "filesystem"})
--- End diff --

done.


---


[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-10-09 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r143404820
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java
 ---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.DirectoryEntry;
+import com.microsoft.azure.datalake.store.DirectoryEntryType;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.adls.ADLSConstants.ACCOUNT_NAME;
+import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"hadoop", "ADLS", "get", "list", "ingest", "ingress", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files from ADLS. For each file that is "
++ "listed in ADLS, this processor creates a FlowFile that represents the ADLS file to be fetched in conjunction with FetchADLSFile. This Processor is "
++  "designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left "
++  "off without duplicating all of the data.")
+@WritesAttributes({
+@WritesAttribute(attribute="filename", description="The name of the file that was read from ADLS"),
+@WritesAttribute(attribute="path", description="The path is set to the path of the file on ADLS"),
+@WritesAttribute(attribute="adls.owner", description="The user that owns the file"),
+@WritesAttribute(attribute="adls.group", description="The group that owns the file"),
+@WritesAttribute(attribute="adls.lastModified", description="The timestamp of when the file was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+@WritesAttribute(attribute="adls.length", description="The number of bytes in the file"),
+@WritesAttribute(attribute="adls.expirytime", description="The number of bytes in the file"),
+@WritesAttribute(attribute="adls.permissions", description="The permissions for the file in ADLS. This is formatted as 3 characters for the owner, "
++ "3 for the group, and 3 for other users. For example rw-rw-r--")
+})
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of ADLS files, the latest timestamp of all the files transferred is stored. "
++ "This allows the Processor to list only files that have been added or modified afte

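The `@Stateful` description in the diff above explains the incremental listing: the latest timestamp among transferred files is kept in cluster-scoped state, so a later run (possibly on a new Primary Node) lists only newer files. A minimal stdlib sketch of that bookkeeping, assuming a hypothetical `Entry` class in place of the SDK's `DirectoryEntry` and a plain `long` in place of the stored `StateMap` value; it is not the PR's implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ADLS SDK's DirectoryEntry: only the fields this sketch needs.
final class Entry {
    final String path;
    final long lastModifiedMillis;

    Entry(String path, long lastModifiedMillis) {
        this.path = path;
        this.lastModifiedMillis = lastModifiedMillis;
    }
}

// Result of one listing pass: the entries to emit and the timestamp to persist as state.
final class ListingResult {
    final List<Entry> newEntries;
    final long latestTimestamp;

    ListingResult(List<Entry> newEntries, long latestTimestamp) {
        this.newEntries = newEntries;
        this.latestTimestamp = latestTimestamp;
    }
}

final class IncrementalLister {
    // Keeps only entries modified strictly after the stored timestamp and returns the
    // new high-water mark. Simplification: a file that shows up later with a timestamp
    // equal to the stored one would be skipped; real listers handle that case explicitly.
    static ListingResult list(List<Entry> listing, long storedTimestamp) {
        List<Entry> fresh = new ArrayList<>();
        long latest = storedTimestamp;
        for (Entry e : listing) {
            if (e.lastModifiedMillis > storedTimestamp) {
                fresh.add(e);
                latest = Math.max(latest, e.lastModifiedMillis);
            }
        }
        return new ListingResult(fresh, latest);
    }
}
```

Feeding the returned `latestTimestamp` back in as `storedTimestamp` on the next pass is what keeps an unchanged directory from being listed twice.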
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-10-09 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r143404785
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java
 ---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.DirectoryEntry;
+import com.microsoft.azure.datalake.store.DirectoryEntryType;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.adls.ADLSConstants.ACCOUNT_NAME;
+import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"hadoop", "ADLS", "get", "list", "ingest", "ingress", "source", "filesystem"})
--- End diff --

done.


---


[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-10-09 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r143404854
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLFileOutputStream;
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.IfExists;
+import com.microsoft.azure.datalake.store.acl.AclEntry;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.adls.ADLSConstants.*;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "ADLS", "put", "copy", "filesystem", "restricted"})
--- End diff --

done.


---


[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-10-09 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r143404758
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ADLSConstants.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+public class ADLSConstants {
+
+public static final int CHUNK_SIZE_IN_BYTES = 400;
+
+public static final PropertyDescriptor PATH_NAME = new PropertyDescriptor.Builder()
+.name("Path")
+.description("Path for file in Azure Data Lake, e.g. /adlshome/")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RETRY_ON_FAIL = new PropertyDescriptor.Builder()
+.name("Overwrite policy")
+.description("How many times to retry if read fails per chunk read, defaults to 3")
+.required(true)
+//TODO add Integer validator
--- End diff --

The property wasn't being used anymore, so I removed it.


---


[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

2017-10-01 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2180#discussion_r142060479
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoAggregation.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.AggregateIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.bson.conversions.Bson;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Tags({"mongo", "aggregation", "aggregate"})
+@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
+public class GetMongoAggregation extends AbstractMongoProcessor {
+
+private final static Set relationships;
+private final static List propertyDescriptors;
+static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
--- End diff --

Can REL_SUCCESS be reused from AbstractMongoProcessor?
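For reference, a static constant declared in an abstract Java parent class is visible to its subclasses, so a subclass can register the parent's relationship instead of declaring a second one with the same name. A stdlib-only sketch, where `Rel`, `AbstractMongoSketch` and `AggregationSketch` are hypothetical stand-ins for NiFi's `Relationship`, `AbstractMongoProcessor` and `GetMongoAggregation`; it assumes the parent declares `REL_SUCCESS` with at least package-private visibility:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for org.apache.nifi.processor.Relationship.
final class Rel {
    final String name;

    Rel(String name) {
        this.name = name;
    }
}

// Hypothetical shared base class: the relationship is declared exactly once here.
abstract class AbstractMongoSketch {
    static final Rel REL_SUCCESS = new Rel("success");
}

// Hypothetical subclass: reuses the inherited constant rather than redefining it.
final class AggregationSketch extends AbstractMongoSketch {
    private static final Set<Rel> RELATIONSHIPS;

    static {
        Set<Rel> rels = new HashSet<>();
        rels.add(REL_SUCCESS); // resolves to AbstractMongoSketch.REL_SUCCESS
        RELATIONSHIPS = Collections.unmodifiableSet(rels);
    }

    static Set<Rel> relationships() {
        return RELATIONSHIPS;
    }
}
```

Keeping one declaration means every Mongo processor routes to the same "success" relationship object and its description stays consistent.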


---


[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

2017-10-01 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2180#discussion_r142060366
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 ---
@@ -102,8 +107,26 @@
 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
 .build();
 
+static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+.name("results-per-flowfile")
+.displayName("Results Per FlowFile")
+.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+.name("Batch Size")
--- End diff --

Generally it's preferred to add a displayName and keep the name free of whitespace, like "batch-size".
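A small illustration of the naming convention, using a hypothetical helper `toPropertyName` that derives a whitespace-free, lower-case `name` from a human-readable display name. NiFi does not perform this conversion itself; the developer chooses both strings in `PropertyDescriptor.Builder`. The usual reasoning is that `name` is the stable key saved with the flow configuration, while `displayName` can change freely in the UI:

```java
import java.util.Locale;

final class PropertyNames {
    // Hypothetical helper: "Batch Size" -> "batch-size".
    // Trims, lower-cases, and collapses each run of whitespace into a single hyphen.
    static String toPropertyName(String displayName) {
        return displayName.trim().toLowerCase(Locale.ROOT).replaceAll("\\s+", "-");
    }
}
```

Applied to this PR, `BATCH_SIZE` would then read `.name("batch-size").displayName("Batch Size")`, mirroring how `RESULTS_PER_FLOWFILE` is already declared.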


---


[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

2017-10-01 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2180#discussion_r142060410
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 ---
@@ -102,8 +107,26 @@
 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
 .build();
 
+static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+.name("results-per-flowfile")
+.displayName("Results Per FlowFile")
+.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+.name("Batch Size")
+.description("The number of elements returned from the server in one batch")
+.required(false)
--- End diff --

Do you want to add a default value of 1? That would let you get rid of the null check in the onTrigger method.
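To show why a default removes the null check, the hypothetical `PropertyLookup` below mimics an optional property whose value falls back to its declared default, so the caller can parse the result directly. In NiFi the equivalent step would be declaring `.defaultValue("1")` on the descriptor, as `RESULTS_PER_FLOWFILE` already does; the map-based lookup is an illustration only, not NiFi's implementation:

```java
import java.util.Map;

final class PropertyLookup {
    private final Map<String, String> configured;

    PropertyLookup(Map<String, String> configured) {
        this.configured = configured;
    }

    // Without a default, an unset optional property comes back as null,
    // so every caller has to guard before parsing.
    Integer batchSizeWithoutDefault() {
        String raw = configured.get("batch-size");
        return raw == null ? null : Integer.valueOf(raw);
    }

    // With a declared default of "1", the lookup always yields a value
    // and the caller can parse it unconditionally.
    int batchSizeWithDefault() {
        return Integer.parseInt(configured.getOrDefault("batch-size", "1"));
    }
}
```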


---


[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

2017-10-01 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2180#discussion_r142060476
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 ---
@@ -221,4 +244,17 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
 }
 return writeConcern;
 }
+
+protected void writeBatch(String payload, ProcessContext context, ProcessSession session) {
+FlowFile flowFile = session.create();
+flowFile = session.write(flowFile, new OutputStreamCallback() {
+@Override
+public void process(OutputStream out) throws IOException {
+out.write(payload.getBytes("UTF-8"));
--- End diff --

The session.importFrom method can save you from the callback: the String payload can be converted to a byte InputStream (e.g. a ByteArrayInputStream) and passed to importFrom.
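In practice, NiFi's `ProcessSession.importFrom(InputStream, FlowFile)` copies a stream's bytes into the FlowFile content and returns the updated FlowFile, so the body of `writeBatch` could shrink to something like `flowFile = session.importFrom(new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8)), flowFile);`. Using `StandardCharsets.UTF_8` also avoids the checked `UnsupportedEncodingException` that `getBytes("UTF-8")` declares. Because a NiFi session is not available standalone, the stdlib sketch below (hypothetical `PayloadStreams` helper) only demonstrates the String-to-stream conversion and that the bytes round-trip intact:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class PayloadStreams {
    // Wraps the payload in an in-memory stream; this is the stream that would be
    // handed to session.importFrom(stream, flowFile) instead of writing in a callback.
    static InputStream toStream(String payload) {
        return new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
    }

    // Drains a stream back into a String, standing in for the FlowFile content
    // that the import would produce.
    static String readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```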


---


[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-10-01 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r142037960
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java
 ---
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLFileOutputStream;
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.IfExists;
+import com.microsoft.azure.datalake.store.acl.AclEntry;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.adls.ADLSConstants.ERR_ACL_ENTRY;
+import static org.apache.nifi.processors.adls.ADLSConstants.ERR_FLOWFILE_CORE_ATTR_FILENAME;
+import static org.apache.nifi.processors.adls.ADLSConstants.REL_FAILURE;
+import static org.apache.nifi.processors.adls.ADLSConstants.FILE_NAME_ATTRIBUTE;
+import static org.apache.nifi.processors.adls.ADLSConstants.CHUNK_SIZE_IN_BYTES;
+import static org.apache.nifi.processors.adls.ADLSConstants.ADLS_FILE_PATH_ATTRIBUTE;
+import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"azure", "hadoop", "ADLS", "put", "egress", "copy", "filesystem", "restricted"})
+@CapabilityDescription("Write FlowFile data to Azure Data Lake Store (ADLS)")
+@SeeAlso({ListADLSFile.class, FetchADLSFile.class})
+@ReadsAttributes({
+@ReadsAttribute(attribute = "filename", description = "The name of the file written to ADLS comes from the value of this attribute."),
+})
+@WritesAttributes({
+@WritesAttribute(attribute = "filename", description = "The name of the file written to ADLS is stored in this attribute."),
+@WritesAttribute(attribute = "absolute.adls.path", description = "The absolute path to the file on ADLS is stored in this attribute.")
+})
+@Restricted("Provides operator the ability to write to any file that NiFi has access to in ADLS.")
+public class PutADLSFile extends ADLSAbstractProcessor {
+
+private static final String PART_FILE_EXTENSION = ".nifipart";
+private static final String REPLACE_RESOLUTION = "replace";
+private static final String FAIL_RES

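The `PutADLSFile` constants above (`PART_FILE_EXTENSION = ".nifipart"`, a `replace` resolution, and a fail resolution whose declaration is cut off), together with the create, append, rename and delete calls mocked in `TestPutADLSFile`, suggest a write-to-temporary-file-then-rename flow. A minimal local-filesystem sketch of that pattern with `java.nio.file`, assuming hypothetical names (`PartFileWriter`, `ConflictPolicy`) and making no claim about how the ADLS SDK performs the rename:

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical equivalents of the "replace" and "fail" conflict resolutions.
enum ConflictPolicy { REPLACE, FAIL }

final class PartFileWriter {
    static final String PART_FILE_EXTENSION = ".nifipart";

    // Writes content to "<target>.nifipart" first, then renames it onto the target,
    // so readers never observe a half-written target file.
    // Returns false (after deleting the part file) when the target exists under FAIL.
    static boolean put(Path target, byte[] content, ConflictPolicy policy) throws IOException {
        Path part = target.resolveSibling(target.getFileName() + PART_FILE_EXTENSION);
        Files.write(part, content);
        try {
            if (policy == ConflictPolicy.REPLACE) {
                Files.move(part, target, StandardCopyOption.REPLACE_EXISTING);
            } else {
                // Without REPLACE_EXISTING, the move throws if the target already exists.
                Files.move(part, target);
            }
            return true;
        } catch (FileAlreadyExistsException e) {
            Files.deleteIfExists(part); // clean up the temporary part file
            return false;
        }
    }
}
```

The extra delete on conflict matches the test's final "removing temp file" step in spirit, but the real processor's ordering of calls may differ.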
[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-09-22 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r140503820
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/adls/TestPutADLSFile.java
 ---
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.ADLStoreOptions;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.QueueDispatcher;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+public class TestPutADLSFile {
+
+private MockWebServer server;
+private ADLStoreClient client;
+private Processor processor;
+private TestRunner runner;
+Map<String, String> fileAttributes;
+
+@Before
+public void init() throws IOException, InitializationException {
+server = new MockWebServer();
+QueueDispatcher dispatcher = new QueueDispatcher();
+dispatcher.setFailFast(new MockResponse().setResponseCode(400));
+server.setDispatcher(dispatcher);
+server.start();
+String accountFQDN = server.getHostName() + ":" + server.getPort();
+String dummyToken = "testDummyAadToken";
+
+client = ADLStoreClient.createClient(accountFQDN, dummyToken);
+client.setOptions(new ADLStoreOptions().setInsecureTransport());
+
+processor = new PutADLSWithMockClient(client);
+
+runner = TestRunners.newTestRunner(processor);
+
+runner.setProperty(ADLSConstants.ACCOUNT_NAME, accountFQDN);
+runner.setProperty(ADLSConstants.CLIENT_ID, "foobar");
+runner.setProperty(ADLSConstants.CLIENT_SECRET, "foobar");
+runner.setProperty(ADLSConstants.AUTH_TOKEN_ENDPOINT, "foobar");
+runner.setProperty(PutADLSFile.DIRECTORY, "/sample/");
+runner.setProperty(PutADLSFile.CONFLICT_RESOLUTION, PutADLSFile.FAIL_RESOLUTION_AV);
+runner.removeProperty(PutADLSFile.UMASK);
+runner.removeProperty(PutADLSFile.ACL);
+
+fileAttributes = new HashMap<>();
+fileAttributes.put("filename", "sample.txt");
+fileAttributes.put("path", "/root/");
+}
+
+@Test
+public void testPutConflictFail() throws IOException, InterruptedException {
+
+//for create call
+server.enqueue(new MockResponse().setResponseCode(200));
+//for append call(internal call while writing to stream)
+server.enqueue(new MockResponse().setResponseCode(200));
+//for rename call(since its fail conflict)
+server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"boolean\":true}"));
+//for delete call(removing temp file)
+server.enqueue(new MockResponse().setResponseCode(200));
+
+//String bodySimpleFileContent = new String(Files.readAllBytes(Paths.get(bodySimpleFileName)));
--- End diff --

yes, removed.
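The test above relies on MockWebServer's `QueueDispatcher` serving enqueued responses in order (create, append, rename, delete), with `setFailFast(...)` returning the given 400 response immediately once the queue is empty instead of blocking. A stdlib sketch of that dispatch order, using a hypothetical `ScriptedResponses` class with plain status codes in place of `MockResponse`, can help when counting how many responses a test must enqueue:

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ScriptedResponses {
    private final Deque<Integer> queue = new ArrayDeque<>();
    private final int failFastCode;

    ScriptedResponses(int failFastCode) {
        this.failFastCode = failFastCode;
    }

    // Mirrors server.enqueue(...): responses are served first-in, first-out.
    void enqueue(int statusCode) {
        queue.addLast(statusCode);
    }

    // Mirrors dispatch with fail-fast enabled: an empty queue yields the fail-fast
    // code right away instead of waiting for another response to be enqueued.
    int next() {
        Integer code = queue.pollFirst();
        return code != null ? code : failFastCode;
    }
}
```

An unexpected extra request in the processor then shows up as a 400 in the test rather than as a hang.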


---


[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-09-22 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r140503615
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLFileOutputStream;
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.IfExists;
+import com.microsoft.azure.datalake.store.acl.AclEntry;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.adls.ADLSConstants.*;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "ADLS", "put", "copy", "filesystem", "restricted"})
+@CapabilityDescription("Write FlowFile data to Azure Data Lake Store (ADLS)")
+@ReadsAttributes({
+@ReadsAttribute(attribute = "filename", description = "The name of the file written to ADLS comes from the value of this attribute."),
+@ReadsAttribute(attribute = "filepath", description = "The relative path to the file on ADLS comes from the value of this attribute.")
+})
+@WritesAttributes({
+@WritesAttribute(attribute = "filename", description = "The name of the file written to ADLS is stored in this attribute."),
+@WritesAttribute(attribute = "absolute.adls.path", description = "The absolute path to the file on ADLS is stored in this attribute.")
+})
+@Restricted("Provides operator the ability to write to any file that NiFi has access to in ADLS.")
+public class PutADLSFile extends ADLSAbstractProcessor {
+
+private static final String PART_FILE_EXTENSION = ".nifipart";
+private static final String REPLACE_RESOLUTION = "replace";
+private static final String FAIL_RESOLUTION = "fail";
+private static final String APPEND_RESOLUTION = "append";
+
+protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+REPLACE_RESOLUTION, "Replaces the existing file if any.");
+protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+"Penalizes the flow file and routes it to failure.");
+protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+"Appends to the existing file if any, creates a new file otherwise.");
+
+// properties
+protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
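
The three AllowableValue constants in this diff define what PutADLSFile does when the target file already exists. A minimal sketch of that decision table follows, assuming a hypothetical ConflictResolution enum keyed by the same "replace", "fail" and "append" strings; neither the enum nor its Action outcomes are part of the PR or of the NiFi API.

```java
import java.util.Locale;

// Hypothetical model of PutADLSFile's CONFLICT_RESOLUTION values; not NiFi API.
enum ConflictResolution {
    REPLACE("replace"),
    FAIL("fail"),
    APPEND("append");

    // Illustrative outcomes of a put when the target may already exist.
    enum Action { CREATE, OVERWRITE, APPEND_TO_EXISTING, ROUTE_TO_FAILURE }

    private final String value;

    ConflictResolution(String value) {
        this.value = value;
    }

    String value() {
        return value;
    }

    // Maps a configured property value to a mode; unknown values are rejected.
    static ConflictResolution fromValue(String raw) {
        String normalized = raw.trim().toLowerCase(Locale.ROOT);
        for (ConflictResolution mode : values()) {
            if (mode.value.equals(normalized)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown conflict resolution: " + raw);
    }

    // Every mode creates the file when the target is absent; they differ only on a conflict.
    Action decide(boolean targetExists) {
        if (!targetExists) {
            return Action.CREATE;
        }
        switch (this) {
            case REPLACE:
                return Action.OVERWRITE;
            case APPEND:
                return Action.APPEND_TO_EXISTING;
            default:
                // FAIL: penalize the FlowFile and route it to failure
                return Action.ROUTE_TO_FAILURE;
        }
    }
}
```

This follows the three descriptions literally: "replace" and "append" both create a new file when none exists, and only "fail" treats an existing target as an error.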

[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-09-22 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r140503030
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ADLSConstants.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+public class ADLSConstants {
+
+public static final int CHUNK_SIZE_IN_BYTES = 400;
+
+public static final PropertyDescriptor PATH_NAME = new PropertyDescriptor.Builder()
+.name("Path")
+.description("Path for file in Azure Data Lake, e.g. /adlshome/")
+.required(true)
+.expressionLanguageSupported(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RETRY_ON_FAIL = new PropertyDescriptor.Builder()
+.name("Overwrite policy")
+.description("How many times to retry if read fails per chunk read, defaults to 3")
+.required(true)
+//TODO add Integer validator
+.defaultValue("3")
+.build();
+
+public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
+.name("Account FQDN")
+.description("Azure account fully qualified domain name eg: accountname.azuredatalakestore.net")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
+.name("Client ID")
+.description("Azure client ID")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CLIENT_SECRET = new PropertyDescriptor.Builder()
+.name("Client secret")
+.description("Azure client secret")
+.required(true)
+.sensitive(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor AUTH_TOKEN_ENDPOINT = new PropertyDescriptor.Builder()
+.name("Auth token endpoint")
+.description("Azure client secret")
--- End diff --

Removed it; it wasn't being used.
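RETRY_ON_FAIL describes retrying a failed read per chunk (default 3), CHUNK_SIZE_IN_BYTES is 400, and the descriptor still carries "//TODO add Integer validator". A stdlib-only sketch of both ideas follows: a non-negative integer check for the retry property, and a chunked read that retries each chunk independently. ChunkedRetryReader and ChunkSource are hypothetical names; ChunkSource stands in for the remote read and is not an ADLS SDK type. Reading "3 retries" as one initial attempt plus up to three retries is also an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Stdlib-only sketch; ChunkedRetryReader and ChunkSource are hypothetical names.
class ChunkedRetryReader {

    // Same value as ADLSConstants.CHUNK_SIZE_IN_BYTES in the diff above.
    static final int CHUNK_SIZE_IN_BYTES = 400;

    // Stand-in for a remote ranged read; not an ADLS SDK type.
    interface ChunkSource {
        // Reads up to len bytes starting at offset into buf; returns bytes read, or -1 at end.
        int read(long offset, byte[] buf, int len) throws IOException;
    }

    // The kind of check the "//TODO add Integer validator" could perform.
    static int parseRetryCount(String raw) {
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Retry count must be an integer: " + raw, e);
        }
        if (value < 0) {
            throw new IllegalArgumentException("Retry count must not be negative: " + raw);
        }
        return value;
    }

    // Reads the whole source chunk by chunk, retrying each chunk independently.
    static byte[] readAll(ChunkSource source, int retriesPerChunk) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[CHUNK_SIZE_IN_BYTES];
        long offset = 0;
        while (true) {
            int n = readChunkWithRetry(source, offset, buf, retriesPerChunk);
            if (n <= 0) {
                return out.toByteArray(); // end of data (0 treated as end to avoid spinning)
            }
            out.write(buf, 0, n);
            offset += n;
        }
    }

    // One initial attempt plus up to `retries` retries; rethrows the last failure.
    private static int readChunkWithRetry(ChunkSource source, long offset, byte[] buf, int retries)
            throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return source.read(offset, buf, buf.length);
            } catch (IOException e) {
                last = e;
            }
        }
        throw last;
    }
}
```

Retrying per chunk means a transient failure late in a large file only repeats one 400-byte read rather than the whole download. Inside the processor itself, attaching one of the integer validators from StandardValidators (for example POSITIVE_INTEGER_VALIDATOR) to the descriptor would likely be the simpler way to resolve the TODO.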


---


[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-09-22 Thread milanchandna
Github user milanchandna commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2158#discussion_r140502906
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ADLSConstants.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+public class ADLSConstants {
+
+public static final int CHUNK_SIZE_IN_BYTES = 400;
+
+public static final PropertyDescriptor PATH_NAME = new PropertyDescriptor.Builder()
+.name("Path")
--- End diff --

Used name and displayName properly wherever applicable.


---


[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...

2017-09-22 Thread milanchandna
Github user milanchandna commented on the issue:

https://github.com/apache/nifi/pull/2158
  
Hey @pvillard31, thanks for the comments; I'm looking into them.
Currently, when I run the full build, I get failures in TestMinimalLockingWriteAheadLog:

testRecoverFileThatHasTrailingNULBytesAndTruncation(org.wali.TestMinimalLockingWriteAheadLog)

Because of this, I'm not able to reliably detect failures in my own module.
Any suggestions on how to skip the above failures?

-Milan.


---


[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...

2017-09-22 Thread milanchandna
Github user milanchandna commented on the issue:

https://github.com/apache/nifi/pull/2158
  
Removed the test resource files (there were license issues) and resolved the tests' dependencies on them. There were also environment-specific characters in the test cases; I got rid of them. Please check now.


---


[GitHub] nifi issue #2158: NIFI-4360 Adding support for ADLS Processors. Feature incl...

2017-09-21 Thread milanchandna
Github user milanchandna commented on the issue:

https://github.com/apache/nifi/pull/2158
  
Thanks @joewitt

These test resource files contain Lorem Ipsum text.
Anyway, I will create my own if required, but I don't want to delete them, as they are needed for an important test case.

Thanks.


---


[GitHub] nifi pull request #2158: NIFI-4360 Adding support for ADLS Processors. Featu...

2017-09-18 Thread milanchandna
GitHub user milanchandna opened a pull request:

https://github.com/apache/nifi/pull/2158

NIFI-4360 Adding support for ADLS Processors. Feature includes List, …

…Get, Put processors. Until now, ADLS interaction was possible through the HDFS processors. Now users can move their data into and out of ADLS directly using the ADLS processors, which is simpler and removes one layer to go through. This will also help users who are not familiar with Hadoop configuration.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - There were failures in other modules: testRecoverFileThatHasTrailingNULBytesAndTruncation(org.wali.TestMinimalLockingWriteAheadLog)
- [x] Have you written or updated unit tests to verify your changes?
- [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/milanchandna/nifi NIFI-4360

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2158.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2158


commit 4f449c55c99868bd8b96bc370186503f4928c11f
Author: Milan Chandna <milan.chan...@gmail.com>
Date:   2017-09-18T07:00:17Z

NIFI-4360 Adding support for ADLS Processors. Feature includes List, Get, Put processors.




---