Repository: nifi Updated Branches: refs/heads/master 6e0be8e64 -> 500a254e3
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java new file mode 100644 index 0000000..ce8e044 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.nifi.reporting.InitializationException; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class TestDeleteHBaseCells extends DeleteTestBase { + + @Before + public void setup() throws InitializationException { + super.setup(DeleteHBaseCells.class); + } + + @Test + public void testSimpleDelete() { + final String SEP = "::::"; + List<String> ids = populateTable(10000); + runner.setProperty(DeleteHBaseCells.SEPARATOR, SEP); + runner.assertValid(); + StringBuilder sb = new StringBuilder(); + for (String id : ids) { + sb.append(String.format("%s%sX%sY\n", id, SEP, SEP)); + } + runner.enqueue(sb.toString().trim()); + runner.run(); + runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_SUCCESS); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java index fe819dd..3315a27 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java @@ -19,45 +19,19 @@ package org.apache.nifi.hbase; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import 
java.util.UUID; -public class TestDeleteHBaseRow { - private TestRunner runner; - private MockHBaseClientService hBaseClient; +public class TestDeleteHBaseRow extends DeleteTestBase { @Before public void setup() throws InitializationException { - runner = TestRunners.newTestRunner(new DeleteHBaseRow()); - - hBaseClient = new MockHBaseClientService(); - runner.addControllerService("hbaseClient", hBaseClient); - runner.enableControllerService(hBaseClient); - - runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi"); - runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient"); - } - - List<String> populateTable(int max) { - List<String> ids = new ArrayList<>(); - for (int index = 0; index < max; index++) { - String uuid = UUID.randomUUID().toString(); - ids.add(uuid); - Map<String, String> cells = new HashMap<>(); - cells.put("test", UUID.randomUUID().toString()); - hBaseClient.addResult(uuid, cells, System.currentTimeMillis()); - } - - return ids; + super.setup(DeleteHBaseRow.class); } @Test http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java index b44c6b0..5878bca 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java @@ -39,6 +39,7 @@ public class TestFetchHBaseRow { public void setup() throws InitializationException { proc = new FetchHBaseRow(); runner = TestRunners.newTestRunner(proc); + 
runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, ""); hBaseClientService = new MockHBaseClientService(); runner.addControllerService("hbaseClient", hBaseClientService); @@ -48,6 +49,7 @@ public class TestFetchHBaseRow { @Test public void testColumnsValidation() { + runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, ""); runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); runner.assertValid(); @@ -78,6 +80,7 @@ public class TestFetchHBaseRow { public void testNoIncomingFlowFile() { runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, ""); runner.run(); runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); @@ -91,6 +94,7 @@ public class TestFetchHBaseRow { public void testInvalidTableName() { runner.setProperty(FetchHBaseRow.TABLE_NAME, "${hbase.table}"); runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, ""); runner.enqueue("trigger flow file"); runner.run(); @@ -106,6 +110,7 @@ public class TestFetchHBaseRow { public void testInvalidRowId() { runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); runner.setProperty(FetchHBaseRow.ROW_ID, "${hbase.row}"); + runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, ""); runner.enqueue("trigger flow file"); runner.run(); @@ -125,7 +130,7 @@ public class TestFetchHBaseRow { final long ts1 = 123456789; hBaseClientService.addResult("row1", cells, ts1); - + runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, ""); runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES); @@ -155,6 +160,7 @@ public class TestFetchHBaseRow { final long ts1 = 123456789; hBaseClientService.addResult("row1", cells, ts1); + runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, ""); runner.setProperty(FetchHBaseRow.TABLE_NAME, 
"table1"); runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); runner.setProperty(FetchHBaseRow.COLUMNS, "nifi:cq2"); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java index 24f83e9a..484d714 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java @@ -76,6 +76,9 @@ public class TestGetHBase { runner.setProperty(GetHBase.TABLE_NAME, "nifi"); runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient"); runner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); + runner.setProperty(GetHBase.AUTHORIZATIONS, ""); + + runner.setValidateExpressionUsage(true); } @After http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index ee799e4..14a9bde 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -505,6 +505,7 @@ public class 
TestPutHBaseJSON { runner.setProperty(PutHBaseJSON.TABLE_NAME, table); runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily); runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize); + return runner; } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java index af0da95..e817a86 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java @@ -64,6 +64,8 @@ public class TestPutHBaseRecord { } runner.enableControllerService(parser); runner.setProperty(PutHBaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutHBaseRecord.DEFAULT_VISIBILITY_STRING, ""); + runner.setProperty(PutHBaseRecord.VISIBILITY_RECORD_PATH, ""); parser.addSchemaField("id", RecordFieldType.INT); parser.addSchemaField("name", RecordFieldType.STRING); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java new file mode 100644 index 0000000..261efae --- /dev/null +++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.nifi.hbase.util.VisibilityUtil; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; + +public class TestVisibilityUtil { + private TestRunner runner; + + @Before + public void setup() throws Exception { + runner = TestRunners.newTestRunner(PutHBaseCell.class); + final MockHBaseClientService hBaseClient = new MockHBaseClientService(); + runner.addControllerService("hbaseClient", hBaseClient); + runner.enableControllerService(hBaseClient); + runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient"); + runner.setProperty(PutHBaseCell.TABLE_NAME, "test"); + runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "test"); + runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "test"); + runner.assertValid(); + } + + @Test + public void testAllPresentOnFlowfile() 
{ + runner.setProperty("visibility.test.test", "U&PII"); + + MockFlowFile ff = new MockFlowFile(System.currentTimeMillis()); + ff.putAttributes(new HashMap<String, String>(){{ + put("visibility.test.test", "U&PII&PHI"); + }}); + ProcessContext context = runner.getProcessContext(); + + String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context); + + Assert.assertNotNull(label); + Assert.assertEquals("U&PII&PHI", label); + } + + @Test + public void testOnlyColumnFamilyOnFlowfile() { + runner.setProperty("visibility.test", "U&PII"); + + MockFlowFile ff = new MockFlowFile(System.currentTimeMillis()); + ff.putAttributes(new HashMap<String, String>(){{ + put("visibility.test", "U&PII&PHI"); + }}); + ProcessContext context = runner.getProcessContext(); + + String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context); + + Assert.assertNotNull(label); + Assert.assertEquals("U&PII&PHI", label); + } + + @Test + public void testInvalidAttributes() { + runner.setProperty("visibility.test", "U&PII"); + + MockFlowFile ff = new MockFlowFile(System.currentTimeMillis()); + ff.putAttributes(new HashMap<String, String>(){{ + put("visibility..test", "U&PII&PHI"); + }}); + ProcessContext context = runner.getProcessContext(); + + String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context); + + Assert.assertNotNull(label); + Assert.assertEquals("U&PII", label); + } + + @Test + public void testColumnFamilyAttributeOnly() { + MockFlowFile ff = new MockFlowFile(System.currentTimeMillis()); + ff.putAttributes(new HashMap<String, String>(){{ + put("visibility.test", "U&PII"); + }}); + ProcessContext context = runner.getProcessContext(); + + String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context); + + Assert.assertNotNull(label); + Assert.assertEquals("U&PII", label); + } + + @Test + public void testNoAttributes() { + runner.setProperty("visibility.test", "U&PII"); + + MockFlowFile ff = new 
MockFlowFile(System.currentTimeMillis()); + ProcessContext context = runner.getProcessContext(); + + String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context); + + Assert.assertNotNull(label); + Assert.assertEquals("U&PII", label); + + runner.setProperty("visibility.test.test", "U&PII&PHI"); + label = VisibilityUtil.pickVisibilityString("test", "test", ff, context); + + Assert.assertNotNull(label); + Assert.assertEquals("U&PII&PHI", label); + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java new file mode 100644 index 0000000..da3fd5c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +/** + * Encapsulates the information for a delete operation. + */ +public class DeleteRequest { + private byte[] rowId; + private byte[] columnFamily; + private byte[] columnQualifier; + private String visibilityLabel; + + public DeleteRequest(byte[] rowId, byte[] columnFamily, byte[] columnQualifier, String visibilityLabel) { + this.rowId = rowId; + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + this.visibilityLabel = visibilityLabel; + } + + public byte[] getRowId() { + return rowId; + } + + public byte[] getColumnFamily() { + return columnFamily; + } + + public byte[] getColumnQualifier() { + return columnQualifier; + } + + public String getVisibilityLabel() { + return visibilityLabel; + } + + @Override + public String toString() { + return new StringBuilder() + .append(String.format("Row ID: %s\n", new String(rowId))) + .append(String.format("Column Family: %s\n", new String(columnFamily))) + .append(String.format("Column Qualifier: %s\n", new String(columnQualifier))) + .append(visibilityLabel != null ? 
String.format("Visibility Label: %s", visibilityLabel) : "") + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index cff3fb6..cd3d851 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -119,6 +119,18 @@ public interface HBaseClientService extends ControllerService { void delete(String tableName, byte[] rowId) throws IOException; /** + * Deletes the given row on HBase. Uses the supplied visibility label for all cells in the delete. + * It will fail if HBase cannot delete a cell because the visibility label on the cell does not match the specified + * label. + * + * @param tableName the name of an HBase table + * @param rowId the id of the row to delete + * @param visibilityLabel a visibility label to apply to the delete + * @throws IOException thrown when there are communication errors with HBase + */ + void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException; + + /** * Deletes a list of rows in HBase. All cells are deleted. * * @param tableName the name of an HBase table @@ -128,6 +140,25 @@ public interface HBaseClientService extends ControllerService { void delete(String tableName, List<byte[]> rowIds) throws IOException; /** + * Deletes a list of cells from HBase. 
This is intended to be used with granular delete operations. + * + * @param tableName the name of an HBase table. + * @param deletes a list of DeleteRequest objects. + * @throws IOException thrown when there are communication errors with HBase + */ + void deleteCells(String tableName, List<DeleteRequest> deletes) throws IOException; + + /** + * Deletes a list of rows in HBase. All cells that match the visibility label are deleted. + * + * @param tableName the name of an HBase table + * @param rowIds a list of rowIds to send in a batch delete + * @param visibilityLabel a visibility label expression + */ + + void delete(String tableName, List<byte[]> rowIds, String visibilityLabel) throws IOException; + + /** * Scans the given table using the optional filter criteria and passing each result to the provided handler. * * @param tableName the name of an HBase table to scan @@ -140,6 +171,19 @@ public interface HBaseClientService extends ControllerService { void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; /** + * Scans the given table using the optional filter criteria and passing each result to the provided handler. + * + * @param tableName the name of an HBase table to scan + * @param columns optional columns to return, if not specified all columns are returned + * @param filterExpression optional filter expression, if not specified no filtering is performed + * @param minTime the minimum timestamp of cells to return, passed to the HBase scanner timeRange + * @param authorizations the visibility labels to apply to the scanner. 
+ * @param handler a handler to process rows of the result set + * @throws IOException thrown when there are communication errors with HBase + */ + void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, List<String> authorizations, ResultHandler handler) throws IOException; + + /** * Scans the given table for the given rowId and passes the result to the handler. * * @param tableName the name of an HBase table to scan @@ -149,7 +193,7 @@ public interface HBaseClientService extends ControllerService { * @param handler a handler to process rows of the result * @throws IOException thrown when there are communication errors with HBase */ - void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException; + void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> authorizations, ResultHandler handler) throws IOException; /** * Scans the given table for the given range of row keys or time rage and passes the result to a handler.<br/> @@ -163,10 +207,11 @@ public interface HBaseClientService extends ControllerService { * @param limitRows the maximum number of rows to be returned by scanner * @param isReversed whether this scan is a reversed one. * @param columns optional columns to return, if not specified all columns are returned + * @param authorizations optional list of visibility labels that the user should be able to see when communicating with HBase * @param handler a handler to process rows of the result */ void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows, - Boolean isReversed, Collection<Column> columns, ResultHandler handler) throws IOException; + Boolean isReversed, Collection<Column> columns, List<String> authorizations, ResultHandler handler) throws IOException; /** * Converts the given boolean to it's byte representation. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java index b29e032..7769165 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java @@ -24,18 +24,24 @@ public class PutColumn { private final byte[] columnFamily; private final byte[] columnQualifier; private final byte[] buffer; + private final String visibility; private final Long timestamp; public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer) { - this(columnFamily, columnQualifier, buffer, null); + this(columnFamily, columnQualifier, buffer, null, null); } public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp) { + this(columnFamily, columnQualifier, buffer, timestamp, null); + } + + public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp, final String visibility) { this.columnFamily = columnFamily; this.columnQualifier = columnQualifier; this.buffer = buffer; this.timestamp = timestamp; + this.visibility = visibility; } public byte[] getColumnFamily() { @@ -50,6 +56,10 @@ public class PutColumn { return buffer; } + public String getVisibility() { + return visibility; + } + public Long getTimestamp() { return timestamp; } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java index 7c10800..634ebd9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java @@ -51,7 +51,7 @@ import org.apache.nifi.processor.util.StandardValidators; @CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache." 
+ " Uses a HBase_1_1_2_ClientService controller to communicate with HBase.") -public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient { +public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient, VisibilityLabelService { static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() .name("HBase Client Service") @@ -90,6 +90,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> descriptors = new ArrayList<>(); descriptors.add(HBASE_CACHE_TABLE_NAME); + descriptors.add(AUTHORIZATIONS); descriptors.add(HBASE_CLIENT_SERVICE); descriptors.add(HBASE_COLUMN_FAMILY); descriptors.add(HBASE_COLUMN_QUALIFIER); @@ -106,6 +107,8 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService private volatile String hBaseColumnQualifier; private volatile byte[] hBaseColumnQualifierBytes; + private List<String> authorizations; + @OnEnabled public void onConfigured(final ConfigurationContext context) throws InitializationException{ hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); @@ -116,6 +119,8 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService hBaseColumnFamilyBytes = hBaseColumnFamily.getBytes(StandardCharsets.UTF_8); hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8); + + authorizations = getAuthorizations(context); } private <T> byte[] serialize(final T value, final Serializer<T> serializer) throws IOException { @@ -158,7 +163,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService final List<Column> columnsList = new ArrayList<Column>(0); - hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, 
handler); + hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler); return (handler.numRows() > 0); } @@ -190,7 +195,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService final List<Column> columnsList = new ArrayList<Column>(0); - hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler); + hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler); if (handler.numRows() > 1) { throw new IOException("Found multiple rows in HBase for key"); } else if(handler.numRows() == 1) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index ccbed60..01022e3 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -25,14 +25,17 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; import 
org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.security.visibility.Authorizations; +import org.apache.hadoop.hbase.security.visibility.CellVisibility; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.DynamicProperty; @@ -62,6 +65,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -115,6 +119,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme return connection; } + protected void setConnection(Connection connection) { + this.connection = connection; + } + @Override protected void init(ControllerServiceInitializationContext config) throws InitializationException { kerberosConfigFile = config.getKerberosConfigurationFile(); @@ -315,6 +323,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } + private String principal = null; + protected Configuration getConfigurationFromFiles(final String configFiles) { final Configuration hbaseConfig = HBaseConfiguration.create(); if (StringUtils.isNotBlank(configFiles)) { @@ -336,51 +346,85 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } } + private static final byte[] EMPTY_VIS_STRING; + + static { + try { + EMPTY_VIS_STRING = "".getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new 
RuntimeException(e); + } + } + + private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) { + List<Put> retVal = new ArrayList<>(); + + try { + Put put = null; + + for (final PutColumn column : columns) { + if (put == null || (put.getCellVisibility() == null && column.getVisibility() != null) || ( put.getCellVisibility() != null + && !put.getCellVisibility().getExpression().equals(column.getVisibility()) + )) { + put = new Put(rowKey); + + if (column.getVisibility() != null) { + put.setCellVisibility(new CellVisibility(column.getVisibility())); + } + retVal.add(put); + } + + if (column.getTimestamp() != null) { + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getTimestamp(), + column.getBuffer()); + } else { + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getBuffer()); + } + } + } catch (DeserializationException de) { + getLogger().error("Error writing cell visibility statement.", de); + throw new RuntimeException(de); + } + + return retVal; + } + @Override public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName))) { // Create one Put per row.... - final Map<String, Put> rowPuts = new HashMap<>(); + final Map<String, List<PutColumn>> sorted = new HashMap<>(); + final List<Put> newPuts = new ArrayList<>(); + for (final PutFlowFile putFlowFile : puts) { - //this is used for the map key as a byte[] does not work as a key. 
final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8); - Put put = rowPuts.get(rowKeyString); - if (put == null) { - put = new Put(putFlowFile.getRow()); - rowPuts.put(rowKeyString, put); + List<PutColumn> columns = sorted.get(rowKeyString); + if (columns == null) { + columns = new ArrayList<>(); + sorted.put(rowKeyString, columns); } - for (final PutColumn column : putFlowFile.getColumns()) { - if (column.getTimestamp() != null) { - put.addColumn( - column.getColumnFamily(), - column.getColumnQualifier(), - column.getTimestamp(), - column.getBuffer()); - } else { - put.addColumn( - column.getColumnFamily(), - column.getColumnQualifier(), - column.getBuffer()); - } - } + columns.addAll(putFlowFile.getColumns()); + } + + for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) { + newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue())); } - table.put(new ArrayList<>(rowPuts.values())); + table.put(newPuts); } } @Override public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName))) { - Put put = new Put(rowId); - for (final PutColumn column : columns) { - put.addColumn( - column.getColumnFamily(), - column.getColumnQualifier(), - column.getBuffer()); - } - table.put(put); + table.put(buildPuts(rowId, new ArrayList<>(columns))); /* diamond instead of a raw ArrayList so the List<PutColumn> argument stays type-checked */ } } @@ -398,18 +442,54 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme @Override public void delete(final String tableName, final byte[] rowId) throws IOException { + delete(tableName, rowId, null); + } + + @Override + public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName))) { Delete delete = new Delete(rowId); + if (!StringUtils.isEmpty(visibilityLabel)) { + 
delete.setCellVisibility(new CellVisibility(visibilityLabel)); + } table.delete(delete); } } @Override public void delete(String tableName, List<byte[]> rowIds) throws IOException { + delete(tableName, rowIds, null); /* delegate to the visibility-aware overload with no label; the former two-argument call resolved to this same method and recursed until StackOverflowError */ + } + + @Override + public void deleteCells(String tableName, List<DeleteRequest> deletes) throws IOException { + List<Delete> deleteRequests = new ArrayList<>(); + for (int index = 0; index < deletes.size(); index++) { + DeleteRequest req = deletes.get(index); + Delete delete = new Delete(req.getRowId()) + .addColumn(req.getColumnFamily(), req.getColumnQualifier()); + if (!StringUtils.isEmpty(req.getVisibilityLabel())) { + delete.setCellVisibility(new CellVisibility(req.getVisibilityLabel())); + } + deleteRequests.add(delete); + } + batchDelete(tableName, deleteRequests); + } + + @Override + public void delete(String tableName, List<byte[]> rowIds, String visibilityLabel) throws IOException { List<Delete> deletes = new ArrayList<>(); for (int index = 0; index < rowIds.size(); index++) { - deletes.add(new Delete(rowIds.get(index))); + Delete delete = new Delete(rowIds.get(index)); + if (!StringUtils.isBlank(visibilityLabel)) { + delete.setCellVisibility(new CellVisibility(visibilityLabel)); + } + deletes.add(delete); } + batchDelete(tableName, deletes); + } + + private void batchDelete(String tableName, List<Delete> deletes) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName))) { table.delete(deletes); } @@ -418,7 +498,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme @Override public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler) throws IOException { + scan(tableName, columns, filterExpression, minTime, null, handler); + } + @Override + public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, List<String> visibilityLabels, 
ResultHandler handler) throws IOException { Filter filter = null; if (!StringUtils.isBlank(filterExpression)) { ParseFilter parseFilter = new ParseFilter(); @@ -426,7 +510,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } try (final Table table = connection.getTable(TableName.valueOf(tableName)); - final ResultScanner scanner = getResults(table, columns, filter, minTime)) { + final ResultScanner scanner = getResults(table, columns, filter, minTime, visibilityLabels)) { for (final Result result : scanner) { final byte[] rowKey = result.getRow(); @@ -451,11 +535,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } @Override - public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, final ResultHandler handler) + public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, List<String> authorizations, final ResultHandler handler) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName)); - final ResultScanner scanner = getResults(table, startRow, endRow, columns)) { + final ResultScanner scanner = getResults(table, startRow, endRow, columns, authorizations)) { for (final Result result : scanner) { final byte[] rowKey = result.getRow(); @@ -482,11 +566,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme @Override public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, - final Collection<Column> columns, final ResultHandler handler) throws IOException { + final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName)); final 
ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, - timerangeMax, limitRows, isReversed, columns)) { + timerangeMax, limitRows, isReversed, columns, visibilityLabels)) { int cnt = 0; final int lim = limitRows != null ? limitRows : 0; @@ -515,12 +599,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme handler.handle(rowKey, resultCells); } } - } // protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, - final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException { + final Integer limitRows, final Boolean isReversed, final Collection<Column> columns, List<String> authorizations) throws IOException { final Scan scan = new Scan(); if (!StringUtils.isBlank(startRow)){ scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); @@ -529,6 +612,9 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); } + if (authorizations != null && authorizations.size() > 0) { + scan.setAuthorizations(new Authorizations(authorizations)); + } Filter filter = null; if (columns != null) { @@ -565,12 +651,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } // protected and extracted into separate method for testing - protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection<Column> columns) throws IOException { + protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, List<String> authorizations) throws IOException { final Scan scan = new Scan(); scan.setStartRow(startRow); scan.setStopRow(endRow); - if (columns != null) { + if (authorizations != null && authorizations.size() > 0) { + 
scan.setAuthorizations(new Authorizations(authorizations)); + } + + if (columns != null && columns.size() > 0) { for (Column col : columns) { if (col.getQualifier() == null) { scan.addFamily(col.getFamily()); @@ -584,7 +674,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } // protected and extracted into separate method for testing - protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime) throws IOException { + protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime, List<String> authorizations) throws IOException { // Create a new scan. We will set the min timerange as the latest timestamp that // we have seen so far. The minimum timestamp is inclusive, so we will get duplicates. // We will record any cells that have the latest timestamp, so that when we scan again, @@ -592,6 +682,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme final Scan scan = new Scan(); scan.setTimeRange(minTime, Long.MAX_VALUE); + if (authorizations != null && authorizations.size() > 0) { + scan.setAuthorizations(new Authorizations(authorizations)); + } + if (filter != null) { scan.setFilter(filter); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java index ddaa765..12fb6cf 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java @@ -54,7 +54,7 @@ import java.util.Set; @Tags({"hbase", "record", "lookup", "service"}) @CapabilityDescription("A lookup service that retrieves one or more columns from HBase and returns them as a record. The lookup coordinates " + "must contain 'rowKey' which will be the HBase row id.") -public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService<Record> { +public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService<Record>, VisibilityLabelService { static final String ROW_KEY_KEY = "rowKey"; private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY))); @@ -98,6 +98,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i final List<PropertyDescriptor> props = new ArrayList<>(); props.add(HBASE_CLIENT_SERVICE); props.add(TABLE_NAME); + props.add(AUTHORIZATIONS); props.add(RETURN_COLUMNS); props.add(CHARSET); PROPERTIES = Collections.unmodifiableList(props); @@ -107,6 +108,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i private List<Column> columns; private Charset charset; private HBaseClientService hBaseClientService; + private List<String> authorizations; @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -127,7 +129,8 @@ public class 
HBase_1_1_2_RecordLookupService extends AbstractControllerService i final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8); try { final Map<String, Object> values = new HashMap<>(); - hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, (byte[] row, ResultCell[] resultCells) -> { + + hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> { for (final ResultCell cell : resultCells) { final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength()); final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength()); @@ -167,6 +170,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i this.tableName = context.getProperty(TABLE_NAME).getValue(); this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue()); this.charset = Charset.forName(context.getProperty(CHARSET).getValue()); + this.authorizations = getAuthorizations(context); } @OnDisabled http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java new file mode 100644 index 0000000..452f6c2 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.hbase; + +import org.apache.hadoop.hbase.client.Connection; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.InitializationException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +public class IntegrationTestClientService extends HBase_1_1_2_ClientService { + IntegrationTestClientService(Connection hbaseConnection) { + super(); + setConnection(hbaseConnection); + } + + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + return new ArrayList<>(); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java new file mode 100644 index 0000000..9c9ab72 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public interface VisibilityLabelService { + PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder() + .name("hb-lu-authorizations") + .displayName("Authorizations") + .description("The list of authorization tokens to be used with cell visibility if it is enabled. These will be used to " + + "override the default authorization list for the user accessing HBase.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + default List<String> getAuthorizations(ConfigurationContext context) { + List<String> tokens = new ArrayList<>(); + String authorizationString = context.getProperty(AUTHORIZATIONS).isSet() + ? 
context.getProperty(AUTHORIZATIONS).getValue() + : ""; + if (!StringUtils.isEmpty(authorizationString)) { + tokens = Arrays.asList(authorizationString.split(",[\\s]*")); + } + + return tokens; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b851910/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index e4b9280..917cb3b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -142,21 +142,19 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService { return true; } - @Override - protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns) throws IOException { + protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels) throws IOException { final ResultScanner scanner = Mockito.mock(ResultScanner.class); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); return scanner; } @Override - protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime) throws IOException { + protected ResultScanner getResults(Table 
table, Collection<Column> columns, Filter filter, long minTime, List<String> labels) throws IOException { final ResultScanner scanner = Mockito.mock(ResultScanner.class); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); return scanner; } - @Override protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException { final ResultScanner scanner = Mockito.mock(ResultScanner.class);