PHOENIX-1455 Replace org.xerial.snappy with org.iq80.snappy pure Java snappy implementation

Conflicts:
        phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ab0bcb83
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ab0bcb83
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ab0bcb83

Branch: refs/heads/4.2
Commit: ab0bcb8393ae01c063fa30ef7902f0a1c594f804
Parents: e00763e
Author: Andrew Purtell <apurt...@apache.org>
Authored: Mon Nov 17 18:05:33 2014 -0800
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Mon Nov 17 18:07:02 2014 -0800

----------------------------------------------------------------------
 phoenix-core/pom.xml                            |  6 ++---
 .../DistinctValueWithCountClientAggregator.java | 17 ++++++++++----
 .../DistinctValueWithCountServerAggregator.java | 24 ++++++--------------
 .../apache/phoenix/join/HashCacheClient.java    |  3 ++-
 .../apache/phoenix/join/HashCacheFactory.java   | 15 ++++++++----
 pom.xml                                         | 14 +++++++++---
 6 files changed, 45 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ab0bcb83/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 6b8676b..146f709 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -276,8 +276,8 @@
       <version>${slf4j.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.xerial.snappy</groupId>
-      <artifactId>snappy-java</artifactId>
+      <groupId>org.iq80.snappy</groupId>
+      <artifactId>snappy</artifactId>
       <version>${snappy.version}</version>
     </dependency>
     <dependency>
@@ -399,4 +399,4 @@
       <artifactId>hadoop-minicluster</artifactId>
     </dependency>
   </dependencies>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ab0bcb83/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
index f29f46a..56ca000 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.iq80.snappy.Snappy;
 
 /**
  * Client side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
@@ -59,14 +60,20 @@ public abstract class DistinctValueWithCountClientAggregator extends BaseAggrega
             PDataType resultDataType = getResultDataType();
             cachedResult = resultDataType.toObject(ptr, resultDataType, sortOrder);
         } else {
-            InputStream is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1);
+            InputStream is;
             try {
                 if (Bytes.equals(ptr.get(), ptr.getOffset(), 1, DistinctValueWithCountServerAggregator.COMPRESS_MARKER,
                         0, 1)) {
-                    InputStream decompressionStream = DistinctValueWithCountServerAggregator.COMPRESS_ALGO
-                            .createDecompressionStream(is,
-                                    DistinctValueWithCountServerAggregator.COMPRESS_ALGO.getDecompressor(), 0);
-                    is = decompressionStream;
+                    // This reads the uncompressed length from the front of the compressed input
+                    int uncompressedLength = Snappy.getUncompressedLength(ptr.get(), ptr.getOffset() + 1);
+                    byte[] uncompressed = new byte[uncompressedLength];
+                    // This will throw CorruptionException, a RuntimeException if the snappy data is invalid.
+                    // We're making a RuntimeException out of a checked IOException below so assume it's ok
+                    // to let any CorruptionException escape.
+                    Snappy.uncompress(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1, uncompressed, 0);
+                    is = new ByteArrayInputStream(uncompressed, 0, uncompressedLength);
+                } else {
+                    is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1);
                 }
                 DataInputStream in = new DataInputStream(is);
                 int mapSize = WritableUtils.readVInt(in);

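For review context, here is the client-side decompression pattern from the hunk above in isolation: a minimal, self-contained sketch against the iq80 API (assuming org.iq80.snappy version 0.3, per the pom change below). The class and method names are hypothetical, and data/offset/length stand in for ptr.get(), ptr.getOffset() + 1, and ptr.getLength() - 1:

    import org.iq80.snappy.Snappy;

    public class SnappyDecompressSketch {
        // Hypothetical helper: uncompress a snappy payload that starts one byte
        // past the COMPRESS_MARKER prefix. iq80 snappy stores the uncompressed
        // length at the front of the compressed stream, so getUncompressedLength()
        // can size the destination buffer without a separate length field.
        static byte[] uncompress(byte[] data, int offset, int length) {
            int uncompressedLength = Snappy.getUncompressedLength(data, offset);
            byte[] uncompressed = new byte[uncompressedLength];
            // Throws org.iq80.snappy.CorruptionException (unchecked) if the input is invalid.
            Snappy.uncompress(data, offset, length, uncompressed, 0);
            return uncompressed;
        }
    }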
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ab0bcb83/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index 281879e..a3141b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -17,16 +17,12 @@
  */
 package org.apache.phoenix.expression.aggregator;
 
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +36,8 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SizedUtil;
 
+import org.iq80.snappy.Snappy;
+
 /**
  * Server side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
  * 
@@ -50,7 +48,6 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator {
     private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class);
     public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
     public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 };
-    public static final Algorithm COMPRESS_ALGO = Compression.Algorithm.SNAPPY;
 
     private int compressThreshold;
     private byte[] buffer = null;
@@ -101,18 +98,11 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator {
         }
         if (serializationSize > compressThreshold) {
             // The size for the map serialization is above the threshold. We will do the Snappy compression here.
-            ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream();
-            try {
-                compressedByteStream.write(COMPRESS_MARKER);
-                OutputStream compressionStream = COMPRESS_ALGO.createCompressionStream(compressedByteStream,
-                        COMPRESS_ALGO.getCompressor(), 0);
-                compressionStream.write(buffer, 1, buffer.length - 1);
-                compressionStream.flush();
-                ptr.set(compressedByteStream.toByteArray(), 0, compressedByteStream.size());
-                return true;
-            } catch (Exception e) {
-                LOG.error("Exception while Snappy compression of data.", e);
-            }
+            byte[] compressed = new byte[COMPRESS_MARKER.length + Snappy.maxCompressedLength(buffer.length)];
+            System.arraycopy(COMPRESS_MARKER, 0, compressed, 0, COMPRESS_MARKER.length);
+            int compressedLen = Snappy.compress(buffer, 1, buffer.length - 1, compressed, COMPRESS_MARKER.length);
+            ptr.set(compressed, 0, compressedLen + 1);
+            return true;
         }
         ptr.set(buffer, 0, offset);
         return true;

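The compression side swaps the stream-based HBase Compression.Algorithm codec for iq80's one-shot array API: size the destination with maxCompressedLength(), copy the marker byte in front, and compress directly into the array. A minimal sketch of that pattern under the same assumptions (hypothetical class and method names; the marker constant mirrors COMPRESS_MARKER above):

    import java.util.Arrays;

    import org.iq80.snappy.Snappy;

    public class SnappyCompressSketch {
        static final byte[] COMPRESS_MARKER = new byte[] { (byte) 1 };

        // Hypothetical helper: compress buffer[1..] and prepend the marker byte,
        // as the new evaluate() body does before ptr.set(compressed, 0, compressedLen + 1).
        static byte[] compressWithMarker(byte[] buffer) {
            byte[] compressed = new byte[COMPRESS_MARKER.length + Snappy.maxCompressedLength(buffer.length)];
            System.arraycopy(COMPRESS_MARKER, 0, compressed, 0, COMPRESS_MARKER.length);
            int compressedLen = Snappy.compress(buffer, 1, buffer.length - 1, compressed, COMPRESS_MARKER.length);
            // Trim the over-allocated buffer to the marker plus the actual compressed size.
            return Arrays.copyOf(compressed, COMPRESS_MARKER.length + compressedLen);
        }
    }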
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ab0bcb83/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index e2f57df..6494603 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -39,7 +39,8 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 import org.apache.phoenix.util.TupleUtil;
-import org.xerial.snappy.Snappy;
+
+import org.iq80.snappy.Snappy;
 
 /**
  * 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ab0bcb83/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index cfe064c..f1f6e22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -36,6 +36,7 @@ import net.jcip.annotations.Immutable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
+
 import org.apache.phoenix.cache.HashCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -50,7 +51,9 @@ import org.apache.phoenix.util.ResultUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TupleUtil;
-import org.xerial.snappy.Snappy;
+
+import org.iq80.snappy.CorruptionException;
+import org.iq80.snappy.Snappy;
 
 public class HashCacheFactory implements ServerCacheFactory {
 
@@ -68,11 +71,13 @@ public class HashCacheFactory implements ServerCacheFactory {
     @Override
     public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException {
         try {
-            int size = Snappy.uncompressedLength(cachePtr.get());
-            byte[] uncompressed = new byte[size];
-            int size = Snappy.uncompressedLength(cachePtr.get());
-            byte[] uncompressed = new byte[size];
-            Snappy.uncompress(cachePtr.get(), 0, cachePtr.getLength(), uncompressed, 0);
+            // This reads the uncompressed length from the front of the compressed input
+            int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset());
+            byte[] uncompressed = new byte[uncompressedLen];
+            Snappy.uncompress(cachePtr.get(), cachePtr.getOffset(), cachePtr.getLength(),
+                uncompressed, 0);
             return new HashCacheImpl(uncompressed, chunk);
-        } catch (IOException e) {
+        } catch (CorruptionException e) {
             throw ServerUtil.parseServerException(e);
         }
     }

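One behavioral detail worth noting in this hunk: xerial's Snappy.uncompress() throws a checked IOException, while iq80 reports invalid input with org.iq80.snappy.CorruptionException, a RuntimeException, which is why the catch clause changes. A minimal sketch of the new shape (hypothetical names; the plain SQLException wrapping stands in for ServerUtil.parseServerException):

    import java.sql.SQLException;

    import org.iq80.snappy.CorruptionException;
    import org.iq80.snappy.Snappy;

    public class HashCacheDecodeSketch {
        // Hypothetical helper mirroring newCache(): uncompress the whole cache
        // payload, converting iq80's unchecked corruption signal back into the
        // SQLException contract callers expect.
        static byte[] decode(byte[] cache, int offset, int length) throws SQLException {
            try {
                int uncompressedLen = Snappy.getUncompressedLength(cache, offset);
                byte[] uncompressed = new byte[uncompressedLen];
                Snappy.uncompress(cache, offset, length, uncompressed, 0);
                return uncompressed;
            } catch (CorruptionException e) {
                throw new SQLException(e);
            }
        }
    }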
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ab0bcb83/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dc6bd1f..e97957e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,14 @@
         <enabled>true</enabled>
       </snapshots>
     </repository>
+    <repository>
+      <id>sonatype-nexus-snapshots</id>
+      <name>Sonatype Nexus Snapshots</name>
+      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
   </repositories>
 
   <parent>
@@ -89,7 +97,7 @@
     <flume.version>1.4.0</flume.version>
     <findbugs.version>1.3.2</findbugs.version>
     <jline.version>2.11</jline.version>
-    <snappy.version>1.1.0.1</snappy.version>
+    <snappy.version>0.3</snappy.version>
     <netty.version>3.6.6.Final</netty.version>
     <commons-codec.version>1.7</commons-codec.version>
     <htrace.version>2.04</htrace.version>
@@ -533,8 +541,8 @@
         <version>${findbugs.version}</version>
       </dependency>
       <dependency>
-        <groupId>org.xerial.snappy</groupId>
-        <artifactId>snappy-java</artifactId>
+        <groupId>org.iq80.snappy</groupId>
+        <artifactId>snappy</artifactId>
         <version>${snappy.version}</version>
       </dependency>
       <dependency>
