Revision: 19679
          http://sourceforge.net/p/gate/code/19679
Author:   johann_p
Date:     2016-10-14 16:40:39 +0000 (Fri, 14 Oct 2016)
Log Message:
-----------
Add support for Snappy compression/decompression and make what we can do from the command line / gcp-direct.sh a bit more flexible.
Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java gcp/trunk/src/gate/cloud/io/IOConstants.java gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java Added Paths: ----------- gcp/trunk/lib/snappy-java-1.1.2.6.jar gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java Added: gcp/trunk/lib/snappy-java-1.1.2.6.jar =================================================================== (Binary files differ) Index: gcp/trunk/lib/snappy-java-1.1.2.6.jar =================================================================== --- gcp/trunk/lib/snappy-java-1.1.2.6.jar 2016-10-14 12:03:14 UTC (rev 19678) +++ gcp/trunk/lib/snappy-java-1.1.2.6.jar 2016-10-14 16:40:39 UTC (rev 19679) Property changes on: gcp/trunk/lib/snappy-java-1.1.2.6.jar ___________________________________________________________________ Added: svn:mime-type ## -0,0 +1 ## +application/octet-stream \ No newline at end of property Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-14 12:03:14 UTC (rev 19678) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-14 16:40:39 UTC (rev 19679) @@ -56,6 +56,17 @@ import org.apache.log4j.Logger; import com.sun.jna.Platform; +import static gate.cloud.io.IOConstants.PARAM_COMPRESSION; +import static gate.cloud.io.IOConstants.PARAM_DOCUMENT_ROOT; +import static gate.cloud.io.IOConstants.PARAM_ENCODING; +import static gate.cloud.io.IOConstants.PARAM_FILE_EXTENSION; +import static gate.cloud.io.IOConstants.PARAM_REPLACE_EXTENSION; +import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_GZIP; +import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_NONE; +import static gate.cloud.io.IOConstants.VALUE_COMPRESSION_SNAPPY; +import gate.cloud.io.file.JSONOutputHandler; +import static 
gate.cloud.io.file.JSONOutputHandler.PARAM_ANNOTATION_TYPE_PROPERTY; +import static gate.cloud.io.file.JSONOutputHandler.PARAM_GROUP_ENTITIES_BY; /** * This class is a Batch Runner, i.e. it manages the execution of a batch job, @@ -510,14 +521,16 @@ // session files here? options.addOption("b","batchFile",true,"Batch file (required, replaces -i, -o, -x, -r, -I)"); options.addOption("i","inputDirectory",true,"Input directory (required, unless -b given)"); - options.addOption("f","outputFormat",true,"Output format, optional, one of 'xml' or 'finf', default is 'finf'"); + options.addOption("f","outputFormat",true,"Output format, optional, one of 'xml', 'finf', 'ser', 'json', default is 'finf'"); options.addOption("o","outputDirectory",true,"Output directory (not output if missing)"); options.addOption("x","executePipeline",true,"Pipeline/application file to execute (required, unless -b given)"); options.addOption("r","reportFile",true,"Report file (optional, default: report.xml"); options.addOption("t","numberThreads",true,"Number of threads to use (required)"); options.addOption("I","batchId",true,"Batch ID (optional, default: GCP"); - options.addOption("ci","compressedInput",false,"Input files are gzip-compressed"); - options.addOption("co","compressedOutput",false,"Output files are gzip-compressed"); + options.addOption("ci","compressedInput",false,"Input files are gzip-compressed (.gz)"); + options.addOption("co","compressedOutput",false,"Output files are gzip-compressed (.gz)"); + options.addOption("so","snappyOutput",false,"Output files are snappy-compressed (.snappy)"); + options.addOption("si","snappyInput",false,"Input files are snappy-compressed (.snappy)"); options.addOption("h","help",false,"Print this help information"); BasicParser parser = new BasicParser(); @@ -563,8 +576,9 @@ } if(line.hasOption('f')) { outFormat = line.getOptionValue('f'); - if(!outFormat.equals("xml") && !outFormat.equals("finf")) { - log.error("Output format (option 'f') must be 
either 'xml' or 'finf'"); + if(!outFormat.equals("xml") && !outFormat.equals("finf") && + !outFormat.equals("ser") && !outFormat.equals("json")) { + log.error("Output format (option 'f') must be either 'json', 'ser', xml' or 'finf'"); System.exit(1); } } // if we have option 'f', otherwise use the preset default @@ -698,14 +712,16 @@ // set the input Handler String inputHandlerClassName = "gate.cloud.io.file.FileInputHandler"; Map<String,String> configData = new HashMap<String, String>(); - configData.put(IOConstants.PARAM_DOCUMENT_ROOT, line.getOptionValue('i')); + configData.put(PARAM_DOCUMENT_ROOT, line.getOptionValue('i')); if(line.hasOption("ci")) { - configData.put(IOConstants.PARAM_COMPRESSION,"gzip"); - } else { - configData.put(IOConstants.PARAM_COMPRESSION,"none"); + configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_GZIP); + } else if(line.hasOption("si")) { + configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_SNAPPY); + } else { + configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_NONE); } - configData.put(IOConstants.PARAM_ENCODING, "UTF-8"); - configData.put(IOConstants.PARAM_FILE_EXTENSION,""); + configData.put(PARAM_ENCODING, "UTF-8"); + configData.put(PARAM_FILE_EXTENSION,""); Class<? 
extends InputHandler> inputHandlerClass = Class.forName(inputHandlerClassName, true, Gate.getClassLoader()) .asSubclass(InputHandler.class); @@ -719,26 +735,38 @@ List<OutputHandler> outHandlers = new ArrayList<OutputHandler>(); if(line.hasOption('o')) { String outputHandlerClassName = null; + configData = new HashMap<String, String>(); + String outExt = ".finf"; if(outFormat.equals("finf")) { outputHandlerClassName = "gate.cloud.io.file.FastInfosetOutputHandler"; } else if(outFormat.equals("xml")) { + outExt = ".xml"; outputHandlerClassName = "gate.cloud.io.file.GATEStandOffFileOutputHandler"; - } - configData = new HashMap<String, String>(); + } else if(outFormat.equals("ser")) { + outExt = ".ser"; + outputHandlerClassName = "gate.cloud.io.file.SerializedObjectOutputHandler"; + } else if(outFormat.equals("json")) { + outExt = ".json"; + outputHandlerClassName = "gate.cloud.io.file.JSONOutputHandler"; + configData.put(PARAM_GROUP_ENTITIES_BY, "set"); + configData.put(PARAM_ANNOTATION_TYPE_PROPERTY, "annType"); + } else { + // cannot get here, option contents is checked earlier... 
+ } configData.put(IOConstants.PARAM_DOCUMENT_ROOT, line.getOptionValue('o')); - String outExt = ".finf"; - if(outFormat.equals("xml")) { - outExt = ".xml"; - } + if(line.hasOption("co")) { - configData.put(IOConstants.PARAM_COMPRESSION,"gzip"); + configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_GZIP); outExt = outExt + ".gz"; + } else if(line.hasOption("so")) { + configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_SNAPPY); + outExt = outExt + ".snappy"; } else { - configData.put(IOConstants.PARAM_COMPRESSION,"none"); + configData.put(PARAM_COMPRESSION,VALUE_COMPRESSION_NONE); } - configData.put(IOConstants.PARAM_FILE_EXTENSION,outExt); - configData.put(IOConstants.PARAM_ENCODING, "UTF-8"); - configData.put(IOConstants.PARAM_REPLACE_EXTENSION, "true"); + configData.put(PARAM_FILE_EXTENSION,outExt); + configData.put(PARAM_ENCODING, "UTF-8"); + configData.put(PARAM_REPLACE_EXTENSION, "true"); Class<? extends OutputHandler> ouputHandlerClass = Class.forName(outputHandlerClassName, true, Gate.getClassLoader()) .asSubclass(OutputHandler.class); @@ -756,7 +784,7 @@ Class.forName(enumeratorClassName, true, Gate.getClassLoader()) .asSubclass(DocumentEnumerator.class); configData = new HashMap<String, String>(); - configData.put(IOConstants.PARAM_DOCUMENT_ROOT, line.getOptionValue('i')); + configData.put(PARAM_DOCUMENT_ROOT, line.getOptionValue('i')); List<DocumentID> docIds = new LinkedList<DocumentID>(); DocumentEnumerator enumerator = enumeratorClass.newInstance(); enumerator.config(configData); Modified: gcp/trunk/src/gate/cloud/io/IOConstants.java =================================================================== --- gcp/trunk/src/gate/cloud/io/IOConstants.java 2016-10-14 12:03:14 UTC (rev 19678) +++ gcp/trunk/src/gate/cloud/io/IOConstants.java 2016-10-14 16:40:39 UTC (rev 19679) @@ -95,6 +95,12 @@ public static final String VALUE_COMPRESSION_GZIP = "gzip"; /** + * The files are Snappy-compressed (and have the extension ".snappy" + * appended to the normal extension 
specific to their mime type). + */ + public static final String VALUE_COMPRESSION_SNAPPY = "snappy"; + + /** * The location of an ARC file. */ @Deprecated Modified: gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java =================================================================== --- gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java 2016-10-14 12:03:14 UTC (rev 19678) +++ gcp/trunk/src/gate/cloud/io/file/AbstractFileOutputHandler.java 2016-10-14 16:40:39 UTC (rev 19679) @@ -26,6 +26,7 @@ import java.util.zip.GZIPOutputStream; import static gate.cloud.io.IOConstants.*; +import org.xerial.snappy.SnappyOutputStream; public abstract class AbstractFileOutputHandler extends AbstractOutputHandler { /** @@ -127,6 +128,8 @@ if(compression != null) { if(compression.equals(VALUE_COMPRESSION_GZIP)) { os = new GZIPOutputStream(os); + } else if(compression.equals(VALUE_COMPRESSION_SNAPPY)) { + os = new SnappyOutputStream(os); } } return new BufferedOutputStream(os); Modified: gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java =================================================================== --- gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java 2016-10-14 12:03:14 UTC (rev 19678) +++ gcp/trunk/src/gate/cloud/io/file/FileInputHandler.java 2016-10-14 16:40:39 UTC (rev 19679) @@ -17,11 +17,11 @@ import gate.FeatureMap; import gate.Gate; import gate.cloud.batch.DocumentID; -import gate.cloud.batch.PooledDocumentProcessor; import gate.cloud.io.DocumentData; import gate.cloud.io.IOConstants; import gate.cloud.io.InputHandler; import gate.cloud.util.GZIPURLStreamHandler; +import gate.cloud.util.SnappyURLStreamHandler; import gate.util.GateException; import java.io.File; @@ -101,6 +101,8 @@ URL docUrl = docFile.toURI().toURL(); if(compression.equals(VALUE_COMPRESSION_GZIP)){ docUrl = new URL(docUrl, "", new GZIPURLStreamHandler(docUrl)); + } else if(compression.equals(VALUE_COMPRESSION_SNAPPY)) { + docUrl = new URL(docUrl, "", new 
SnappyURLStreamHandler(docUrl)); } params.put(Document.DOCUMENT_URL_PARAMETER_NAME, docUrl); Copied: gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java (from rev 19673, gcp/trunk/src/gate/cloud/util/GZIPURLStreamHandler.java) =================================================================== --- gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java (rev 0) +++ gcp/trunk/src/gate/cloud/util/SnappyURLStreamHandler.java 2016-10-14 16:40:39 UTC (rev 19679) @@ -0,0 +1,86 @@ +/* + * SnappyURLStreamHandler.java + * Copyright (c) 2016, The University of Sheffield. + * + * This file is part of GCP (see http://gate.ac.uk/), and is free + * software, licenced under the GNU Affero General Public License, + * Version 3, November 2007. + * + */ +package gate.cloud.util; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; +import org.xerial.snappy.SnappyInputStream; + +/** + * A URL stream handler that can be used to read data compressed in Snappy format. + * This implementation is not feature complete - it only has the functionality + * required to open GATE documents from snappy-compressed files! + */ +public class SnappyURLStreamHandler extends URLStreamHandler { + + /** + * A URL connection that has the minimal implementation for uncompressing + * snappy content. + */ + public class SnappyURLConnection extends URLConnection{ + public SnappyURLConnection() throws IOException { + super(originalURL); + } + + /** + * A URLConnection from the original URL. 
+ */ + protected URLConnection originalConnection; + + + /* (non-Javadoc) + * @see java.net.URLConnection#connect() + */ + @Override + public void connect() throws IOException { + if(!connected){ + this.originalConnection = originalURL.openConnection(); + connected = true; + } + } + + + /* (non-Javadoc) + * @see java.net.URLConnection#getInputStream() + */ + @Override + public InputStream getInputStream() throws IOException { + if(!connected) connect(); + return new SnappyInputStream(originalConnection.getInputStream()); + } + + } + + + /* (non-Javadoc) + * @see java.net.URLStreamHandler#openConnection(java.net.URL) + */ + @Override + protected URLConnection openConnection(URL u) throws IOException { + return new SnappyURLConnection(); + } + + + /** + * The URL we are wrapping. + */ + protected URL originalURL; + + + public SnappyURLStreamHandler(URL wrappedUrl) { + super(); + this.originalURL = wrappedUrl; + } + + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, SlashDot.org! http://sdm.link/slashdot _______________________________________________ GATE-cvs mailing list GATE-cvs@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/gate-cvs