Repository: nifi
Updated Branches:
  refs/heads/master 6e0be8e64 -> 500a254e3


http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
new file mode 100644
index 0000000..ce8e044
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestDeleteHBaseCells extends DeleteTestBase {
+
+    @Before
+    public void setup() throws InitializationException {
+        super.setup(DeleteHBaseCells.class);
+    }
+
+    @Test
+    public void testSimpleDelete() {
+        final String SEP = "::::";
+        List<String> ids = populateTable(10000);
+        runner.setProperty(DeleteHBaseCells.SEPARATOR, SEP);
+        runner.assertValid();
+        StringBuilder sb = new StringBuilder();
+        for (String id : ids) {
+            sb.append(String.format("%s%sX%sY\n", id, SEP, SEP));
+        }
+        runner.enqueue(sb.toString().trim());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_SUCCESS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
index fe819dd..3315a27 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
@@ -19,45 +19,19 @@ package org.apache.nifi.hbase;
 
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
-public class TestDeleteHBaseRow {
-    private TestRunner runner;
-    private MockHBaseClientService hBaseClient;
+public class TestDeleteHBaseRow extends DeleteTestBase {
 
     @Before
     public void setup() throws InitializationException {
-        runner = TestRunners.newTestRunner(new DeleteHBaseRow());
-
-        hBaseClient = new MockHBaseClientService();
-        runner.addControllerService("hbaseClient", hBaseClient);
-        runner.enableControllerService(hBaseClient);
-
-        runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
-        runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
-    }
-
-    List<String> populateTable(int max) {
-        List<String> ids = new ArrayList<>();
-        for (int index = 0; index < max; index++) {
-            String uuid = UUID.randomUUID().toString();
-            ids.add(uuid);
-            Map<String, String> cells = new HashMap<>();
-            cells.put("test", UUID.randomUUID().toString());
-            hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
-        }
-
-        return ids;
+        super.setup(DeleteHBaseRow.class);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java
index b44c6b0..5878bca 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java
@@ -39,6 +39,7 @@ public class TestFetchHBaseRow {
     public void setup() throws InitializationException {
         proc = new FetchHBaseRow();
         runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
 
         hBaseClientService = new MockHBaseClientService();
         runner.addControllerService("hbaseClient", hBaseClientService);
@@ -48,6 +49,7 @@ public class TestFetchHBaseRow {
 
     @Test
     public void testColumnsValidation() {
+        runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
         runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
         runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
         runner.assertValid();
@@ -78,6 +80,7 @@ public class TestFetchHBaseRow {
     public void testNoIncomingFlowFile() {
         runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
         runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
+        runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
 
         runner.run();
         runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
@@ -91,6 +94,7 @@ public class TestFetchHBaseRow {
     public void testInvalidTableName() {
         runner.setProperty(FetchHBaseRow.TABLE_NAME, "${hbase.table}");
         runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
+        runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
 
         runner.enqueue("trigger flow file");
         runner.run();
@@ -106,6 +110,7 @@ public class TestFetchHBaseRow {
     public void testInvalidRowId() {
         runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
         runner.setProperty(FetchHBaseRow.ROW_ID, "${hbase.row}");
+        runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
 
         runner.enqueue("trigger flow file");
         runner.run();
@@ -125,7 +130,7 @@ public class TestFetchHBaseRow {
 
         final long ts1 = 123456789;
         hBaseClientService.addResult("row1", cells, ts1);
-
+        runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
         runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
         runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
         runner.setProperty(FetchHBaseRow.DESTINATION, 
FetchHBaseRow.DESTINATION_ATTRIBUTES);
@@ -155,6 +160,7 @@ public class TestFetchHBaseRow {
         final long ts1 = 123456789;
         hBaseClientService.addResult("row1", cells, ts1);
 
+        runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
         runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
         runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
         runner.setProperty(FetchHBaseRow.COLUMNS, "nifi:cq2");

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
index 24f83e9a..484d714 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
@@ -76,6 +76,9 @@ public class TestGetHBase {
         runner.setProperty(GetHBase.TABLE_NAME, "nifi");
         runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
         runner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
+        runner.setProperty(GetHBase.AUTHORIZATIONS, "");
+
+        runner.setValidateExpressionUsage(true);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
index ee799e4..14a9bde 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
@@ -505,6 +505,7 @@ public class TestPutHBaseJSON {
         runner.setProperty(PutHBaseJSON.TABLE_NAME, table);
         runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
         runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
+
         return runner;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
index af0da95..e817a86 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
@@ -64,6 +64,8 @@ public class TestPutHBaseRecord {
         }
         runner.enableControllerService(parser);
         runner.setProperty(PutHBaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutHBaseRecord.DEFAULT_VISIBILITY_STRING, "");
+        runner.setProperty(PutHBaseRecord.VISIBILITY_RECORD_PATH, "");
 
         parser.addSchemaField("id", RecordFieldType.INT);
         parser.addSchemaField("name", RecordFieldType.STRING);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java
new file mode 100644
index 0000000..261efae
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.hbase.util.VisibilityUtil;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class TestVisibilityUtil {
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws Exception {
+        runner = TestRunners.newTestRunner(PutHBaseCell.class);
+        final MockHBaseClientService hBaseClient = new 
MockHBaseClientService();
+        runner.addControllerService("hbaseClient", hBaseClient);
+        runner.enableControllerService(hBaseClient);
+        runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
+        runner.setProperty(PutHBaseCell.TABLE_NAME, "test");
+        runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "test");
+        runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "test");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testAllPresentOnFlowfile() {
+        runner.setProperty("visibility.test.test", "U&PII");
+
+        MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+        ff.putAttributes(new HashMap<String, String>(){{
+            put("visibility.test.test", "U&PII&PHI");
+        }});
+        ProcessContext context = runner.getProcessContext();
+
+        String label = VisibilityUtil.pickVisibilityString("test", "test", ff, 
context);
+
+        Assert.assertNotNull(label);
+        Assert.assertEquals("U&PII&PHI", label);
+    }
+
+    @Test
+    public void testOnlyColumnFamilyOnFlowfile() {
+        runner.setProperty("visibility.test", "U&PII");
+
+        MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+        ff.putAttributes(new HashMap<String, String>(){{
+            put("visibility.test", "U&PII&PHI");
+        }});
+        ProcessContext context = runner.getProcessContext();
+
+        String label = VisibilityUtil.pickVisibilityString("test", "test", ff, 
context);
+
+        Assert.assertNotNull(label);
+        Assert.assertEquals("U&PII&PHI", label);
+    }
+
+    @Test
+    public void testInvalidAttributes() {
+        runner.setProperty("visibility.test", "U&PII");
+
+        MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+        ff.putAttributes(new HashMap<String, String>(){{
+            put("visibility..test", "U&PII&PHI");
+        }});
+        ProcessContext context = runner.getProcessContext();
+
+        String label = VisibilityUtil.pickVisibilityString("test", "test", ff, 
context);
+
+        Assert.assertNotNull(label);
+        Assert.assertEquals("U&PII", label);
+    }
+
+    @Test
+    public void testColumnFamilyAttributeOnly() {
+        MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+        ff.putAttributes(new HashMap<String, String>(){{
+            put("visibility.test", "U&PII");
+        }});
+        ProcessContext context = runner.getProcessContext();
+
+        String label = VisibilityUtil.pickVisibilityString("test", "test", ff, 
context);
+
+        Assert.assertNotNull(label);
+        Assert.assertEquals("U&PII", label);
+    }
+
+    @Test
+    public void testNoAttributes() {
+        runner.setProperty("visibility.test", "U&PII");
+
+        MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+        ProcessContext context = runner.getProcessContext();
+
+        String label = VisibilityUtil.pickVisibilityString("test", "test", ff, 
context);
+
+        Assert.assertNotNull(label);
+        Assert.assertEquals("U&PII", label);
+
+        runner.setProperty("visibility.test.test", "U&PII&PHI");
+        label = VisibilityUtil.pickVisibilityString("test", "test", ff, 
context);
+
+        Assert.assertNotNull(label);
+        Assert.assertEquals("U&PII&PHI", label);
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java
new file mode 100644
index 0000000..da3fd5c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+/**
+ * Encapsulates the information for a delete operation.
+ */
+public class DeleteRequest {
+    private byte[] rowId;
+    private byte[] columnFamily;
+    private byte[] columnQualifier;
+    private String visibilityLabel;
+
+    public DeleteRequest(byte[] rowId, byte[] columnFamily, byte[] 
columnQualifier, String visibilityLabel) {
+        this.rowId = rowId;
+        this.columnFamily = columnFamily;
+        this.columnQualifier = columnQualifier;
+        this.visibilityLabel = visibilityLabel;
+    }
+
+    public byte[] getRowId() {
+        return rowId;
+    }
+
+    public byte[] getColumnFamily() {
+        return columnFamily;
+    }
+
+    public byte[] getColumnQualifier() {
+        return columnQualifier;
+    }
+
+    public String getVisibilityLabel() {
+        return visibilityLabel;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder()
+            .append(String.format("Row ID: %s\n", new String(rowId)))
+            .append(String.format("Column Family: %s\n", new 
String(columnFamily)))
+            .append(String.format("Column Qualifier: %s\n", new 
String(columnQualifier)))
+            .append(visibilityLabel != null ? String.format("Visibility Label: 
%s", visibilityLabel) : "")
+            .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index cff3fb6..cd3d851 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -119,6 +119,18 @@ public interface HBaseClientService extends 
ControllerService {
     void delete(String tableName, byte[] rowId) throws IOException;
 
     /**
+     * Deletes the given row on HBase. Uses the supplied visibility label for 
all cells in the delete.
+     * It will fail if HBase cannot delete a cell because the visibility label 
on the cell does not match the specified
+     * label.
+     *
+     * @param tableName the name of an HBase table
+     * @param rowId the id of the row to delete
+     * @param visibilityLabel a visibility label to apply to the delete
+     * @throws IOException thrown when there are communication errors with 
HBase
+     */
+    void delete(String tableName, byte[] rowId, String visibilityLabel) throws 
IOException;
+
+    /**
      * Deletes a list of rows in HBase. All cells are deleted.
      *
      * @param tableName the name of an HBase table
@@ -128,6 +140,25 @@ public interface HBaseClientService extends 
ControllerService {
     void delete(String tableName, List<byte[]> rowIds) throws IOException;
 
     /**
+     * Deletes a list of cells from HBase. This is intended to be used with 
granular delete operations.
+     *
+     * @param tableName the name of an HBase table.
+     * @param deletes a list of DeleteRequest objects.
+     * @throws IOException thrown when there are communication errors with 
HBase
+     */
+    void deleteCells(String tableName, List<DeleteRequest> deletes) throws 
IOException;
+
+    /**
+     * Deletes a list of rows in HBase. All cells that match the visibility 
label are deleted.
+     *
+     * @param tableName the name of an HBase table
+     * @param rowIds a list of rowIds to send in a batch delete
+     * @param visibilityLabel a visibility label expression
+     */
+
+    void delete(String tableName, List<byte[]> rowIds, String visibilityLabel) 
throws IOException;
+
+    /**
      * Scans the given table using the optional filter criteria and passing 
each result to the provided handler.
      *
      * @param tableName the name of an HBase table to scan
@@ -140,6 +171,19 @@ public interface HBaseClientService extends 
ControllerService {
     void scan(String tableName, Collection<Column> columns, String 
filterExpression, long minTime, ResultHandler handler) throws IOException;
 
     /**
+     * Scans the given table using the optional filter criteria and passing 
each result to the provided handler.
+     *
+     * @param tableName the name of an HBase table to scan
+     * @param columns optional columns to return, if not specified all columns 
are returned
+     * @param filterExpression optional filter expression, if not specified no 
filtering is performed
+     * @param minTime the minimum timestamp of cells to return, passed to the 
HBase scanner timeRange
+     * @param authorizations the visibility labels to apply to the scanner.
+     * @param handler a handler to process rows of the result set
+     * @throws IOException thrown when there are communication errors with 
HBase
+     */
+    void scan(String tableName, Collection<Column> columns, String 
filterExpression, long minTime, List<String> authorizations, ResultHandler 
handler) throws IOException;
+
+    /**
      * Scans the given table for the given rowId and passes the result to the 
handler.
      *
      * @param tableName the name of an HBase table to scan
@@ -149,7 +193,7 @@ public interface HBaseClientService extends 
ControllerService {
      * @param handler a handler to process rows of the result
      * @throws IOException thrown when there are communication errors with 
HBase
      */
-    void scan(String tableName, byte[] startRow, byte[] endRow, 
Collection<Column> columns, ResultHandler handler) throws IOException;
+    void scan(String tableName, byte[] startRow, byte[] endRow, 
Collection<Column> columns, List<String> authorizations, ResultHandler handler) 
throws IOException;
 
     /**
      * Scans the given table for the given range of row keys or time rage and 
passes the result to a handler.<br/>
@@ -163,10 +207,11 @@ public interface HBaseClientService extends 
ControllerService {
      * @param limitRows the maximum number of rows to be returned by scanner
      * @param isReversed whether this scan is a reversed one.
      * @param columns optional columns to return, if not specified all columns 
are returned
+     * @param authorizations optional list of visibility labels that the user 
should be able to see when communicating with HBase
      * @param handler a handler to process rows of the result
      */
     void scan(String tableName, String startRow, String endRow, String 
filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows,
-            Boolean isReversed, Collection<Column> columns, ResultHandler 
handler) throws IOException;
+            Boolean isReversed, Collection<Column> columns, List<String> 
authorizations, ResultHandler handler) throws IOException;
 
     /**
      * Converts the given boolean to it's byte representation.

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
index b29e032..7769165 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
@@ -24,18 +24,24 @@ public class PutColumn {
     private final byte[] columnFamily;
     private final byte[] columnQualifier;
     private final byte[] buffer;
+    private final String visibility;
     private final Long timestamp;
 
 
     public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, 
final byte[] buffer) {
-        this(columnFamily, columnQualifier, buffer, null);
+        this(columnFamily, columnQualifier, buffer, null, null);
     }
 
     public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, 
final byte[] buffer, final Long timestamp) {
+        this(columnFamily, columnQualifier, buffer, timestamp, null);
+    }
+
+    public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, 
final byte[] buffer, final Long timestamp, final String visibility) {
         this.columnFamily = columnFamily;
         this.columnQualifier = columnQualifier;
         this.buffer = buffer;
         this.timestamp = timestamp;
+        this.visibility = visibility;
     }
 
     public byte[] getColumnFamily() {
@@ -50,6 +56,10 @@ public class PutColumn {
         return buffer;
     }
 
+    public String getVisibility() {
+        return visibility;
+    }
+
     public Long getTimestamp() {
         return timestamp;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
index 7c10800..634ebd9 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
@@ -51,7 +51,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 @CapabilityDescription("Provides the ability to use an HBase table as a cache, 
in place of a DistributedMapCache."
     + " Uses a HBase_1_1_2_ClientService controller to communicate with 
HBase.")
 
-public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService implements DistributedMapCacheClient {
+public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService implements DistributedMapCacheClient, 
VisibilityLabelService {
 
     static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
         .name("HBase Client Service")
@@ -90,6 +90,7 @@ public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(HBASE_CACHE_TABLE_NAME);
+        descriptors.add(AUTHORIZATIONS);
         descriptors.add(HBASE_CLIENT_SERVICE);
         descriptors.add(HBASE_COLUMN_FAMILY);
         descriptors.add(HBASE_COLUMN_QUALIFIER);
@@ -106,6 +107,8 @@ public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService
     private volatile String hBaseColumnQualifier;
     private volatile byte[] hBaseColumnQualifierBytes;
 
+    private List<String> authorizations;
+
     @OnEnabled
     public void onConfigured(final ConfigurationContext context) throws 
InitializationException{
         hBaseClientService   = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
@@ -116,6 +119,8 @@ public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService
 
         hBaseColumnFamilyBytes    = 
hBaseColumnFamily.getBytes(StandardCharsets.UTF_8);
         hBaseColumnQualifierBytes = 
hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
+
+        authorizations = getAuthorizations(context);
     }
 
     private <T> byte[] serialize(final T value, final Serializer<T> 
serializer) throws IOException {
@@ -158,7 +163,7 @@ public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService
 
       final List<Column> columnsList = new ArrayList<Column>(0);
 
-      hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, 
columnsList, handler);
+      hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, 
columnsList, authorizations, handler);
       return (handler.numRows() > 0);
     }
 
@@ -190,7 +195,7 @@ public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService
 
       final List<Column> columnsList = new ArrayList<Column>(0);
 
-      hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, 
columnsList, handler);
+      hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, 
columnsList, authorizations, handler);
       if (handler.numRows() > 1) {
           throw new IOException("Found multiple rows in HBase for key");
       } else if(handler.numRows() == 1) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index ccbed60..01022e3 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -25,14 +25,17 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -62,6 +65,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -115,6 +119,10 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
         return connection;
     }
 
+    protected void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
     @Override
     protected void init(ControllerServiceInitializationContext config) throws 
InitializationException {
         kerberosConfigFile = config.getKerberosConfigurationFile();
@@ -315,6 +323,8 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
 
     }
 
+    private String principal = null;
+
     protected Configuration getConfigurationFromFiles(final String 
configFiles) {
         final Configuration hbaseConfig = HBaseConfiguration.create();
         if (StringUtils.isNotBlank(configFiles)) {
@@ -336,51 +346,85 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
         }
     }
 
+    private static final byte[] EMPTY_VIS_STRING;
+
+    static {
+        try {
+            EMPTY_VIS_STRING = "".getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
+        List<Put> retVal = new ArrayList<>();
+
+        try {
+            Put put = null;
+
+            for (final PutColumn column : columns) {
+                if (put == null || (put.getCellVisibility() == null && 
column.getVisibility() != null) || ( put.getCellVisibility() != null
+                        && 
!put.getCellVisibility().getExpression().equals(column.getVisibility())
+                    )) {
+                    put = new Put(rowKey);
+
+                    if (column.getVisibility() != null) {
+                        put.setCellVisibility(new 
CellVisibility(column.getVisibility()));
+                    }
+                    retVal.add(put);
+                }
+
+                if (column.getTimestamp() != null) {
+                    put.addColumn(
+                            column.getColumnFamily(),
+                            column.getColumnQualifier(),
+                            column.getTimestamp(),
+                            column.getBuffer());
+                } else {
+                    put.addColumn(
+                            column.getColumnFamily(),
+                            column.getColumnQualifier(),
+                            column.getBuffer());
+                }
+            }
+        } catch (DeserializationException de) {
+            getLogger().error("Error writing cell visibility statement.", de);
+            throw new RuntimeException(de);
+        }
+
+        return retVal;
+    }
+
     @Override
     public void put(final String tableName, final Collection<PutFlowFile> 
puts) throws IOException {
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
             // Create one Put per row....
-            final Map<String, Put> rowPuts = new HashMap<>();
+            final Map<String, List<PutColumn>> sorted = new HashMap<>();
+            final List<Put> newPuts = new ArrayList<>();
+
             for (final PutFlowFile putFlowFile : puts) {
-                //this is used for the map key as a byte[] does not work as a 
key.
                 final String rowKeyString = new String(putFlowFile.getRow(), 
StandardCharsets.UTF_8);
-                Put put = rowPuts.get(rowKeyString);
-                if (put == null) {
-                    put = new Put(putFlowFile.getRow());
-                    rowPuts.put(rowKeyString, put);
+                List<PutColumn> columns = sorted.get(rowKeyString);
+                if (columns == null) {
+                    columns = new ArrayList<>();
+                    sorted.put(rowKeyString, columns);
                 }
 
-                for (final PutColumn column : putFlowFile.getColumns()) {
-                    if (column.getTimestamp() != null) {
-                        put.addColumn(
-                                column.getColumnFamily(),
-                                column.getColumnQualifier(),
-                                column.getTimestamp(),
-                                column.getBuffer());
-                    } else {
-                        put.addColumn(
-                                column.getColumnFamily(),
-                                column.getColumnQualifier(),
-                                column.getBuffer());
-                    }
-                }
+                columns.addAll(putFlowFile.getColumns());
+            }
+
+            for (final Map.Entry<String, List<PutColumn>> entry : 
sorted.entrySet()) {
+                
newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), 
entry.getValue()));
             }
 
-            table.put(new ArrayList<>(rowPuts.values()));
+            table.put(newPuts);
         }
     }
 
     @Override
     public void put(final String tableName, final byte[] rowId, final 
Collection<PutColumn> columns) throws IOException {
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
-            Put put = new Put(rowId);
-            for (final PutColumn column : columns) {
-                put.addColumn(
-                        column.getColumnFamily(),
-                        column.getColumnQualifier(),
-                        column.getBuffer());
-            }
-            table.put(put);
+            table.put(buildPuts(rowId, new ArrayList(columns)));
         }
     }
 
@@ -398,18 +442,54 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
 
     @Override
     public void delete(final String tableName, final byte[] rowId) throws 
IOException {
+        delete(tableName, rowId, null);
+    }
+
+    @Override
+    public void delete(String tableName, byte[] rowId, String visibilityLabel) 
throws IOException {
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
             Delete delete = new Delete(rowId);
+            if (!StringUtils.isEmpty(visibilityLabel)) {
+                delete.setCellVisibility(new CellVisibility(visibilityLabel));
+            }
             table.delete(delete);
         }
     }
 
     @Override
     public void delete(String tableName, List<byte[]> rowIds) throws 
IOException {
+        delete(tableName, rowIds);
+    }
+
+    @Override
+    public void deleteCells(String tableName, List<DeleteRequest> deletes) 
throws IOException {
+        List<Delete> deleteRequests = new ArrayList<>();
+        for (int index = 0; index < deletes.size(); index++) {
+            DeleteRequest req = deletes.get(index);
+            Delete delete = new Delete(req.getRowId())
+                .addColumn(req.getColumnFamily(), req.getColumnQualifier());
+            if (!StringUtils.isEmpty(req.getVisibilityLabel())) {
+                delete.setCellVisibility(new 
CellVisibility(req.getVisibilityLabel()));
+            }
+            deleteRequests.add(delete);
+        }
+        batchDelete(tableName, deleteRequests);
+    }
+
+    @Override
+    public void delete(String tableName, List<byte[]> rowIds, String 
visibilityLabel) throws IOException {
         List<Delete> deletes = new ArrayList<>();
         for (int index = 0; index < rowIds.size(); index++) {
-            deletes.add(new Delete(rowIds.get(index)));
+            Delete delete = new Delete(rowIds.get(index));
+            if (!StringUtils.isBlank(visibilityLabel)) {
+                delete.setCellVisibility(new CellVisibility(visibilityLabel));
+            }
+            deletes.add(delete);
         }
+        batchDelete(tableName, deletes);
+    }
+
+    private void batchDelete(String tableName, List<Delete> deletes) throws 
IOException {
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
             table.delete(deletes);
         }
@@ -418,7 +498,11 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
     @Override
     public void scan(final String tableName, final Collection<Column> columns, 
final String filterExpression, final long minTime, final ResultHandler handler)
             throws IOException {
+        scan(tableName, columns, filterExpression, minTime, null, handler);
+    }
 
+    @Override
+    public void scan(String tableName, Collection<Column> columns, String 
filterExpression, long minTime, List<String> visibilityLabels, ResultHandler 
handler) throws IOException {
         Filter filter = null;
         if (!StringUtils.isBlank(filterExpression)) {
             ParseFilter parseFilter = new ParseFilter();
@@ -426,7 +510,7 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
         }
 
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName));
-             final ResultScanner scanner = getResults(table, columns, filter, 
minTime)) {
+             final ResultScanner scanner = getResults(table, columns, filter, 
minTime, visibilityLabels)) {
 
             for (final Result result : scanner) {
                 final byte[] rowKey = result.getRow();
@@ -451,11 +535,11 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
     }
 
     @Override
-    public void scan(final String tableName, final byte[] startRow, final 
byte[] endRow, final Collection<Column> columns, final ResultHandler handler)
+    public void scan(final String tableName, final byte[] startRow, final 
byte[] endRow, final Collection<Column> columns, List<String> authorizations, 
final ResultHandler handler)
             throws IOException {
 
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName));
-             final ResultScanner scanner = getResults(table, startRow, endRow, 
columns)) {
+             final ResultScanner scanner = getResults(table, startRow, endRow, 
columns, authorizations)) {
 
             for (final Result result : scanner) {
                 final byte[] rowKey = result.getRow();
@@ -482,11 +566,11 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
     @Override
     public void scan(final String tableName, final String startRow, final 
String endRow, String filterExpression,
             final Long timerangeMin, final Long timerangeMax, final Integer 
limitRows, final Boolean isReversed,
-            final Collection<Column> columns, final ResultHandler handler) 
throws IOException {
+            final Collection<Column> columns, List<String> visibilityLabels, 
final ResultHandler handler) throws IOException {
 
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName));
                 final ResultScanner scanner = getResults(table, startRow, 
endRow, filterExpression, timerangeMin,
-                        timerangeMax, limitRows, isReversed, columns)) {
+                        timerangeMax, limitRows, isReversed, columns, 
visibilityLabels)) {
 
             int cnt = 0;
             final int lim = limitRows != null ? limitRows : 0;
@@ -515,12 +599,11 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
                 handler.handle(rowKey, resultCells);
             }
         }
-
     }
 
     //
     protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
-            final Integer limitRows, final Boolean isReversed, final 
Collection<Column> columns)  throws IOException {
+            final Integer limitRows, final Boolean isReversed, final 
Collection<Column> columns, List<String> authorizations)  throws IOException {
         final Scan scan = new Scan();
         if (!StringUtils.isBlank(startRow)){
             scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@@ -529,6 +612,9 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
             scan.setStopRow(   endRow.getBytes(StandardCharsets.UTF_8));
         }
 
+        if (authorizations != null && authorizations.size() > 0) {
+            scan.setAuthorizations(new Authorizations(authorizations));
+        }
 
         Filter filter = null;
         if (columns != null) {
@@ -565,12 +651,16 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
     }
 
     // protected and extracted into separate method for testing
-    protected ResultScanner getResults(final Table table, final byte[] 
startRow, final byte[] endRow, final Collection<Column> columns) throws 
IOException {
+    protected ResultScanner getResults(final Table table, final byte[] 
startRow, final byte[] endRow, final Collection<Column> columns, List<String> 
authorizations) throws IOException {
         final Scan scan = new Scan();
         scan.setStartRow(startRow);
         scan.setStopRow(endRow);
 
-        if (columns != null) {
+        if (authorizations != null && authorizations.size() > 0) {
+            scan.setAuthorizations(new Authorizations(authorizations));
+        }
+
+        if (columns != null && columns.size() > 0) {
             for (Column col : columns) {
                 if (col.getQualifier() == null) {
                     scan.addFamily(col.getFamily());
@@ -584,7 +674,7 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
     }
 
     // protected and extracted into separate method for testing
-    protected ResultScanner getResults(final Table table, final 
Collection<Column> columns, final Filter filter, final long minTime) throws 
IOException {
+    protected ResultScanner getResults(final Table table, final 
Collection<Column> columns, final Filter filter, final long minTime, 
List<String> authorizations) throws IOException {
         // Create a new scan. We will set the min timerange as the latest 
timestamp that
         // we have seen so far. The minimum timestamp is inclusive, so we will 
get duplicates.
         // We will record any cells that have the latest timestamp, so that 
when we scan again,
@@ -592,6 +682,10 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
         final Scan scan = new Scan();
         scan.setTimeRange(minTime, Long.MAX_VALUE);
 
+        if (authorizations != null && authorizations.size() > 0) {
+            scan.setAuthorizations(new Authorizations(authorizations));
+        }
+
         if (filter != null) {
             scan.setFilter(filter);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
index ddaa765..12fb6cf 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
@@ -54,7 +54,7 @@ import java.util.Set;
 @Tags({"hbase", "record", "lookup", "service"})
 @CapabilityDescription("A lookup service that retrieves one or more columns 
from HBase and returns them as a record. The lookup coordinates " +
         "must contain 'rowKey' which will be the HBase row id.")
-public class HBase_1_1_2_RecordLookupService extends AbstractControllerService 
implements LookupService<Record> {
+public class HBase_1_1_2_RecordLookupService extends AbstractControllerService 
implements LookupService<Record>, VisibilityLabelService {
 
     static final String ROW_KEY_KEY = "rowKey";
     private static final Set<String> REQUIRED_KEYS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
@@ -98,6 +98,7 @@ public class HBase_1_1_2_RecordLookupService extends 
AbstractControllerService i
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HBASE_CLIENT_SERVICE);
         props.add(TABLE_NAME);
+        props.add(AUTHORIZATIONS);
         props.add(RETURN_COLUMNS);
         props.add(CHARSET);
         PROPERTIES = Collections.unmodifiableList(props);
@@ -107,6 +108,7 @@ public class HBase_1_1_2_RecordLookupService extends 
AbstractControllerService i
     private List<Column> columns;
     private Charset charset;
     private HBaseClientService hBaseClientService;
+    private List<String> authorizations;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -127,7 +129,8 @@ public class HBase_1_1_2_RecordLookupService extends 
AbstractControllerService i
         final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
         try {
             final Map<String, Object> values = new HashMap<>();
-            hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, 
columns, (byte[] row, ResultCell[] resultCells) ->  {
+
+            hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, 
columns, authorizations, (byte[] row, ResultCell[] resultCells) ->  {
                 for (final ResultCell cell : resultCells) {
                     final byte[] qualifier = 
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierOffset() + cell.getQualifierLength());
                     final byte[] value = 
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueOffset() + cell.getValueLength());
@@ -167,6 +170,7 @@ public class HBase_1_1_2_RecordLookupService extends 
AbstractControllerService i
         this.tableName = context.getProperty(TABLE_NAME).getValue();
         this.columns = 
getColumns(context.getProperty(RETURN_COLUMNS).getValue());
         this.charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
+        this.authorizations = getAuthorizations(context);
     }
 
     @OnDisabled

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java
new file mode 100644
index 0000000..452f6c2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class IntegrationTestClientService extends HBase_1_1_2_ClientService {
+    IntegrationTestClientService(Connection hbaseConnection) {
+        super();
+        setConnection(hbaseConnection);
+    }
+
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        return new ArrayList<>();
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java
new file mode 100644
index 0000000..9c9ab72
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public interface VisibilityLabelService {
+    PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
+        .name("hb-lu-authorizations")
+        .displayName("Authorizations")
+        .description("The list of authorization tokens to be used with cell 
visibility if it is enabled. These will be used to " +
+                "override the default authorization list for the user 
accessing HBase.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    default List<String> getAuthorizations(ConfigurationContext context) {
+        List<String> tokens = new ArrayList<>();
+        String authorizationString = 
context.getProperty(AUTHORIZATIONS).isSet()
+                ? context.getProperty(AUTHORIZATIONS).getValue()
+                : "";
+        if (!StringUtils.isEmpty(authorizationString)) {
+            tokens = Arrays.asList(authorizationString.split(",[\\s]*"));
+        }
+
+        return tokens;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index e4b9280..917cb3b 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -142,21 +142,19 @@ public class MockHBaseClientService extends 
HBase_1_1_2_ClientService {
         return true;
     }
 
-    @Override
-    protected ResultScanner getResults(Table table, byte[] startRow, byte[] 
endRow, Collection<Column> columns) throws IOException {
+    protected ResultScanner getResults(Table table, byte[] startRow, byte[] 
endRow, Collection<Column> columns, List<String> labels) throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);
         Mockito.when(scanner.iterator()).thenReturn(results.iterator());
         return scanner;
     }
 
     @Override
-    protected ResultScanner getResults(Table table, Collection<Column> 
columns, Filter filter, long minTime) throws IOException {
+    protected ResultScanner getResults(Table table, Collection<Column> 
columns, Filter filter, long minTime, List<String> labels) throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);
         Mockito.when(scanner.iterator()).thenReturn(results.iterator());
         return scanner;
     }
 
-    @Override
     protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
             final Integer limitRows, final Boolean isReversed, final 
Collection<Column> columns)  throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);

Reply via email to