Revision: 19679
          http://sourceforge.net/p/gate/code/19679
Author:   johann_p
Date:     2016-10-14 16:40:39 +0000 (Fri, 14 Oct 2016)
Log Message:
-----------
Add support for Snappy compression/decompression and make the
options available from the command line / gcp-direct.sh a bit
more flexible.

Modified Paths:
--------------
    gcp/trunk/src/gate/cloud/batch/BatchRunner.java
    gcp/trunk/src/gate/cloud/io/IOConstants.java
    gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java
    gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java

Added Paths:
-----------
    gcp/trunk/lib/snappy-java-1.1.2.6.jar
    gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java

Added: gcp/trunk/lib/snappy-java-1.1.2.6.jar
===================================================================
(Binary files differ)

Index: gcp/trunk/lib/snappy-java-1.1.2.6.jar
===================================================================
--- gcp/trunk/lib/snappy-java-1.1.2.6.jar       2016-10-14 12:03:14 UTC (rev 
19678)
+++ gcp/trunk/lib/snappy-java-1.1.2.6.jar       2016-10-14 16:40:39 UTC (rev 
19679)

Property changes on: gcp/trunk/lib/snappy-java-1.1.2.6.jar
___________________________________________________________________
Added: svn:mime-type
## -0,0 +1 ##
+application/octet-stream
\ No newline at end of property
Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java
===================================================================
--- gcp/trunk/src/gate/cloud/batch/BatchRunner.java     2016-10-14 12:03:14 UTC 
(rev 19678)
+++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java     2016-10-14 16:40:39 UTC 
(rev 19679)
@@ -56,6 +56,17 @@
 import org.apache.log4j.Logger;
 
 import com.sun.jna.Platform;
