keith-turner closed pull request #1032: fixes #1026 enable scanning
notifications
URL: https://github.com/apache/fluo/pull/1032
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git a/.travis.yml b/.travis.yml
index b071db0d..c996f5c0 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,6 +27,6 @@ jdk:
- openjdk8
env:
- ADDITIONAL_MAVEN_OPTS=
- - ADDITIONAL_MAVEN_OPTS=-Daccumulo.version=1.8.1
+ - ADDITIONAL_MAVEN_OPTS=-Daccumulo.version=1.9.0
script:
- mvn clean verify javadoc:jar $ADDITIONAL_MAVEN_OPTS
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index 3ec090f5..355e16ba 100644
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -19,6 +19,7 @@
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import javax.inject.Provider;
@@ -37,6 +38,7 @@
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.ScanUtil;
+import org.apache.fluo.core.util.ScanUtil.ScanFlags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,8 +235,13 @@ public String getRowPrefix() {
}
public ScanUtil.ScanOpts getScanOpts() {
- return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow,
rowPrefix, help,
- hexEncNonAscii, scanAccumuloTable, false);
+ EnumSet<ScanFlags> flags = EnumSet.noneOf(ScanFlags.class);
+
+ ScanUtil.setFlag(flags, help, ScanFlags.HELP);
+ ScanUtil.setFlag(flags, hexEncNonAscii, ScanFlags.HEX);
+ ScanUtil.setFlag(flags, scanAccumuloTable, ScanFlags.ACCUMULO);
+
+ return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow,
rowPrefix, flags);
}
}
}
diff --git
a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
index bcd03c07..3844e4d7 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
@@ -17,12 +17,14 @@
import java.io.IOException;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import com.beust.jcommander.Parameter;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.core.client.FluoAdminImpl;
import org.apache.fluo.core.util.ScanUtil;
+import org.apache.fluo.core.util.ScanUtil.ScanFlags;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -59,6 +61,9 @@
description = "Export key/values stored in Accumulo as JSON file.")
public boolean exportAsJson = false;
+ @Parameter(names = "--ntfy", help = true, description = "Scan active
notifications")
+ public boolean scanNtfy = false;
+
public String getStartRow() {
return startRow;
}
@@ -90,11 +95,23 @@ private void checkScanOptions() {
throw new IllegalArgumentException(
"Both \"--raw\" and \"--json\" can not be set together.");
}
+
+ if (this.scanAccumuloTable && this.scanNtfy) {
+ throw new IllegalArgumentException(
+ "Both \"--raw\" and \"--ntfy\" can not be set together.");
+ }
}
public ScanUtil.ScanOpts getScanOpts() {
- return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow,
rowPrefix, help,
- hexEncNonAscii, scanAccumuloTable, exportAsJson);
+ EnumSet<ScanFlags> flags = EnumSet.noneOf(ScanFlags.class);
+
+ ScanUtil.setFlag(flags, help, ScanFlags.HELP);
+ ScanUtil.setFlag(flags, hexEncNonAscii, ScanFlags.HEX);
+ ScanUtil.setFlag(flags, scanAccumuloTable, ScanFlags.ACCUMULO);
+ ScanUtil.setFlag(flags, exportAsJson, ScanFlags.JSON);
+ ScanUtil.setFlag(flags, scanNtfy, ScanFlags.NTFY);
+
+ return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow,
rowPrefix, flags);
}
public static ScanOptions parse(String[] args) {
@@ -114,14 +131,17 @@ public static void main(String[] args) {
FluoConfiguration config = CommandUtil.resolveFluoConfig();
config.setApplicationName(options.getApplicationName());
options.overrideFluoConfig(config);
- CommandUtil.verifyAppRunning(config);
try {
options.overrideFluoConfig(config);
if (options.scanAccumuloTable) {
config = FluoAdminImpl.mergeZookeeperConfig(config);
ScanUtil.scanAccumulo(options.getScanOpts(), config, System.out);
+ } else if (options.scanNtfy) {
+ config = FluoAdminImpl.mergeZookeeperConfig(config);
+ ScanUtil.scanNotifications(options.getScanOpts(), config, System.out);
} else {
+ CommandUtil.verifyAppRunning(config);
ScanUtil.scanFluo(options.getScanOpts(), config, System.out);
}
} catch (RuntimeException | IOException e) {
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java
b/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java
new file mode 100644
index 00000000..04afc13d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.core.impl.Notification;
+
+import static java.util.stream.Collectors.toSet;
+
+public class NotificationScanner implements CellScanner {
+
+ private Iterable<Entry<Key, Value>> scanner;
+ private Predicate<RowColumnValue> filter;
+
+ private static Predicate<RowColumnValue>
createColumnFilter(Collection<Column> allColumns) {
+ if (allColumns.size() == 0) {
+ return rcv -> true;
+ } else {
+ Set<Bytes> families = allColumns.stream().filter(col ->
!col.isQualifierSet())
+ .map(col -> col.getFamily()).collect(toSet());
+ Set<Column> columns =
+ allColumns.stream().filter(col ->
col.isQualifierSet()).collect(toSet());
+
+ if (families.size() == 0) {
+ return rcv -> columns.contains(rcv.getColumn());
+ } else if (columns.size() == 0) {
+ return rcv -> families.contains(rcv.getColumn().getFamily());
+ } else {
+ return rcv -> families.contains(rcv.getColumn().getFamily())
+ || columns.contains(rcv.getColumn());
+ }
+ }
+ }
+
+ NotificationScanner(Scanner scanner, Collection<Column> columns) {
+ this(scanner, createColumnFilter(columns));
+ }
+
+ NotificationScanner(Scanner scanner, Predicate<RowColumnValue> filter) {
+ scanner.clearColumns();
+ Notification.configureScanner(scanner);
+ this.scanner = scanner;
+ this.filter = filter;
+ }
+
+ @VisibleForTesting
+ NotificationScanner(Iterable<Entry<Key, Value>> scanner, Collection<Column>
columns) {
+ this.scanner = scanner;
+ this.filter = createColumnFilter(columns);
+ }
+
+ @Override
+ public Iterator<RowColumnValue> iterator() {
+ Iterator<RowColumnValue> iter = Iterators.transform(scanner.iterator(),
entry -> {
+ Notification n = Notification.from(entry.getKey());
+ return new RowColumnValue(n.getRow(), n.getColumn(),
Bytes.of(entry.getValue().get()));
+ });
+
+ return Iterators.filter(iter, rcv -> filter.test(rcv));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 4690fd5a..a6c8e68a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -20,14 +20,21 @@
import java.text.DateFormat;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import com.google.common.collect.Iterables;
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonIOException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.api.client.FluoClient;
@@ -40,12 +47,6 @@
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
-import com.google.common.collect.Iterables;
-import com.google.gson.FieldNamingPolicy;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonIOException;
-
public class ScanUtil {
public static final String FLUO_VALUE = "value";
public static final String FLUO_COLUMN_VISIBILITY = "visibility";
@@ -113,6 +114,31 @@ public static Span getSpan(ScanOpts options) {
}
}
+
+ private static void scan(ScanOpts options, PrintStream out, CellScanner
cellScanner) {
+ Function<Bytes, String> encoder = getEncoder(options);
+
+ if (options.exportAsJson) {
+ generateJson(cellScanner, encoder, out);
+ } else {
+ for (RowColumnValue rcv : cellScanner) {
+ out.print(encoder.apply(rcv.getRow()));
+ out.print(' ');
+ out.print(encoder.apply(rcv.getColumn().getFamily()));
+ out.print(' ');
+ out.print(encoder.apply(rcv.getColumn().getQualifier()));
+ out.print(' ');
+ out.print(encoder.apply(rcv.getColumn().getVisibility()));
+ out.print("\t");
+ out.print(encoder.apply(rcv.getValue()));
+ out.println();
+ if (out.checkError()) {
+ break;
+ }
+ }
+ }
+ }
+
public static void scanFluo(ScanOpts options, FluoConfiguration sConfig,
PrintStream out)
throws IOException {
@@ -122,27 +148,34 @@ public static void scanFluo(ScanOpts options,
FluoConfiguration sConfig, PrintSt
Span span = getSpan(options);
Collection<Column> columns = getColumns(options);
CellScanner cellScanner =
s.scanner().over(span).fetch(columns).build();
- Function<Bytes, String> encoder = getEncoder(options);
- if (options.exportAsJson) {
- generateJson(cellScanner, encoder, out);
- } else {
- for (RowColumnValue rcv : cellScanner) {
- out.print(encoder.apply(rcv.getRow()));
- out.print(' ');
- out.print(encoder.apply(rcv.getColumn().getFamily()));
- out.print(' ');
- out.print(encoder.apply(rcv.getColumn().getQualifier()));
- out.print(' ');
- out.print(encoder.apply(rcv.getColumn().getVisibility()));
- out.print("\t");
- out.print(encoder.apply(rcv.getValue()));
- out.println();
- if (out.checkError()) {
- break;
- }
- }
- }
+ scan(options, out, cellScanner);
+ }
+ }
+ }
+
+ public static void scanNotifications(ScanOpts options, FluoConfiguration
sConfig, PrintStream out)
+ throws IOException {
+
+ Connector conn = AccumuloUtil.getConnector(sConfig);
+
+ Span span = getSpan(options);
+ Collection<Column> columns = getColumns(options);
+
+ Scanner scanner = null;
+ try {
+ scanner = conn.createScanner(sConfig.getAccumuloTable(),
Authorizations.EMPTY);
+
+ scanner.setRange(SpanUtil.toRange(span));
+
+ NotificationScanner ntfyScanner = new NotificationScanner(scanner,
columns);
+
+ scan(options, out, ntfyScanner);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
}
}
}
@@ -203,6 +236,18 @@ public static void scanAccumulo(ScanOpts options,
FluoConfiguration sConfig, Pri
}
}
+ public static enum ScanFlags {
+ HELP,
+ // hex encode non ascii
+ HEX,
+ // scan accumulo table directly
+ ACCUMULO,
+ // encode output as json
+ JSON,
+ // scan notification
+ NTFY
+ }
+
public static class ScanOpts {
private String startRow;
@@ -210,23 +255,24 @@ public static void scanAccumulo(ScanOpts options,
FluoConfiguration sConfig, Pri
private List<String> columns;
private String exactRow;
private String rowPrefix;
- public boolean help;
- public boolean hexEncNonAscii = true;
- public boolean scanAccumuloTable = false;
- public boolean exportAsJson = false;
+ public final boolean help;
+ public final boolean hexEncNonAscii;
+ public final boolean scanAccumuloTable;
+ public final boolean exportAsJson;
+ public final boolean scanNtfy;
public ScanOpts(String startRow, String endRow, List<String> columns,
String exactRow,
- String rowPrefix, boolean help, boolean hexEncNonAscii, boolean
scanAccumuloTable,
- boolean exportAsJson) {
+ String rowPrefix, EnumSet<ScanFlags> flags) {
this.startRow = startRow;
this.endRow = endRow;
this.columns = columns;
this.exactRow = exactRow;
this.rowPrefix = rowPrefix;
- this.help = help;
- this.hexEncNonAscii = hexEncNonAscii;
- this.scanAccumuloTable = scanAccumuloTable;
- this.exportAsJson = exportAsJson;
+ this.help = flags.contains(ScanFlags.HELP);
+ this.hexEncNonAscii = flags.contains(ScanFlags.HEX);
+ this.scanAccumuloTable = flags.contains(ScanFlags.ACCUMULO);
+ this.exportAsJson = flags.contains(ScanFlags.JSON);
+ this.scanNtfy = flags.contains(ScanFlags.NTFY);
}
public String getStartRow() {
@@ -252,4 +298,10 @@ public String getRowPrefix() {
return columns;
}
}
+
+ public static void setFlag(EnumSet<ScanFlags> flags, boolean b, ScanFlags
flag) {
+ if (b) {
+ flags.add(flag);
+ }
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java
b/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java
new file mode 100644
index 00000000..0228243e
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.util;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.NotificationUtil;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NotificationScannerTest {
+
+ private static class Data implements Iterable<Entry<Key, Value>> {
+ TreeMap<Key, Value> data = new TreeMap<>();
+
+ void putNtfy(String row, String fam, String qual) {
+ byte[] r = row.getBytes(StandardCharsets.UTF_8);
+ byte[] f = ColumnConstants.NOTIFY_CF.toArray();
+ byte[] q = NotificationUtil.encodeCol(new Column(fam, qual));
+
+ data.put(new Key(r, f, q, new byte[0], 42L), new Value(new byte[0]));
+ }
+
+ @Override
+ public Iterator<Entry<Key, Value>> iterator() {
+ return data.entrySet().iterator();
+ }
+ }
+
+
+ /**
+ * When scanning notifications, column filtering is done on the client side.
This test ensures
+ * that filtering works correctly.
+ */
+ @Test
+ public void testColumnFiltering() {
+
+ Data data = new Data();
+ data.putNtfy("r001", "f8", "q2");
+ data.putNtfy("r001", "f9", "q1");
+ data.putNtfy("r002", "f8", "q2");
+ data.putNtfy("r002", "f8", "q3");
+ data.putNtfy("r004", "f9", "q3");
+ data.putNtfy("r004", "f9", "q4");
+
+ HashSet<RowColumnValue> expected = new HashSet<>();
+ expected.add(new RowColumnValue("r001", new Column("f8", "q2"), ""));
+ expected.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+ expected.add(new RowColumnValue("r002", new Column("f8", "q2"), ""));
+ expected.add(new RowColumnValue("r002", new Column("f8", "q3"), ""));
+ expected.add(new RowColumnValue("r004", new Column("f9", "q3"), ""));
+ expected.add(new RowColumnValue("r004", new Column("f9", "q4"), ""));
+
+ NotificationScanner scanner = new NotificationScanner(data,
Collections.emptySet());
+ HashSet<RowColumnValue> actual = new HashSet<>();
+ scanner.forEach(actual::add);
+ Assert.assertEquals(expected, actual);
+
+ scanner = new NotificationScanner(data, Arrays.asList(new Column("f9")));
+ actual.clear();
+ scanner.forEach(actual::add);
+ HashSet<RowColumnValue> expected2 = new HashSet<>();
+ expected.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f9"))
+ .forEach(expected2::add);
+ Assert.assertEquals(expected2, actual);
+
+ scanner = new NotificationScanner(data, Arrays.asList(new Column("f9"),
new Column("f8")));
+ actual.clear();
+ scanner.forEach(actual::add);
+ Assert.assertEquals(expected, actual);
+
+ scanner = new NotificationScanner(data, Arrays.asList(new Column("f9",
"q1")));
+ actual.clear();
+ scanner.forEach(actual::add);
+ expected2.clear();
+ expected2.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+ Assert.assertEquals(expected2, actual);
+
+ scanner =
+ new NotificationScanner(data, Arrays.asList(new Column("f9", "q1"),
new Column("f8")));
+ actual.clear();
+ scanner.forEach(actual::add);
+ expected2.clear();
+ expected2.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+ expected2.add(new RowColumnValue("r001", new Column("f8", "q2"), ""));
+ expected2.add(new RowColumnValue("r002", new Column("f8", "q2"), ""));
+ expected2.add(new RowColumnValue("r002", new Column("f8", "q3"), ""));
+ Assert.assertEquals(expected2, actual);
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services