This is an automated email from the ASF dual-hosted git repository. vitalii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 360b080 DRILL-7828: Refactor Pcap and Pcapng format plugin (#2192) 360b080 is described below commit 360b080ff04703aadfb0085ca6464fb2e85dd6d5 Author: Vitalii Diravka <vita...@apache.org> AuthorDate: Mon Apr 26 11:19:43 2021 +0300 DRILL-7828: Refactor Pcap and Pcapng format plugin (#2192) --- .../drill/exec/store/pcap/PcapBatchReader.java | 78 +---------- .../drill/exec/store/pcap/PcapFormatUtils.java | 2 +- .../drill/exec/store/pcap/decoder/Murmur128.java | 0 .../drill/exec/store/pcap/decoder/Packet.java | 0 .../exec/store/pcap/decoder/PacketConstants.java | 0 .../exec/store/pcap/decoder/PacketDecoder.java | 27 +++- .../exec/store/pcap/decoder/TcpHandshake.java | 0 .../drill/exec/store/pcap/decoder/TcpSession.java | 0 .../drill/exec/store/pcap/dto/ColumnDto.java | 0 .../apache/drill/exec/store/pcap/package-info.java | 0 .../store/pcap/plugin/BasePcapFormatPlugin.java | 153 +++++++++++++++++++++ .../plugin/PcapFormatConfig.java} | 35 +++-- .../exec/store/pcap/plugin/PcapFormatPlugin.java | 22 ++- .../plugin}/PcapngFormatConfig.java | 57 ++------ .../exec/store/pcap/plugin/PcapngFormatPlugin.java | 25 ++-- .../drill/exec/store/pcap/schema/PcapTypes.java | 0 .../drill/exec/store/pcap/schema/Schema.java | 0 .../drill/exec/store/pcapng/PcapngBatchReader.java | 5 +- .../exec/store/pcapng/PcapngFormatPlugin.java | 90 ------------ .../main/resources/bootstrap-format-plugins.json | 15 +- .../store/pcap/TestPcapWithPersistentStore.java | 84 +++++++++++ .../apache/drill/exec/store/pcap/ConcatPcap.java | 0 .../drill/exec/store/pcap/TestPcapDecoder.java | 4 +- .../drill/exec/store/pcap/TestPcapEVFReader.java | 11 +- .../exec/store/pcap/TestPcapRecordReader.java | 41 ++++-- .../drill/exec/store/pcap/TestSessionizePCAP.java | 15 +- .../exec/store/pcapng/TestPcapngRecordReader.java | 48 ++++++- .../store/pcapng/TestPcapngStatRecordReader.java | 6 +- .../src/test/resources/config/oldPcapPlugins.json | 12 ++ 
.../src/test/resources}/pcap/arpWithNullIP.pcap | Bin .../src/test/resources}/pcap/data-1.pcap | Bin .../src/test/resources/pcap/data-2.pcap | Bin 0 -> 1475104 bytes .../src/test/resources}/pcap/dataFromRemote.txt | 0 .../src/test/resources}/pcap/http.pcap | Bin .../src/test/resources}/pcap/synscan.pcap | Bin .../src/test/resources}/pcap/tcp-1.pcap | Bin .../src/test/resources}/pcap/tcp-2.pcap | Bin .../src/test/resources}/pcap/testv1.pcap | Bin .../src/test/resources/pcapng/example.pcap | Bin 0 -> 512 bytes .../src/test/resources/todo/dhcp.pcapng | Bin 0 -> 1733 bytes .../src/test/resources/todo/dhcp_big_endian.pcapng | Bin 0 -> 1757 bytes .../test/resources/todo/dhcp_little_endian.pcapng | Bin 0 -> 1757 bytes .../src/test/resources/todo/many_interfaces.pcapng | Bin 0 -> 30743 bytes .../apache/drill/hbase/TestHBaseTableProvider.java | 6 +- .../impl/scan/columns/ColumnsScanFramework.java | 1 - .../physical/impl/scan/file/FileScanFramework.java | 18 ++- .../exec/store/dfs/easy/EasyFormatPlugin.java | 3 +- .../drill/exec/store/pcap/PcapFormatConfig.java | 83 ----------- .../drill/exec/store/pcap/PcapFormatPlugin.java | 88 ------------ .../main/resources/bootstrap-storage-plugins.json | 9 -- .../org/apache/drill/exec/TestWithZookeeper.java | 17 +++ .../drill/exec/store/FormatPluginSerDeTest.java | 10 -- .../drill/exec/store/sys/TestPStoreProviders.java | 15 -- .../resources/plugins/mock-plugin-upgrade.json | 6 - .../src/test/resources/store/pcap/data-2.pcap | Bin 1167896 -> 0 bytes 55 files changed, 481 insertions(+), 505 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java similarity index 95% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java index 83e2115..fd7fef9 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.store.pcap.decoder.Packet; import org.apache.drill.exec.store.pcap.decoder.PacketDecoder; import org.apache.drill.exec.store.pcap.decoder.TcpSession; import org.apache.drill.exec.store.pcap.schema.Schema; +import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.hadoop.mapred.FileSplit; import org.slf4j.Logger; @@ -48,126 +49,61 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class); private FileSplit split; - private PacketDecoder decoder; - private InputStream fsStream; - private RowSetLoader rowWriter; - private int validBytes; - private byte[] buffer; - private int offset; - private ScalarWriter typeWriter; - private ScalarWriter timestampWriter; - private ScalarWriter timestampMicroWriter; - private ScalarWriter networkWriter; - private ScalarWriter srcMacAddressWriter; - private ScalarWriter dstMacAddressWriter; - private ScalarWriter dstIPWriter; - private ScalarWriter srcIPWriter; - private ScalarWriter srcPortWriter; - private ScalarWriter dstPortWriter; - private ScalarWriter packetLengthWriter; - private ScalarWriter tcpSessionWriter; - private ScalarWriter tcpSequenceWriter; - private ScalarWriter tcpAckNumberWriter; - private ScalarWriter tcpFlagsWriter; - private ScalarWriter tcpParsedFlagsWriter; - private ScalarWriter tcpNsWriter; - private ScalarWriter tcpCwrWriter; - private ScalarWriter tcpEceWriter; - private ScalarWriter tcpFlagsEceEcnCapableWriter; - private ScalarWriter tcpFlagsCongestionWriter; - private ScalarWriter tcpUrgWriter; - private ScalarWriter tcpAckWriter; - private ScalarWriter tcpPshWriter; - private ScalarWriter 
tcpRstWriter; - private ScalarWriter tcpSynWriter; - private ScalarWriter tcpFinWriter; - private ScalarWriter dataWriter; - private ScalarWriter isCorruptWriter; - - private final PcapReaderConfig readerConfig; - - + private final PcapFormatConfig readerConfig; // Writers for TCP Sessions private ScalarWriter sessionStartTimeWriter; - private ScalarWriter sessionEndTimeWriter; - private ScalarWriter sessionDurationWriter; - private ScalarWriter connectionTimeWriter; - private ScalarWriter packetCountWriter; - private ScalarWriter originPacketCounterWriter; - private ScalarWriter remotePacketCounterWriter; - private ScalarWriter originDataVolumeWriter; - private ScalarWriter remoteDataVolumeWriter; - private ScalarWriter hostDataWriter; - private ScalarWriter remoteDataWriter; - private final int maxRecords; - private Map<Long, TcpSession> sessionQueue; - public static class PcapReaderConfig { - - protected final PcapFormatPlugin plugin; - - public boolean sessionizeTCPStreams; - - private final PcapFormatConfig config; - - public PcapReaderConfig(PcapFormatPlugin plugin) { - this.plugin = plugin; - this.config = plugin.getConfig(); - this.sessionizeTCPStreams = config.getSessionizeTCPStreams(); - } - } - - public PcapBatchReader(PcapReaderConfig readerConfig, int maxRecords) { + public PcapBatchReader(PcapFormatConfig readerConfig, int maxRecords) { this.readerConfig = readerConfig; - if (readerConfig.sessionizeTCPStreams) { + if (readerConfig.getSessionizeTCPStreams()) { sessionQueue = new HashMap<>(); } this.maxRecords = maxRecords; @@ -178,7 +114,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { split = negotiator.split(); openFile(negotiator); SchemaBuilder builder = new SchemaBuilder(); - Schema pcapSchema = new Schema(readerConfig.sessionizeTCPStreams); + Schema pcapSchema = new Schema(readerConfig.getSessionizeTCPStreams()); TupleMetadata schema = pcapSchema.buildSchema(builder); negotiator.tableSchema(schema, false); 
ResultSetLoader loader = negotiator.build(); @@ -238,7 +174,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { } private void populateColumnWriters(RowSetLoader rowWriter) { - if (readerConfig.sessionizeTCPStreams) { + if (readerConfig.getSessionizeTCPStreams()) { srcMacAddressWriter = rowWriter.scalar("src_mac_address"); dstMacAddressWriter = rowWriter.scalar("dst_mac_address"); dstIPWriter = rowWriter.scalar("dst_ip"); @@ -323,7 +259,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { } // If we are resessionizing the TCP Stream, add the packet to the stream - if (readerConfig.sessionizeTCPStreams) { + if (readerConfig.getSessionizeTCPStreams()) { // If the session has not been seen before, add it to the queue long sessionID = packet.getSessionHash(); if (!sessionQueue.containsKey(sessionID)) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java similarity index 97% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java index eb9ea05..6c23159 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java @@ -40,7 +40,7 @@ public class PcapFormatUtils { /** * - * @param byteOrder true for forward file order, false fore revers file order + * @param byteOrder true for forward file order, false for reverse file order * @param buf byte buffer * @param offset buffer offset * @return short value as int of specific bytes from buffer diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java 
b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java similarity index 87% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java index 8e6d867..977e465 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java @@ -59,12 +59,15 @@ public class PacketDecoder { private static final int GLOBAL_HEADER_SIZE = 24; private static final int PCAP_MAGIC_LITTLE_ENDIAN = 0xD4C3B2A1; private static final int 
PCAP_MAGIC_NUMBER = 0xA1B2C3D4; + private static final int PCAPNG_MAGIC_LITTLE_ENDIAN = 0x4D3C2B1A; + private static final int PCAPNG_MAGIC_NUMBER = 0x0A0D0D0A; private static final Logger logger = LoggerFactory.getLogger(PacketDecoder.class); private final int maxLength; private final int network; private boolean bigEndian; + private FileFormat fileFormat; private InputStream input; @@ -78,16 +81,28 @@ public class PacketDecoder { switch (getInt(globalHeader, 0)) { case PCAP_MAGIC_NUMBER: bigEndian = true; + fileFormat = FileFormat.PCAP; break; case PCAP_MAGIC_LITTLE_ENDIAN: bigEndian = false; + fileFormat = FileFormat.PCAP; + break; + case PCAPNG_MAGIC_NUMBER: + bigEndian = true; + fileFormat = FileFormat.PCAPNG; + break; + case PCAPNG_MAGIC_LITTLE_ENDIAN: + bigEndian = false; + fileFormat = FileFormat.PCAPNG; break; default: //noinspection ConstantConditions Preconditions.checkState(false, String.format("Bad magic number = %08x", getIntFileOrder(bigEndian, globalHeader, 0))); } - Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2"); + if(fileFormat == FileFormat.PCAP) { + Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2"); + } // todo: pcapng major version == 1 precondition maxLength = getIntFileOrder(bigEndian, globalHeader, 16); network = getIntFileOrder(bigEndian, globalHeader, 20); } @@ -116,6 +131,10 @@ public class PacketDecoder { return bigEndian; } + public FileFormat getFileFormat() { + return fileFormat; + } + public Packet nextPacket() throws IOException { Packet r = new Packet(); if (r.readPcap(input, bigEndian, maxLength)) { @@ -124,4 +143,10 @@ public class PacketDecoder { return null; } } + + public enum FileFormat { + PCAP, + PCAPNG, + UNKNOWN + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java 
b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/package-info.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/package-info.java diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java new file mode 100644 index 0000000..4052245 --- /dev/null +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcap.plugin; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; +import org.apache.drill.exec.store.dfs.easy.EasySubScan; +import org.apache.drill.exec.store.pcap.PcapBatchReader; +import org.apache.drill.exec.store.pcap.decoder.PacketDecoder; +import org.apache.drill.exec.store.pcapng.PcapngBatchReader; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; + +public abstract class BasePcapFormatPlugin<T extends PcapFormatConfig> extends EasyFormatPlugin<T> { + + static final Logger logger = LoggerFactory.getLogger(ManagedScanFramework.class); + private static PacketDecoder.FileFormat fileFormat = PacketDecoder.FileFormat.UNKNOWN; + + public BasePcapFormatPlugin(String name, + DrillbitContext context, + Configuration fsConf, + StoragePluginConfig storageConfig, + T formatConfig) { + super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig); + } + + private static EasyFormatConfig easyConfig(Configuration fsConf, PcapFormatConfig pluginConfig) { + return EasyFormatConfig.builder() + .readable(true) + .writable(false) + .blockSplittable(false) + .compressible(true) + .extensions(pluginConfig.getExtensions()) + .fsConf(fsConf) + .useEnhancedScan(true) + .supportsLimitPushdown(true) + .supportsProjectPushdown(true) + .defaultName(PcapFormatConfig.NAME) + .build(); + } + + private static class PcapReaderFactory extends FileReaderFactory { + + private final PcapFormatConfig config; + private final EasySubScan scan; + + public PcapReaderFactory(PcapFormatConfig config, EasySubScan scan) { + this.config = config; + this.scan = scan; + } + + /** + * Reader creator. If file format can't be detected try to use default PCAP format plugin + * + * @return PCAP or PCAPNG batch reader + */ + @Override + public ManagedReader<? extends FileSchemaNegotiator> newReader() { + if (fileFramework().isPresent()) { // todo: can be simplified with java9 ifPresentOrElse + Path path = scan.getWorkUnits().stream() + .findFirst() + .orElseThrow(() -> UserException. 
+ dataReadError() + .addContext("There are no files for scanning") + .build(logger)) + .getPath(); + fileFormat = getFileFormat(fileFramework().get().fileSystem(), path); + if (config.getExtensions().stream() + .noneMatch(f -> f.equals(fileFormat.name().toLowerCase()))) { + logger.error("File format {} is not within plugin extensions: {}. Trying to use default PCAP format plugin to " + + "read the file", fileFormat, config.getExtensions()); + } + } else { + logger.error("It is not possible to detect file format, because the File Framework is not initialized. " + + "Trying to use default PCAP format plugin to read the file"); + } + return createReader(scan, config); + } + } + + @Override + public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) { + return createReader(scan, formatConfig); + } + + private static ManagedReader<? extends FileSchemaNegotiator> createReader(EasySubScan scan, PcapFormatConfig config) { + switch(fileFormat) { + case PCAPNG: return new PcapngBatchReader(config, scan); + case PCAP: + case UNKNOWN: + default: return new PcapBatchReader(config, scan.getMaxRecords()); + } + } + + @Override + protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) { + FileScanBuilder builder = new FileScanBuilder(); + builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan)); + + initScanBuilder(builder, scan); + builder.nullType(Types.optional(MinorType.VARCHAR)); + return builder; + } + + /** + * Helper method to detect PCAP or PCAPNG file format based on file Magic Number + * + * @param dfs for obtaining InputStream + * @return PCAP/PCAPNG file format + */ + private static PacketDecoder.FileFormat getFileFormat(DrillFileSystem dfs, Path path) { + try (InputStream inputStream = dfs.openPossiblyCompressedStream(path)) { + PacketDecoder decoder = new PacketDecoder(inputStream); + return decoder.getFileFormat(); + } catch (IOException io) { + throw UserException + 
.dataReadError(io) + .addContext("File name:", path.toString()) + .build(logger); + } + } +} diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java similarity index 59% copy from contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java copy to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java index 7210f93..b5f8d53 100644 --- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.drill.exec.store.pcapng; +package org.apache.drill.exec.store.pcap.plugin; import java.util.List; import java.util.Objects; @@ -29,18 +29,23 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -@JsonTypeName(PcapngFormatConfig.NAME) +@JsonTypeName(PcapFormatConfig.NAME) @JsonInclude(JsonInclude.Include.NON_DEFAULT) -public class PcapngFormatConfig implements FormatPluginConfig { +public class PcapFormatConfig implements FormatPluginConfig { + private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcap"); - public static final String NAME = "pcapng"; + public static final String NAME = "pcap"; private final List<String> extensions; private final boolean stat; + private final boolean sessionizeTCPStreams; @JsonCreator - public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions, @JsonProperty("stat") boolean stat) { - this.extensions = extensions == null ? 
ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions); + public PcapFormatConfig(@JsonProperty("extensions") List<String> extensions, + @JsonProperty("stat") boolean stat, + @JsonProperty("sessionizeTCPStreams") Boolean sessionizeTCPStreams) { + this.extensions = extensions == null ? DEFAULT_EXTNS : ImmutableList.copyOf(extensions); this.stat = stat; + this.sessionizeTCPStreams = sessionizeTCPStreams != null && sessionizeTCPStreams; } @JsonProperty("extensions") @@ -53,6 +58,11 @@ public class PcapngFormatConfig implements FormatPluginConfig { return this.stat; } + @JsonProperty("sessionizeTCPStreams") + public boolean getSessionizeTCPStreams() { + return sessionizeTCPStreams; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -61,17 +71,22 @@ public class PcapngFormatConfig implements FormatPluginConfig { if (o == null || getClass() != o.getClass()) { return false; } - PcapngFormatConfig that = (PcapngFormatConfig) o; - return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat()); + PcapFormatConfig that = (PcapFormatConfig) o; + return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat()) && + Objects.equals(sessionizeTCPStreams, that.sessionizeTCPStreams); } @Override public int hashCode() { - return Objects.hash(extensions, stat); + return Objects.hash(extensions, stat, sessionizeTCPStreams); } @Override public String toString() { - return new PlanStringBuilder(this).field("extensions", extensions).field("stat", stat).toString(); + return new PlanStringBuilder(this) + .field("extensions", extensions) + .field("stat", stat) + .field("sessionizeTCPStreams", sessionizeTCPStreams) + .toString(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java similarity index 55% rename from 
exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java index a078a3e..34ba5c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java @@ -15,20 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.drill.exec.store.sys.local; +package org.apache.drill.exec.store.pcap.plugin; -import org.apache.drill.exec.exception.StoreException; -import org.apache.drill.exec.store.sys.PersistentStoreRegistry; -import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.hadoop.conf.Configuration; -/** - * Kept for possible references to old class name in configuration. - * - * @deprecated will be removed in 1.7 - * use {@link org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider} instead. 
- */ -public class LocalPStoreProvider extends LocalPersistentStoreProvider { - public LocalPStoreProvider(PersistentStoreRegistry registry) throws StoreException { - super(registry); +public class PcapFormatPlugin extends BasePcapFormatPlugin<PcapFormatConfig> { + + public PcapFormatPlugin(String name, DrillbitContext context, Configuration fsConf, + StoragePluginConfig storageConfig, PcapFormatConfig formatConfig) { + super(name, context, fsConf, storageConfig, formatConfig); } } diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java similarity index 51% rename from contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java index 7210f93..082840a 100644 --- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java @@ -15,63 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.drill.exec.store.pcapng; - -import java.util.List; -import java.util.Objects; - -import org.apache.drill.common.PlanStringBuilder; -import org.apache.drill.common.logical.FormatPluginConfig; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +package org.apache.drill.exec.store.pcap.plugin; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; + +import java.util.List; @JsonTypeName(PcapngFormatConfig.NAME) @JsonInclude(JsonInclude.Include.NON_DEFAULT) -public class PcapngFormatConfig implements FormatPluginConfig { - +@Deprecated // for backward compatibility +public class PcapngFormatConfig extends PcapFormatConfig { + private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcapng"); public static final String NAME = "pcapng"; - private final List<String> extensions; - private final boolean stat; @JsonCreator - public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions, @JsonProperty("stat") boolean stat) { - this.extensions = extensions == null ? 
ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions); - this.stat = stat; - } - - @JsonProperty("extensions") - public List<String> getExtensions() { - return extensions; - } - - @JsonProperty("stat") - public boolean getStat() { - return this.stat; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PcapngFormatConfig that = (PcapngFormatConfig) o; - return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat()); - } - - @Override - public int hashCode() { - return Objects.hash(extensions, stat); - } - - @Override - public String toString() { - return new PlanStringBuilder(this).field("extensions", extensions).field("stat", stat).toString(); + public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions, + @JsonProperty("stat") boolean stat) { + super(extensions == null ? DEFAULT_EXTNS : ImmutableList.copyOf(extensions), stat, null); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java similarity index 58% copy from exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java copy to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java index a55c7c3..a57e429 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java @@ -15,24 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.drill.exec; +package org.apache.drill.exec.store.pcap.plugin; -import org.junit.After; -import org.junit.Before; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.hadoop.conf.Configuration; -public class TestWithZookeeper extends ExecTest { -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestWithZookeeper.class); +@Deprecated // for backward compatibility +public class PcapngFormatPlugin extends BasePcapFormatPlugin<PcapngFormatConfig> { - protected ZookeeperHelper zkHelper; - - @Before - public void setUp() throws Exception { - zkHelper = new ZookeeperHelper(); - zkHelper.startZookeeper(1); - } - - @After - public void tearDown() throws Exception { - zkHelper.stopZookeeper(); + public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, + StoragePluginConfig storageConfig, PcapngFormatConfig formatConfig) { + super(name, context, fsConf, storageConfig, formatConfig); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java similarity index 100% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java 
b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java index eeabebf..e8367c5 100644 --- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java +++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java @@ -40,6 +40,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.easy.EasySubScan; +import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig; import org.apache.drill.exec.util.Utilities; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.hadoop.fs.Path; @@ -54,7 +55,7 @@ public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> { private static final Logger logger = LoggerFactory.getLogger(PcapngBatchReader.class); - private final PcapngFormatConfig config; + private final PcapFormatConfig config; private final EasySubScan scan; private final int maxRecords; private CustomErrorContext errorContext; @@ -66,7 +67,7 @@ public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> { private InputStream in; private Path path; - public PcapngBatchReader(final PcapngFormatConfig config, final EasySubScan scan) { + public PcapngBatchReader(final PcapFormatConfig config, final EasySubScan scan) { this.config = config; this.scan = scan; this.maxRecords = scan.getMaxRecords(); diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java deleted file mode 100644 index 0cccd6b..0000000 --- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * 
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.pcapng; - -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; -import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; -import org.apache.drill.exec.store.dfs.easy.EasySubScan; -import org.apache.hadoop.conf.Configuration; - -public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> { - - public PcapngFormatPlugin(String name, - DrillbitContext context, - Configuration fsConf, - StoragePluginConfig storageConfig, - PcapngFormatConfig formatConfig) { - super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig); - } - - 
private static EasyFormatConfig easyConfig(Configuration fsConf, PcapngFormatConfig pluginConfig) { - return EasyFormatConfig.builder() - .readable(true) - .writable(false) - .blockSplittable(false) - .compressible(true) - .extensions(pluginConfig.getExtensions()) - .fsConf(fsConf) - .useEnhancedScan(true) - .supportsLimitPushdown(true) - .supportsProjectPushdown(true) - .defaultName(PcapngFormatConfig.NAME) - .build(); - } - - private static class PcapngReaderFactory extends FileReaderFactory { - - private final PcapngFormatConfig config; - private final EasySubScan scan; - - public PcapngReaderFactory(PcapngFormatConfig config, EasySubScan scan) { - this.config = config; - this.scan = scan; - } - - @Override - public ManagedReader<? extends FileSchemaNegotiator> newReader() { - return new PcapngBatchReader(config, scan); - } - } - - @Override - public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) - throws ExecutionSetupException { - return new PcapngBatchReader(formatConfig, scan); - } - - @Override - protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException { - FileScanBuilder builder = new FileScanBuilder(); - builder.setReaderFactory(new PcapngReaderFactory(formatConfig, scan)); - - initScanBuilder(builder, scan); - builder.nullType(Types.optional(MinorType.VARCHAR)); - return builder; - } -} diff --git a/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json b/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json index 60b04d1..550e6e5 100644 --- a/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json +++ b/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json @@ -3,8 +3,9 @@ "dfs": { "type": "file", "formats": { - "pcapng": { - "type": "pcapng", + "pcap": { + "type": "pcap", + "extensions": ["pcap", "pcapng"], "stat" : false } } @@ -12,8 +13,9 @@ "cp": { "type": "file", "formats": 
{ - "pcapng": { - "type": "pcapng", + "pcap": { + "type": "pcap", + "extensions": ["pcap", "pcapng"], "stat" : false } } @@ -21,8 +23,9 @@ "s3": { "type": "file", "formats": { - "pcapng": { - "type": "pcapng", + "pcap": { + "type": "pcap", + "extensions": ["pcap", "pcapng"], "stat" : false } } diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java new file mode 100644 index 0000000..1328455 --- /dev/null +++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.persistent.store.pcap; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; +import org.apache.curator.framework.CuratorFramework; +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.common.util.DrillFileUtils; +import org.apache.drill.exec.TestWithZookeeper; +import org.apache.drill.exec.coord.zk.PathUtils; +import org.apache.drill.exec.coord.zk.ZookeeperClient; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig; +import org.apache.drill.exec.store.pcap.plugin.PcapngFormatConfig; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore; +import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider; +import org.apache.zookeeper.CreateMode; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +public class TestPcapWithPersistentStore extends TestWithZookeeper { + /** + * DRILL-7828 + * Note: If this test breaks you are probably breaking backward and forward compatibility. Verify with the community + * that breaking compatibility is acceptable and planned for. 
+ */ + @Test + public void pcapPluginBackwardCompatabilityTest() throws Exception { + final String oldPlugin = "oldFormatPlugin"; + + try (CuratorFramework curator = createCurator()) { + curator.start(); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerSubtypes(PcapFormatConfig.class, PcapngFormatConfig.class); + PersistentStoreConfig<FileSystemConfig> storeConfig = + PersistentStoreConfig.newJacksonBuilder(objectMapper, FileSystemConfig.class).name("type").build(); + + + try (ZookeeperClient zkClient = new ZookeeperClient(curator, + PathUtils.join("/", storeConfig.getName()), CreateMode.PERSISTENT)) { + zkClient.start(); + String oldFormatPlugin = DrillFileUtils.getResourceAsString("/config/oldPcapPlugins.json"); + zkClient.put(oldPlugin, oldFormatPlugin.getBytes(), null); + } + + try (ZookeeperPersistentStoreProvider provider = + new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator)) { + PersistentStore<FileSystemConfig> store = provider.getOrCreateStore(storeConfig); + assertTrue(store instanceof ZookeeperPersistentStore); + + FileSystemConfig oldPluginConfig = ((ZookeeperPersistentStore<FileSystemConfig>)store).get(oldPlugin, null); + Map<String, FormatPluginConfig> formats = oldPluginConfig.getFormats(); + Assert.assertEquals(formats.keySet(), ImmutableSet.of("pcap", "pcapng")); + PcapFormatConfig pcap = (PcapFormatConfig) formats.get("pcap"); + PcapngFormatConfig pcapng = (PcapngFormatConfig) formats.get("pcapng"); + Assert.assertEquals(pcap.getExtensions(), ImmutableList.of("pcap")); + assertTrue(pcapng.getStat()); + } + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java similarity index 100% rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java similarity index 98% rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java index 1a29902..d171c31 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java +++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java @@ -57,7 +57,7 @@ public class TestPcapDecoder extends BaseTestQuery { @Test public void testBasics() throws IOException { - InputStream in = Resources.getResource("store/pcap/tcp-2.pcap").openStream(); + InputStream in = Resources.getResource("pcap/tcp-2.pcap").openStream(); PacketDecoder pd = new PacketDecoder(in); Packet p = pd.packet(); int offset = 0; @@ -226,7 +226,7 @@ public class TestPcapDecoder extends BaseTestQuery { // might be faster to keep this open and rewind each time, but // that is hard to do with a resource, especially if it comes // from the class path instead of files. 
- try (InputStream in = Resources.getResource("store/pcap/tcp-2.pcap").openStream()) { + try (InputStream in = Resources.getResource("pcap/tcp-2.pcap").openStream()) { ConcatPcap.copy(first, in, out); } first = false; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java similarity index 92% rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java index 5796fa8..9a27276 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java +++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.pcap; import org.apache.drill.categories.RowSetTests; +import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterTest; import org.junit.BeforeClass; @@ -34,12 +35,12 @@ public class TestPcapEVFReader extends ClusterTest { @BeforeClass public static void setup() throws Exception { ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher)); - cluster.defineFormat("cp", "sample", new PcapFormatConfig(null, null)); + cluster.defineFormat("cp", "sample", new PcapFormatConfig(null, true, false)); } @Test public void testStarQuery() throws Exception { - String sql = "SELECT * FROM cp.`store/pcap/synscan.pcap` LIMIT 1"; + String sql = "SELECT * FROM cp.`pcap/synscan.pcap` LIMIT 1"; testBuilder() .sqlQuery(sql) @@ -58,7 +59,7 @@ public class TestPcapEVFReader extends ClusterTest { "packet_length, tcp_session, " + "tcp_sequence, tcp_ack, tcp_flags," + " tcp_parsed_flags, tcp_flags_ns, tcp_flags_cwr, tcp_flags_ece, tcp_flags_ece_ecn_capable, tcp_flags_ece_congestion_experienced, tcp_flags_urg, 
tcp_flags_ack, tcp_flags_psh, tcp_flags_rst, tcp_flags_syn," + - " tcp_flags_fin, data, is_corrupt FROM cp.`store/pcap/synscan.pcap` LIMIT 1"; + " tcp_flags_fin, data, is_corrupt FROM cp.`pcap/synscan.pcap` LIMIT 1"; testBuilder() .sqlQuery(sql) @@ -72,7 +73,7 @@ public class TestPcapEVFReader extends ClusterTest { @Test public void testAggregateQuery() throws Exception { - String sql = "SELECT is_corrupt, COUNT(*) as packet_count FROM cp.`store/pcap/testv1.pcap` GROUP BY is_corrupt ORDER BY packet_count DESC"; + String sql = "SELECT is_corrupt, COUNT(*) as packet_count FROM cp.`pcap/testv1.pcap` GROUP BY is_corrupt ORDER BY packet_count DESC"; testBuilder() .sqlQuery(sql) @@ -85,7 +86,7 @@ public class TestPcapEVFReader extends ClusterTest { @Test public void testArpPcapFile() throws Exception { - String sql = "SELECT src_ip, dst_ip FROM cp.`store/pcap/arpWithNullIP.pcap` WHERE src_port=1"; + String sql = "SELECT src_ip, dst_ip FROM cp.`pcap/arpWithNullIP.pcap` WHERE src_port=1"; testBuilder() .sqlQuery(sql) .ordered() diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java similarity index 69% rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java index e1a71b6..43fc3ff3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java +++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.pcap; +import org.apache.drill.PlanTestBase; import org.apache.drill.exec.store.pcap.decoder.Packet; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.exec.rpc.user.QueryDataBatch; @@ -31,42 +32,42 @@ import static 
org.junit.Assert.assertEquals; public class TestPcapRecordReader extends BaseTestQuery { @BeforeClass public static void setupTestFiles() { - dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcap")); + dirTestWatcher.copyResourceToRoot(Paths.get("pcap")); } @Test public void testStarQuery() throws Exception { - runSQLVerifyCount("select * from dfs.`store/pcap/tcp-1.pcap`", 16); - runSQLVerifyCount("select distinct DST_IP from dfs.`store/pcap/tcp-1.pcap`", 1); - runSQLVerifyCount("select distinct DsT_IP from dfs.`store/pcap/tcp-1.pcap`", 1); - runSQLVerifyCount("select distinct dst_ip from dfs.`store/pcap/tcp-1.pcap`", 1); + runSQLVerifyCount("select * from dfs.`pcap/tcp-1.pcap`", 16); + runSQLVerifyCount("select distinct DST_IP from dfs.`pcap/tcp-1.pcap`", 1); + runSQLVerifyCount("select distinct DsT_IP from dfs.`pcap/tcp-1.pcap`", 1); + runSQLVerifyCount("select distinct dst_ip from dfs.`pcap/tcp-1.pcap`", 1); } @Test public void testCorruptPCAPQuery() throws Exception { - runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap`", 7000); + runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap`", 7000); } @Test public void testTrueCorruptPCAPQuery() throws Exception { - runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE is_corrupt=true", 16); + runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap` WHERE is_corrupt=true", 16); } @Test public void testNotCorruptPCAPQuery() throws Exception { - runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE is_corrupt=false", 6984); + runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap` WHERE is_corrupt=false", 6984); } @Test public void testCountQuery() throws Exception { - runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-1.pcap`", 1); - runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-2.pcap`", 1); + runSQLVerifyCount("select count(*) from dfs.`pcap/tcp-1.pcap`", 1); + runSQLVerifyCount("select count(*) from dfs.`pcap/tcp-2.pcap`", 1); } @Test public void 
testDistinctQuery() throws Exception { // omit data field from distinct count for now - runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`store/pcap/tcp-1.pcap`", 1); + runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`pcap/tcp-1.pcap`", 1); } @Test @@ -86,7 +87,17 @@ public class TestPcapRecordReader extends BaseTestQuery { @Test public void checkFlags() throws Exception { - runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from dfs.`store/pcap/synscan.pcap`", 2011); + runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from dfs.`pcap/synscan.pcap`", 2011); + } + + @Test + public void testSerDe() throws Exception { + String path = "pcap/tcp-1.pcap"; + dirTestWatcher.copyResourceToRoot(Paths.get(path)); + testPhysicalPlanSubmission( + String.format("select * from dfs.`%s`", path), + String.format("select * from table(dfs.`%s`(type=>'pcap'))", path) + ); } private void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception { @@ -106,4 +117,10 @@ public class TestPcapRecordReader extends BaseTestQuery { } assertEquals(expectedRowCount, count); } + + private void testPhysicalPlanSubmission(String...queries) throws Exception { + for (String query : queries) { + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery(query); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java similarity index 95% rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java index addc1a1..001576d 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java +++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.physical.rowSet.RowSetBuilder; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterTest; import org.apache.drill.test.QueryBuilder; @@ -43,15 +44,15 @@ public class TestSessionizePCAP extends ClusterTest { public static void setup() throws Exception { ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher)); - PcapFormatConfig sampleConfig = new PcapFormatConfig(null, true); + PcapFormatConfig sampleConfig = new PcapFormatConfig(null, true, true); cluster.defineFormat("cp", "pcap", sampleConfig); - dirTestWatcher.copyResourceToRoot(Paths.get("store/pcap/")); + dirTestWatcher.copyResourceToRoot(Paths.get("pcap/")); } @Test public void testSessionizedStarQuery() throws Exception { - String sql = "SELECT * FROM cp.`/store/pcap/http.pcap`"; - String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/store/pcap/dataFromRemote.txt"); + String sql = "SELECT * FROM cp.`/pcap/http.pcap`"; + String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/pcap/dataFromRemote.txt"); QueryBuilder q = client.queryBuilder().sql(sql); RowSet results = q.rowSet(); @@ -104,9 +105,9 @@ public class TestSessionizePCAP extends ClusterTest { String sql = "SELECT src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address," + "session_start_time, session_end_time, session_duration, total_packet_count, data_volume_from_origin, data_volume_from_remote," + "packet_count_from_origin, packet_count_from_remote, 
connection_time, tcp_session, is_corrupt, data_from_originator, data_from_remote " + - "FROM cp.`/store/pcap/http.pcap`"; + "FROM cp.`/pcap/http.pcap`"; - String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/store/pcap/dataFromRemote.txt"); + String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/pcap/dataFromRemote.txt"); QueryBuilder q = client.queryBuilder().sql(sql); RowSet results = q.rowSet(); @@ -156,7 +157,7 @@ public class TestSessionizePCAP extends ClusterTest { @Test public void testSerDe() throws Exception { - String sql = "SELECT COUNT(*) FROM cp.`/store/pcap/http.pcap`"; + String sql = "SELECT COUNT(*) FROM cp.`/pcap/http.pcap`"; String plan = queryBuilder().sql(sql).explainJson(); long cnt = queryBuilder().physical(plan).singletonLong(); assertEquals("Counts should match", 1L, cnt); diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java index b069432..ceb76bf 100644 --- a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java +++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java @@ -35,6 +35,7 @@ import org.apache.drill.test.QueryBuilder; import org.apache.drill.test.QueryTestUtil; import org.apache.drill.test.rowSet.RowSetComparison; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -45,6 +46,7 @@ public class TestPcapngRecordReader extends ClusterTest { public static void setup() throws Exception { ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher)); dirTestWatcher.copyResourceToRoot(Paths.get("pcapng/")); + dirTestWatcher.copyResourceToRoot(Paths.get("todo/")); } @Test @@ -190,6 +192,20 @@ public class TestPcapngRecordReader 
extends ClusterTest { } @Test + @Ignore // todo: infinite loop with current PcapNGReader + public void testPcapNG() throws Exception { +// String sql = "select * from dfs.`todo/dhcp_big_endian.pcapng` limit 1"; // Bad magic number = 000a0a0a +// String sql = "select * from dfs.`todo/dhcp_little_endian.pcapng` limit 1"; // Bad magic number = 1c0a0a0a +// String sql = "select * from dfs.`todo/many_interfaces.pcapng` limit 1"; // Bad magic number = ef0a0a0a + String sql = "select * from dfs.`todo/mac2.pcap` limit 1"; // Bad magic number = 1c0a0a0a + QueryBuilder builder = client.queryBuilder().sql(sql); + RowSet sets = builder.rowSet(); + + assertEquals(1, sets.rowCount()); + sets.clear(); + } + + @Test public void testGroupBy() throws Exception { String sql = "select src_ip, count(1), sum(packet_length) from dfs.`pcapng/sniff.pcapng` group by src_ip"; QueryBuilder builder = client.queryBuilder().sql(sql); @@ -214,4 +230,34 @@ public class TestPcapngRecordReader extends ClusterTest { String sql = "select * from dfs.`pcapng/drill.pcapng`"; client.queryBuilder().sql(sql).rowSet(); } -} \ No newline at end of file + + @Test + public void testPcapNGFileWithPcapExt() throws Exception { + String sql = "select count(*) from dfs.`pcapng/example.pcap`"; + String plan = queryBuilder().sql(sql).explainJson(); + long cnt = queryBuilder().physical(plan).singletonLong(); + + assertEquals("Counts should match", 1, cnt); + } + + @Test + public void testInlineSchema() throws Exception { + String sql = "SELECT type, packet_length, `timestamp` FROM table(dfs.`pcapng/sniff.pcapng` " + + "(type => 'pcapng', stat => false, sessionizeTCPStreams => true )) where type = 'ARP'"; + RowSet sets = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata schema = new SchemaBuilder() + .addNullable("type", MinorType.VARCHAR) + .add("packet_length", MinorType.INT) + .add("timestamp", MinorType.TIMESTAMP) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), schema) + 
.addRow("ARP", 90, Instant.ofEpochMilli(1518010669927L)) + .addRow("ARP", 90, Instant.ofEpochMilli(1518010671874L)) + .build(); + + assertEquals(2, sets.rowCount()); + new RowSetComparison(expected).verifyAndClearAll(sets); + } +} diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java index d1d966e..9971a88 100644 --- a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java +++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java @@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.physical.rowSet.RowSetBuilder; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterTest; import org.apache.drill.test.QueryBuilder; @@ -41,7 +43,7 @@ public class TestPcapngStatRecordReader extends ClusterTest { @BeforeClass public static void setup() throws Exception { ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher)); - cluster.defineFormat("dfs", "pcapng", new PcapngFormatConfig(null, true)); + cluster.defineFormat("dfs", "pcapng", new PcapFormatConfig(ImmutableList.of("pcapng"), true, false)); dirTestWatcher.copyResourceToRoot(Paths.get("pcapng/")); } @@ -136,4 +138,4 @@ public class TestPcapngStatRecordReader extends ClusterTest { RowSet expected = new RowSetBuilder(client.allocator(), schema).build(); new RowSetComparison(expected).verifyAndClearAll(sets); } -} \ No newline at end of file +} diff --git a/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json 
b/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json new file mode 100644 index 0000000..c8c6645 --- /dev/null +++ b/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json @@ -0,0 +1,12 @@ +{ + "type": "file", + "formats": { + "pcapng": { + "type": "pcapng", + "stat" : true + }, + "pcap" : { + "type" : "pcap" + } + } +} \ No newline at end of file diff --git a/exec/java-exec/src/test/resources/store/pcap/arpWithNullIP.pcap b/contrib/format-pcapng/src/test/resources/pcap/arpWithNullIP.pcap similarity index 100% rename from exec/java-exec/src/test/resources/store/pcap/arpWithNullIP.pcap rename to contrib/format-pcapng/src/test/resources/pcap/arpWithNullIP.pcap diff --git a/exec/java-exec/src/test/resources/store/pcap/data-1.pcap b/contrib/format-pcapng/src/test/resources/pcap/data-1.pcap similarity index 100% rename from exec/java-exec/src/test/resources/store/pcap/data-1.pcap rename to contrib/format-pcapng/src/test/resources/pcap/data-1.pcap diff --git a/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap b/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap new file mode 100644 index 0000000..a9a4563 Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap differ diff --git a/exec/java-exec/src/test/resources/store/pcap/dataFromRemote.txt b/contrib/format-pcapng/src/test/resources/pcap/dataFromRemote.txt similarity index 100% rename from exec/java-exec/src/test/resources/store/pcap/dataFromRemote.txt rename to contrib/format-pcapng/src/test/resources/pcap/dataFromRemote.txt diff --git a/exec/java-exec/src/test/resources/store/pcap/http.pcap b/contrib/format-pcapng/src/test/resources/pcap/http.pcap similarity index 100% rename from exec/java-exec/src/test/resources/store/pcap/http.pcap rename to contrib/format-pcapng/src/test/resources/pcap/http.pcap diff --git a/exec/java-exec/src/test/resources/store/pcap/synscan.pcap b/contrib/format-pcapng/src/test/resources/pcap/synscan.pcap similarity 
index 100% rename from exec/java-exec/src/test/resources/store/pcap/synscan.pcap rename to contrib/format-pcapng/src/test/resources/pcap/synscan.pcap diff --git a/exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap b/contrib/format-pcapng/src/test/resources/pcap/tcp-1.pcap similarity index 100% rename from exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap rename to contrib/format-pcapng/src/test/resources/pcap/tcp-1.pcap diff --git a/exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap b/contrib/format-pcapng/src/test/resources/pcap/tcp-2.pcap similarity index 100% rename from exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap rename to contrib/format-pcapng/src/test/resources/pcap/tcp-2.pcap diff --git a/exec/java-exec/src/test/resources/store/pcap/testv1.pcap b/contrib/format-pcapng/src/test/resources/pcap/testv1.pcap similarity index 100% rename from exec/java-exec/src/test/resources/store/pcap/testv1.pcap rename to contrib/format-pcapng/src/test/resources/pcap/testv1.pcap diff --git a/contrib/format-pcapng/src/test/resources/pcapng/example.pcap b/contrib/format-pcapng/src/test/resources/pcapng/example.pcap new file mode 100644 index 0000000..002cb8d Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/pcapng/example.pcap differ diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng b/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng new file mode 100644 index 0000000..d7d2e33 Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng differ diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng b/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng new file mode 100644 index 0000000..894b361 Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng differ diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng 
b/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng new file mode 100644 index 0000000..3378440 Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng differ diff --git a/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng b/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng new file mode 100644 index 0000000..6a8b397 Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng differ diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java index 43fa88b..592479e 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java @@ -49,7 +49,8 @@ public class TestHBaseTableProvider extends BaseHBaseTest { @Test public void testTableProvider() throws StoreException { LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config); - PersistentStore<String> hbaseStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build()); + PersistentStore<String> hbaseStore = provider.getOrCreateStore( + PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build()); hbaseStore.put("", "v0"); hbaseStore.put("k1", "v1"); hbaseStore.put("k2", "v2"); @@ -66,7 +67,8 @@ public class TestHBaseTableProvider extends BaseHBaseTest { assertEquals(7, Lists.newArrayList(hbaseStore.getAll()).size()); - PersistentStore<String> hbaseTestStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build()); + PersistentStore<String> hbaseTestStore = provider.getOrCreateStore( + 
PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build()); hbaseTestStore.put("", "v0"); hbaseTestStore.put("k1", "v1"); hbaseTestStore.put("k2", "v2"); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java index 842c798..9fe2f94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java @@ -62,7 +62,6 @@ public class ColumnsScanFramework extends FileScanFramework { /** * Implementation of the columns array schema negotiator. */ - public static class ColumnsSchemaNegotiatorImpl extends FileSchemaNegotiatorImpl implements ColumnsSchemaNegotiator { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java index b241528..12e1bd0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Optional; import org.apache.drill.common.exceptions.ChildErrorContext; import org.apache.drill.common.exceptions.CustomErrorContext; @@ -188,11 +189,18 @@ public class FileScanFramework extends ManagedScanFramework { } public abstract ManagedReader<? 
extends FileSchemaNegotiator> newReader(); + + /** + * @return the FileScanFramework, or an empty Optional if it has not yet been bound via {@link #bind(ManagedScanFramework)} + */ + protected Optional<FileScanFramework> fileFramework() { + return Optional.ofNullable(fileFramework); + } } private ImplicitColumnManager metadataManager; private DrillFileSystem dfs; - private final List<FileSplit> spilts = new ArrayList<>(); + private final List<FileSplit> splits = new ArrayList<>(); private Iterator<FileSplit> splitIter; private FileSplit currentSplit; @@ -230,9 +238,9 @@ public class FileScanFramework extends ManagedScanFramework { Path path = dfs.makeQualified(work.getPath()); paths.add(path); FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""}); - spilts.add(split); + splits.add(split); } - splitIter = spilts.iterator(); // Create the metadata manager to handle file metadata columns // (so-called implicit columns and partition columns.) 
@@ -274,4 +282,8 @@ public class FileScanFramework extends ManagedScanFramework { .build(logger); } } + + public DrillFileSystem fileSystem() { + return dfs; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index 5c290d3..d701da4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -382,8 +382,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements */ protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, - boolean blockSplittable, - boolean compressible, List<String> extensions, String defaultName) { + boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName) { this.name = name == null ? defaultName : name; easyConfig = EasyFormatConfig.builder() .matcher(new BasicFormatMatcher(this, fsConf, extensions, compressible)) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java deleted file mode 100644 index 1312151..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.pcap; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; - -import org.apache.drill.common.PlanStringBuilder; -import org.apache.drill.common.logical.FormatPluginConfig; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; - -import java.util.List; -import java.util.Objects; - -@JsonTypeName(PcapFormatPlugin.PLUGIN_NAME) -public class PcapFormatConfig implements FormatPluginConfig { - private static final List<String> DEFAULT_EXTNS = ImmutableList.of(PcapFormatPlugin.PLUGIN_NAME); - - private final List<String> extensions; - private final boolean sessionizeTCPStreams; - - @JsonCreator - public PcapFormatConfig( - @JsonProperty("extensions") List<String> extensions, - @JsonProperty("sessionizeTCPStreams") Boolean sessionizeTCPStreams) { - this.extensions = extensions == null ? - DEFAULT_EXTNS : ImmutableList.copyOf(extensions); - this.sessionizeTCPStreams = sessionizeTCPStreams == null ? 
false : sessionizeTCPStreams; - } - - @JsonInclude(JsonInclude.Include.NON_DEFAULT) - public List<String> getExtensions() { - return extensions; - } - - @JsonInclude(JsonInclude.Include.NON_DEFAULT) - public boolean getSessionizeTCPStreams() { - return sessionizeTCPStreams; - } - - @Override - public int hashCode() { - return Objects.hash(extensions, sessionizeTCPStreams); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - PcapFormatConfig other = (PcapFormatConfig) obj; - return Objects.equals(extensions, other.extensions) && - Objects.equals(sessionizeTCPStreams, other.sessionizeTCPStreams); - } - - @Override - public String toString() { - return new PlanStringBuilder(this) - .field("extensions", extensions) - .field("sessionizeTCPStreams", sessionizeTCPStreams) - .toString(); - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java deleted file mode 100644 index 69dbfc5..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.pcap; - -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder; -import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; -import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.store.dfs.easy.EasySubScan; -import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; -import org.apache.hadoop.conf.Configuration; -import org.apache.drill.exec.store.pcap.PcapBatchReader.PcapReaderConfig; - -public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> { - - public static final String PLUGIN_NAME = "pcap"; - - private static class PcapReaderFactory extends FileReaderFactory { - - private final PcapReaderConfig readerConfig; - private final int maxRecords; - - public PcapReaderFactory(PcapReaderConfig config, int maxRecords) { - readerConfig = config; - this.maxRecords = maxRecords; - } - - @Override - public ManagedReader<? 
extends FileSchemaNegotiator> newReader() { - return new PcapBatchReader(readerConfig, maxRecords); - } - } - - public PcapFormatPlugin(String name, DrillbitContext context, - Configuration fsConf, StoragePluginConfig storageConfig, - PcapFormatConfig formatConfig) { - super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig); - } - - private static EasyFormatConfig easyConfig(Configuration fsConf, PcapFormatConfig pluginConfig) { - return EasyFormatConfig.builder() - .readable(true) - .writable(false) - .blockSplittable(false) - .compressible(true) - .supportsProjectPushdown(true) - .extensions(pluginConfig.getExtensions()) - .fsConf(fsConf) - .defaultName(PLUGIN_NAME) - .useEnhancedScan(true) - .supportsLimitPushdown(true) - .build(); - } - - @Override - public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) { - return new PcapBatchReader(new PcapReaderConfig(this), scan.getMaxRecords()); - } - - @Override - protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) { - FileScanBuilder builder = new FileScanBuilder(); - builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this), scan.getMaxRecords())); - initScanBuilder(builder, scan); - builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR)); - return builder; - } -} diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json index 4ee29c2..81f63f2 100644 --- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json @@ -38,9 +38,6 @@ "type" : "json", "extensions" : [ "json" ] }, - "pcap" : { - "type" : "pcap" - }, "avro" : { "type" : "avro", "extensions" : [ "avro" ] @@ -98,9 +95,6 @@ "type" : "json", "extensions" : [ "json" ] }, - "pcap" : { - "type" : "pcap" - }, "avro" : { "type" : "avro" }, @@ -135,9 +129,6 @@ "type" : 
"json", "extensions" : [ "json" ] }, - "pcap" : { - "type" : "pcap" - }, "parquet" : { "type" : "parquet" }, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java index a55c7c3..1ceb91e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java @@ -17,6 +17,10 @@ */ package org.apache.drill.exec; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.drill.common.config.DrillConfig; import org.junit.After; import org.junit.Before; @@ -35,4 +39,17 @@ public class TestWithZookeeper extends ExecTest { public void tearDown() throws Exception { zkHelper.stopZookeeper(); } + + protected CuratorFramework createCurator() { + String connect = zkHelper.getConnectionString(); + DrillConfig config = zkHelper.getConfig(); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .namespace(zkHelper.getConfig().getString(ExecConstants.ZK_ROOT)) + .retryPolicy(new RetryNTimes(1, 100)) + .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) + .connectString(connect); + + return builder.build(); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java index 6001773..f8dee17 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java @@ -82,16 +82,6 @@ public class FormatPluginSerDeTest extends PlanTestBase { } @Test - public void testPcap() throws Exception { - String path = "store/pcap/tcp-1.pcap"; - dirTestWatcher.copyResourceToRoot(Paths.get(path)); - 
testPhysicalPlanSubmission( - String.format("select * from dfs.`%s`", path), - String.format("select * from table(dfs.`%s`(type=>'pcap'))", path) - ); - } - - @Test public void testJson() throws Exception { testPhysicalPlanSubmission( "select * from cp.`donuts.json`", diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java index efc8a7b..cab9877 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java @@ -19,8 +19,6 @@ package org.apache.drill.exec.store.sys; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; import org.apache.drill.categories.SlowTest; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.DrillFileUtils; @@ -144,17 +142,4 @@ public class TestPStoreProviders extends TestWithZookeeper { } } } - - private CuratorFramework createCurator() { - String connect = zkHelper.getConnectionString(); - DrillConfig config = zkHelper.getConfig(); - - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() - .namespace(zkHelper.getConfig().getString(ExecConstants.ZK_ROOT)) - .retryPolicy(new RetryNTimes(1, 100)) - .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) - .connectString(connect); - - return builder.build(); - } } diff --git a/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json b/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json index fd86af2..e8709bc 100644 --- a/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json +++ b/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json @@ -33,9 +33,6 @@ "type" : "json", 
"extensions" : [ "json" ] }, - "pcap" : { - "type" : "pcap" - }, "avro" : { "type" : "avro", "extensions" : [ "avro" ] @@ -147,9 +144,6 @@ "type" : "json", "extensions" : [ "json" ] }, - "pcap" : { - "type" : "pcap" - }, "avro" : { "type" : "avro", "extensions" : [ "avro" ] diff --git a/exec/java-exec/src/test/resources/store/pcap/data-2.pcap b/exec/java-exec/src/test/resources/store/pcap/data-2.pcap deleted file mode 100644 index 1f23ee8..0000000 Binary files a/exec/java-exec/src/test/resources/store/pcap/data-2.pcap and /dev/null differ