+import static gate.cloud.io.IOConstants.PARAM_COMPRESSION;
+import static gate.cloud.io.IOConstants.PARAM_DOCUMENT_ROOT;
+import static gate.cloud.io.IOConstants.PARAM_ENCODING;
+import static gate.cloud.io.IOConstants.PARAM_FILE_EXTENSION;
+import static gate.cloud.io.IOConstants.PARAM_REPLACE_EXTENSION;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_GZIP;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_NONE;
+import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_SNAPPY;
+import gate.cloud.io.file.JSONOutputHandler;
+import static 
gate.cloud.io.file.JSONOutputHandler.PARAM_ANNOTATION_TYPE_PROPERTY;
+import static gate.cloud.io.file.JSONOutputHandler.PARAM_GROUP_ENTITIES_BY;
 
 /**
 * This class is a Batch Runner, i.e. it manages the execution of a batch job,
@@ -510,14 +521,16 @@
     // session files here?
     options.addOption("b","batchFile",true,"Batch file (required, replaces -i, 
-o, -x, -r, -I)");
     options.addOption("i","inputDirectory",true,"Input directory (required, 
unless -b given)");
-    options.addOption("f","outputFormat",true,"Output format, optional, one of 
'xml' or 'finf', default is 'finf'");
+    options.addOption("f","outputFormat",true,"Output format, optional, one of 
'xml', 'finf', 'ser', 'json', default is 'finf'");
     options.addOption("o","outputDirectory",true,"Output directory (not output 
if missing)");
     options.addOption("x","executePipeline",true,"Pipeline/application file to 
execute (required, unless -b given)");
     options.addOption("r","reportFile",true,"Report file (optional, default: 
report.xml");
     options.addOption("t","numberThreads",true,"Number of threads to use 
(required)");
     options.addOption("I","batchId",true,"Batch ID (optional, default: GCP");
-    options.addOption("ci","compressedInput",false,"Input files are 
gzip-compressed");
-    options.addOption("co","compressedOutput",false,"Output files are 
gzip-compressed");
+    options.addOption("ci","compressedInput",false,"Input files are 
gzip-compressed (.gz)");
+    options.addOption("co","compressedOutput",false,"Output files are 
gzip-compressed (.gz)");
+    options.addOption("so","snappyOutput",false,"Output files are 
snappy-compressed (.snappy)");
+    options.addOption("si","snappyInput",false,"Input files are 
snappy-compressed (.snappy)");
     options.addOption("h","help",false,"Print this help information");
     BasicParser parser = new BasicParser();
     
@@ -563,8 +576,9 @@
       }
       if(line.hasOption('f')) {
         outFormat = line.getOptionValue('f');
-        if(!outFormat.equals("xml") && !outFormat.equals("finf")) {
-          log.error("Output format (option 'f') must be either 'xml' or 
'finf'");
+        if(!outFormat.equals("xml") && !outFormat.equals("finf") && 
+           !outFormat.equals("ser") && !outFormat.equals("json")) {
+          log.error("Output format (option 'f') must be either 'json', 'ser', 
xml' or 'finf'");
           System.exit(1);
         }
       } // if we have option 'f', otherwise use the preset default
@@ -698,14 +712,16 @@
           // set the input Handler
           String inputHandlerClassName = "gate.cloud.io.file.FileInputHandler";
           Map<String,String> configData = new HashMap<String, String>();
-          configData.put(IOConstants.PARAM_DOCUMENT_ROOT, 
line.getOptionValue('i'));
+          configData.put(PARAM_DOCUMENT_ROOT, line.getOptionValue('i'));
           if(line.hasOption("ci")) {
-            configData.put(IOConstants.PARAM_COMPRESSION,"gzip");            
-          } else {
-            configData.put(IOConstants.PARAM_COMPRESSION,"none");
+            configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_GZIP);          
  
+          } else if(line.hasOption("si"))  {
+            configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_SNAPPY);
+          } else  {
+            configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_NONE);
           }
-          configData.put(IOConstants.PARAM_ENCODING, "UTF-8");
-          configData.put(IOConstants.PARAM_FILE_EXTENSION,"");
+          configData.put(PARAM_ENCODING, "UTF-8");
+          configData.put(PARAM_FILE_EXTENSION,"");
           Class<? extends InputHandler> inputHandlerClass =
                 Class.forName(inputHandlerClassName, true, 
Gate.getClassLoader())
                         .asSubclass(InputHandler.class);
@@ -719,26 +735,38 @@
           List<OutputHandler> outHandlers = new ArrayList<OutputHandler>();
           if(line.hasOption('o')) {
             String outputHandlerClassName = null;
+            configData = new HashMap<String, String>();
+            String outExt = ".finf";
             if(outFormat.equals("finf")) {
               outputHandlerClassName = 
"gate.cloud.io.file.FastInfosetOutputHandler";
             } else if(outFormat.equals("xml")) {
+              outExt = ".xml";
               outputHandlerClassName = 
"gate.cloud.io.file.GATEStandOffFileOutputHandler";
-            } 
-            configData = new HashMap<String, String>();
+            } else if(outFormat.equals("ser")) {
+              outExt = ".ser";
+              outputHandlerClassName = 
"gate.cloud.io.file.SerializedObjectOutputHandler";
+            } else if(outFormat.equals("json")) {
+              outExt = ".json";
+              outputHandlerClassName = "gate.cloud.io.file.JSONOutputHandler";
+              configData.put(PARAM_GROUP_ENTITIES_BY, "set");
+              configData.put(PARAM_ANNOTATION_TYPE_PROPERTY, "annType");
+            } else {
+              // cannot get here, option contents is checked earlier...
+            }
             configData.put(IOConstants.PARAM_DOCUMENT_ROOT, 
line.getOptionValue('o'));
-            String outExt = ".finf";
-            if(outFormat.equals("xml")) {
-              outExt = ".xml";
-            }
+            
             if(line.hasOption("co")) {
-              configData.put(IOConstants.PARAM_COMPRESSION,"gzip");            
+              configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_GZIP);        
    
               outExt = outExt + ".gz";
+            } else if(line.hasOption("so")) {
+              configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_SNAPPY);
+              outExt = outExt + ".snappy";
             } else {
-              configData.put(IOConstants.PARAM_COMPRESSION,"none");
+              configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_NONE);
             }
-            configData.put(IOConstants.PARAM_FILE_EXTENSION,outExt);
-            configData.put(IOConstants.PARAM_ENCODING, "UTF-8");
-            configData.put(IOConstants.PARAM_REPLACE_EXTENSION, "true");
+            configData.put(PARAM_FILE_EXTENSION,outExt);
+            configData.put(PARAM_ENCODING, "UTF-8");
+            configData.put(PARAM_REPLACE_EXTENSION, "true");
             Class<? extends OutputHandler> ouputHandlerClass =
             Class.forName(outputHandlerClassName, true, Gate.getClassLoader())
                  .asSubclass(OutputHandler.class);
@@ -756,7 +784,7 @@
                 Class.forName(enumeratorClassName, true, Gate.getClassLoader())
                         .asSubclass(DocumentEnumerator.class);
           configData = new HashMap<String, String>();
-          configData.put(IOConstants.PARAM_DOCUMENT_ROOT, 
line.getOptionValue('i'));          
+          configData.put(PARAM_DOCUMENT_ROOT, line.getOptionValue('i'));       
   
           List<DocumentID> docIds = new LinkedList<DocumentID>();
           DocumentEnumerator enumerator = enumeratorClass.newInstance();
           enumerator.config(configData);

Modified: gcp/trunk/src/gate/cloud/io/IOConstants.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/IOConstants.java        2016-10-14 12:03:14 UTC 
(rev 19678)
+++ gcp/trunk/src/gate/cloud/io/IOConstants.java        2016-10-14 16:40:39 UTC 
(rev 19679)
@@ -95,6 +95,12 @@
   public static final String VALUE_COMPRESSION_GZIP = "gzip";
   
   /**
+   * The files are Snappy-compressed (and have the extension 
&quot;.snappy&quot;
+   * appended to the normal extension specific to their mime type).
+   */
+  public static final String VALUE_COMPRESSION_SNAPPY = "snappy";
+
+  /**
    * The location of an ARC file.
    */
   @Deprecated

Modified: gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java     
2016-10-14 12:03:14 UTC (rev 19678)
+++ gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java     
2016-10-14 16:40:39 UTC (rev 19679)
@@ -26,6 +26,7 @@
 import java.util.zip.GZIPOutputStream;
 
 import static gate.cloud.io.IOConstants.*;
+import org.xerial.snappy.SnappyOutputStream;
 
 public abstract class AbstractFileOutputHandler extends AbstractOutputHandler {
   /**
@@ -127,6 +128,8 @@
     if(compression != null) {
       if(compression.equals(VALUE_COMPRESSION_GZIP)) {
         os = new GZIPOutputStream(os);
+      } else if(compression.equals(VALUE_COMPRESSION_SNAPPY)) {
+        os = new SnappyOutputStream(os);
       }
     }
     return new BufferedOutputStream(os);

Modified: gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java
===================================================================
--- gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java      2016-10-14 
12:03:14 UTC (rev 19678)
+++ gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java      2016-10-14 
16:40:39 UTC (rev 19679)
@@ -17,11 +17,11 @@
 import gate.FeatureMap;
 import gate.Gate;
 import gate.cloud.batch.DocumentID;
-import gate.cloud.batch.PooledDocumentProcessor;
 import gate.cloud.io.DocumentData;
 import gate.cloud.io.IOConstants;
 import gate.cloud.io.InputHandler;
 import gate.cloud.util.GZIPURLStreamHandler;
+import gate.cloud.util.SnappyURLStreamHandler;
 import gate.util.GateException;
 
 import java.io.File;
@@ -101,6 +101,8 @@
     URL docUrl = docFile.toURI().toURL();
     if(compression.equals(VALUE_COMPRESSION_GZIP)){
       docUrl = new URL(docUrl, "", new GZIPURLStreamHandler(docUrl));
+    } else if(compression.equals(VALUE_COMPRESSION_SNAPPY)) {
+      docUrl = new URL(docUrl, "", new SnappyURLStreamHandler(docUrl));      
     }
     
     params.put(Document.DOCUMENT_URL_PARAMETER_NAME, docUrl);

Copied: gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java (from rev 
19673, gcp/trunk/src/gate/cloud/util/GZIPURLStreamHandler.java)
===================================================================
--- gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java                   
        (rev 0)
+++ gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java   2016-10-14 
16:40:39 UTC (rev 19679)
@@ -0,0 +1,86 @@
+/*
+ *  SnappyURLStreamHandler.java
+ *  Copyright (c) 2016, The University of Sheffield.
+ *
+ *  This file is part of GCP (see http://gate.ac.uk/), and is free
+ *  software, licenced under the GNU Affero General Public License,
+ *  Version 3, November 2007.
+ *
+ */
+package gate.cloud.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import org.xerial.snappy.SnappyInputStream;
+
+/**
+ * A {@link URLStreamHandler} whose connections decompress Snappy data read
+ * from the wrapped URL. It is not feature complete: only the functionality
+ * needed to open GATE documents from snappy-compressed files is implemented.
+ */
+public class SnappyURLStreamHandler extends URLStreamHandler {
+
+  /**
+   * Minimal connection to the wrapped URL; only {@link #connect()} and
+   * {@link #getInputStream()} are overridden.
+   */
+  public class SnappyURLConnection extends URLConnection{
+    public SnappyURLConnection() throws IOException {
+      super(originalURL);
+    }
+    
+    /**
+     * Connection to the wrapped URL; assigned on the first connect() call.
+     */
+    protected URLConnection originalConnection;
+    
+    
+    /**
+     * Opens a connection to the wrapped URL unless already marked connected.
+     */
+    @Override
+    public void connect() throws IOException {
+      if(!connected){
+        this.originalConnection = originalURL.openConnection();
+        connected = true;
+      }
+    }
+
+
+    /**
+     * Connects if needed and wraps the original stream in a SnappyInputStream.
+     */
+    @Override
+    public InputStream getInputStream() throws IOException {
+      if(!connected) connect();
+      return new SnappyInputStream(originalConnection.getInputStream());
+    }
+    
+  }
+  
+  
+  /**
+   * Returns a new connection for the wrapped URL; the argument u is not used.
+   */
+  @Override
+  protected URLConnection openConnection(URL u) throws IOException {
+    return new SnappyURLConnection();
+  }
+
+  
+  /**
+   * The wrapped URL the compressed data is read from; set by the constructor.
+   */
+  protected URL originalURL;
+  
+  
+  public SnappyURLStreamHandler(URL wrappedUrl) {
+    super();
+    this.originalURL = wrappedUrl;
+  }
+  
+  
+}

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most 
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
GATE-cvs mailing list
GATE-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to