Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java Fri Feb 24 08:19:42 2017 @@ -21,6 +21,7 @@ package org.apache.pig.piggybank.evaluat import java.io.IOException; import java.util.Iterator; +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; @@ -43,7 +44,7 @@ import org.apache.pig.impl.logicalLayer. * * @author Vadim Zaliva <l...@codemindes.com> */ -public class MaxTupleBy1stField extends EvalFunc<Tuple> implements Algebraic +public class MaxTupleBy1stField extends EvalFunc<Tuple> implements Algebraic, Accumulator<Tuple> { /** * Indicates once for how many items progress hartbeat should be sent. 
@@ -131,6 +132,11 @@ public class MaxTupleBy1stField extends protected static Tuple max(Tuple input, PigProgressable reporter) throws ExecException { DataBag values = (DataBag) input.get(0); + return max(values,reporter); + } + + protected static Tuple max(DataBag values, PigProgressable reporter) throws ExecException + { // if we were handed an empty bag, return NULL // this is in compliance with SQL standard @@ -183,4 +189,44 @@ public class MaxTupleBy1stField extends return Final.class.getName(); } + + /** + * Accumulator implementation + */ + + private Tuple intermediate = null; + + /** + * Accumulate implementation - calls max() on the incoming tuple set including intermediate tuple if already exists + * @param b A tuple containing a single field, which is a bag. The bag will contain the set + * @throws IOException + */ + @Override + public void accumulate(Tuple b) throws IOException { + try{ + DataBag values = BagFactory.getInstance().newDefaultBag(); + values.addAll((DataBag) b.get(0)); + + if (intermediate != null) { + values.add(intermediate); + } + intermediate = max(values,reporter); + + }catch (ExecException ee){ + IOException oughtToBeEE = new IOException(); + oughtToBeEE.initCause(ee); + throw oughtToBeEE; + } + } + + @Override + public Tuple getValue() { + return intermediate; + } + + @Override + public void cleanup() { + intermediate = null; + } + }
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java Fri Feb 24 08:19:42 2017 @@ -23,10 +23,13 @@ import java.util.Iterator; import java.util.List; import org.apache.pig.EvalFunc; -import org.apache.pig.FuncSpec; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.builtin.AVG; +import org.apache.pig.builtin.BigDecimalAvg; +import org.apache.pig.builtin.BigDecimalMax; +import org.apache.pig.builtin.BigDecimalMin; +import org.apache.pig.builtin.BigDecimalSum; import org.apache.pig.builtin.COUNT; import org.apache.pig.builtin.DoubleAvg; import org.apache.pig.builtin.DoubleMax; @@ -54,6 +57,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; /** * Given an aggregate function, a bag, and possibly a window definition, @@ -73,23 +77,27 @@ import org.apache.pig.impl.logicalLayer. 
* <li>sum(int)</li> * <li>sum(long)</li> * <li>sum(bytearray)</li> + * <li>sum(bigdecimal)</li> * <li>avg(double)</li> * <li>avg(float)</li> * <li>avg(long)</li> * <li>avg(int)</li> * <li>avg(bytearray)</li> + * <li>avg(bigdecimal)</li> * <li>min(double)</li> * <li>min(float)</li> * <li>min(long)</li> * <li>min(int)</li> * <li>min(chararray)</li> * <li>min(bytearray)</li> + * <li>min(bigdecimal)</li> * <li>max(double)</li> * <li>max(float)</li> * <li>max(long)</li> * <li>max(int)</li> * <li>max(chararray)</li> * <li>max(bytearray)</li> + * <li>max(bigdecimal)</li> * <li>row_number</li> * <li>first_value</li> * <li>last_value</li> @@ -153,7 +161,8 @@ import org.apache.pig.impl.logicalLayer. * current row and 3 following) over T;</tt> * * <p>Over accepts a constructor argument specifying the name and type, - * colon-separated, of its return schema.</p> + * colon-separated, of its return schema. If the argument option is 'true' use the inner-search, + * take the name and type of bag and return a schema with alias+'_over' and the same type</p> * * <p><pre> * DEFINE IOver org.apache.pig.piggybank.evaluation.Over('state_rk:int'); @@ -188,12 +197,14 @@ public class Over extends EvalFunc<DataB private Object[] udfArgs; private byte returnType; private String returnName; + private boolean searchInnerType; public Over() { initialized = false; udfArgs = null; func = null; returnType = DataType.UNKNOWN; + searchInnerType = false; } public Over(String typespec) { @@ -202,12 +213,16 @@ public class Over extends EvalFunc<DataB String[] fn_tn = typespec.split(":", 2); this.returnName = fn_tn[0]; this.returnType = DataType.findTypeByName(fn_tn[1]); - } else { + } else if(Boolean.parseBoolean(typespec)) { + searchInnerType = Boolean.parseBoolean(typespec); + }else{ this.returnName = "result"; this.returnType = DataType.findTypeByName(typespec); - } + } } + + @Override public DataBag exec(Tuple input) throws IOException { if (input == null || input.size() < 2) { @@ -255,19 +270,42 @@ 
public class Over extends EvalFunc<DataB @Override public Schema outputSchema(Schema inputSch) { try { - if (returnType == DataType.UNKNOWN) { + FieldSchema field; + + if (searchInnerType) { + field = new FieldSchema(inputSch.getField(0)); + while (searchInnerType) { + if (field.schema != null + && field.schema.getFields().size() > 1) { + searchInnerType = false; + } else { + if (field.type == DataType.TUPLE + || field.type == DataType.BAG) { + field = new FieldSchema(field.schema.getField(0)); + } else { + field.alias = field.alias + "_over"; + searchInnerType = false; + } + } + } + + searchInnerType = true; + } else if (returnType == DataType.UNKNOWN) { return Schema.generateNestedSchema(DataType.BAG, DataType.NULL); } else { - Schema outputTupleSchema = new Schema(new Schema.FieldSchema(returnName, returnType)); - return new Schema(new Schema.FieldSchema( - getSchemaName(this.getClass().getName().toLowerCase(), inputSch), - outputTupleSchema, - DataType.BAG)); + field = new Schema.FieldSchema(returnName, returnType); } + + Schema outputTupleSchema = new Schema(field); + return new Schema(new Schema.FieldSchema(getSchemaName(this + .getClass().getName().toLowerCase(), inputSch), + outputTupleSchema, DataType.BAG)); + } catch (FrontendException fe) { throw new RuntimeException("Unable to create nested schema", fe); } } + private void init(Tuple input) throws IOException { initialized = true; @@ -329,6 +367,8 @@ public class Over extends EvalFunc<DataB func = new LongSum(); } else if ("sum(bytearray)".equalsIgnoreCase(agg)) { func = new SUM(); + } else if ("sum(bigdecimal)".equalsIgnoreCase(agg)) { + func = new BigDecimalSum(); } else if ("avg(double)".equalsIgnoreCase(agg)) { func = new DoubleAvg(); } else if ("avg(float)".equalsIgnoreCase(agg)) { @@ -339,6 +379,8 @@ public class Over extends EvalFunc<DataB func = new IntAvg(); } else if ("avg(bytearray)".equalsIgnoreCase(agg)) { func = new AVG(); + } else if ("avg(bigdecimal)".equalsIgnoreCase(agg)) { + func = 
new BigDecimalAvg(); } else if ("min(double)".equalsIgnoreCase(agg)) { func = new DoubleMin(); } else if ("min(float)".equalsIgnoreCase(agg)) { @@ -351,6 +393,8 @@ public class Over extends EvalFunc<DataB func = new StringMin(); } else if ("min(bytearray)".equalsIgnoreCase(agg)) { func = new MIN(); + } else if ("min(bigdecimal)".equalsIgnoreCase(agg)) { + func = new BigDecimalMin(); } else if ("max(double)".equalsIgnoreCase(agg)) { func = new DoubleMax(); } else if ("max(float)".equalsIgnoreCase(agg)) { @@ -363,6 +407,8 @@ public class Over extends EvalFunc<DataB func = new StringMax(); } else if ("max(bytearray)".equalsIgnoreCase(agg)) { func = new MAX(); + } else if ("max(bigdecimal)".equalsIgnoreCase(agg)) { + func = new BigDecimalMax(); } else if ("row_number".equalsIgnoreCase(agg)) { func = new RowNumber(); } else if ("first_value".equalsIgnoreCase(agg)) { Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java Fri Feb 24 08:19:42 2017 @@ -363,6 +363,15 @@ public class SearchEngineExtractor exten searchEngines.put("search.lycos.com", "Lycos"); searchEngines.put("search.msn.co.uk", "MSN UK"); searchEngines.put("search.msn.com", "MSN"); + searchEngines.put("bing.com", "Bing"); + searchEngines.put("ssl.bing.com", "Bing"); + searchEngines.put("cn.bing.com", "Bing China"); + 
searchEngines.put("br.bing.com", "Bing Brazil"); + searchEngines.put("it.bing.com", "Bing Italy"); + searchEngines.put("be.bing.com", "Bing Netherlands"); + searchEngines.put("uk.bing.com", "Bing UK"); + searchEngines.put("hk.bing.com", "Bing Hong Kong"); + searchEngines.put("nz.bing.com", "Bing New Zeland"); searchEngines.put("search.myway.com", "MyWay"); searchEngines.put("search.mywebsearch.com", "My Web Search"); searchEngines.put("search.ntlworld.com", "NTLWorld"); Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java Fri Feb 24 08:19:42 2017 @@ -16,8 +16,11 @@ package org.apache.pig.piggybank.evaluat import java.io.IOException; import java.io.StringReader; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import javax.xml.XMLConstants; +import javax.xml.namespace.NamespaceContext; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.xpath.XPathFactory; @@ -49,8 +52,7 @@ public class XPath extends EvalFunc<Stri private static boolean cache = true; private static boolean ignoreNamespace = true; - public static final String EMPTY_STRING = ""; - + /** * input should contain: 1) xml 2) xpath * 3) optional cache xml doc flag @@ -95,8 +97,13 @@ public class XPath extends EvalFunc<Stri return null; } - if(input.size() > 2) + if(input.size() > 2) { cache = (Boolean) input.get(2); + } + + if (input.size() > 3) { + ignoreNamespace = 
(Boolean) input.get(3); + } if (!cache || xpath == null || !xml.equals(this.xml)) { final InputSource source = new InputSource(new StringReader(xml)); @@ -104,6 +111,7 @@ public class XPath extends EvalFunc<Stri this.xml = xml; // track the xml for subsequent calls to this udf final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + dbf.setNamespaceAware(!ignoreNamespace); final DocumentBuilder db = dbf.newDocumentBuilder(); this.document = db.parse(source); @@ -112,14 +120,32 @@ public class XPath extends EvalFunc<Stri this.xpath = xpathFactory.newXPath(); + if (!ignoreNamespace) { + xpath.setNamespaceContext(new NamespaceContext() { + @Override + public String getNamespaceURI(String prefix) { + if (prefix.equals(XMLConstants.DEFAULT_NS_PREFIX)) { + return document.lookupNamespaceURI(null); + } else { + return document.lookupNamespaceURI(prefix); + } + } + + @Override + public String getPrefix(String namespaceURI) { + return document.lookupPrefix(namespaceURI); + } + + @Override + public Iterator getPrefixes(String namespaceURI) { + return null; + } + }); + } } String xpathString = (String) input.get(1); - if (ignoreNamespace) { - xpathString = createNameSpaceIgnoreXpathString(xpathString); - } - final String value = xpath.evaluate(xpathString, document); return value; @@ -165,34 +191,6 @@ public class XPath extends EvalFunc<Stri } return true; } - - - /** - * Returns a new the xPathString by adding additional parameters - * in the existing xPathString for ignoring the namespace during compilation. 
- * - * @param String xpathString - * @return String modified xpathString - */ - private String createNameSpaceIgnoreXpathString(final String xpathString) { - final String QUERY_PREFIX = "//*"; - final String LOCAL_PREFIX = "[local-name()='"; - final String LOCAL_POSTFIX = "']"; - final String SPLITTER = "/"; - - try { - String xpathStringWithLocalName = EMPTY_STRING; - String[] individualNodes = xpathString.split(SPLITTER); - - for (String node : individualNodes) { - xpathStringWithLocalName = xpathStringWithLocalName.concat(QUERY_PREFIX + LOCAL_PREFIX + node - + LOCAL_POSTFIX); - } - return xpathStringWithLocalName; - } catch (Exception ex) { - return xpathString; - } - } /** * Returns argument schemas of the UDF. Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java Fri Feb 24 08:19:42 2017 @@ -580,7 +580,7 @@ public class CSVExcelStorage extends Pig } } else if (b == DOUBLE_QUOTE) { // Does a double quote immediately follow? 
- if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE)) { + if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE) && (fieldBuffer.position() != 0)) { fieldBuffer.put(b); nextTupleSkipChar = true; continue; Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java Fri Feb 24 08:19:42 2017 @@ -91,6 +91,7 @@ public class DBStorage extends StoreFunc /** * Write the tuple to Database directly here. */ + @Override public void putNext(Tuple tuple) throws IOException { int sqlPos = 1; try { @@ -373,4 +374,9 @@ public class DBStorage extends StoreFunc p.setProperty(SCHEMA_SIGNATURE, s.toString()); } + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return false; + } + } Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java Fri Feb 24 08:19:42 2017 @@ -60,7 +60,6 @@ import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.util.StorageUtil; import 
org.apache.pig.data.DataType; import org.apache.pig.data.DataByteArray; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; /** * <code>IndexedStorage</code> is a form of <code>PigStorage</code> that supports a Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java Fri Feb 24 08:19:42 2017 @@ -16,7 +16,9 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.text.NumberFormat; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -42,6 +44,9 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.StorageUtil; +import org.apache.xml.utils.StringBufferPool; + +import com.google.common.base.Strings; /** * The UDF is useful for splitting the output data into a bunch of directories @@ -73,13 +78,21 @@ import org.apache.pig.impl.util.StorageU * If the output is compressed,then the sub directories and the output files will * be having the extension. Say for example in the above case if bz2 is used one file * will look like ;/my/home/output.bz2/a1.bz2/a1-0000.bz2 + * + * Key field can also be a comma separated list of indices e.g. 
'0,1' - in this case + * storage will be multi-level: + * /my/home/output/a1/b1/a1-b1-0000 + * /my/home/output/a1/b2/a1-b2-0000 + * There is also an option to leave key values out of storage, see isRemoveKeys. */ public class MultiStorage extends StoreFunc { + private static final String KEYFIELD_DELIMETER = ","; private Path outputPath; // User specified output Path - private int splitFieldIndex = -1; // Index of the key field + private final List<Integer> splitFieldIndices= new ArrayList<Integer>(); // Indices of the key fields private String fieldDel; // delimiter of the output record. private Compression comp; // Compression type of output data. + private boolean isRemoveKeys = false; // Compression types supported by this store enum Compression { @@ -95,9 +108,14 @@ public class MultiStorage extends StoreF this(parentPathStr, splitFieldIndex, compression, "\\t"); } + public MultiStorage(String parentPathStr, String splitFieldIndex, + String compression, String fieldDel) { + this(parentPathStr, splitFieldIndex, compression, fieldDel, "false"); + } + /** * Constructor - * + * * @param parentPathStr * Parent output dir path (this will be specified in store statement, * so MultiStorage don't use this parameter in reality. However, we don't @@ -108,18 +126,26 @@ public class MultiStorage extends StoreF * 'bz2', 'bz', 'gz' or 'none' * @param fieldDel * Output record field delimiter. + * @param isRemoveKeys + * Removes key columns from result during write. 
*/ public MultiStorage(String parentPathStr, String splitFieldIndex, - String compression, String fieldDel) { + String compression, String fieldDel, String isRemoveKeys) { + this.isRemoveKeys = Boolean.parseBoolean(isRemoveKeys); this.outputPath = new Path(parentPathStr); - this.splitFieldIndex = Integer.parseInt(splitFieldIndex); + + String[] splitFieldIndices = splitFieldIndex.split(KEYFIELD_DELIMETER); + for (String splitFieldIndexString : splitFieldIndices){ + this.splitFieldIndices.add(Integer.parseInt(splitFieldIndexString)); + } + this.fieldDel = fieldDel; try { this.comp = (compression == null) ? Compression.none : Compression - .valueOf(compression.toLowerCase()); + .valueOf(compression.toLowerCase()); } catch (IllegalArgumentException e) { System.err.println("Exception when converting compression string: " - + compression + " to enum. No compression will be used"); + + compression + " to enum. No compression will be used"); this.comp = Compression.none; } } @@ -127,22 +153,26 @@ public class MultiStorage extends StoreF //-------------------------------------------------------------------------- // Implementation of StoreFunc - private RecordWriter<String, Tuple> writer; + private RecordWriter<List<String>, Tuple> writer; @Override public void putNext(Tuple tuple) throws IOException { - if (tuple.size() <= splitFieldIndex) { - throw new IOException("split field index:" + this.splitFieldIndex - + " >= tuple size:" + tuple.size()); + for (int splitFieldIndex : this.splitFieldIndices) { + if (tuple.size() <= splitFieldIndex) { + throw new IOException("split field index:" + splitFieldIndex + + " >= tuple size:" + tuple.size()); + } } - Object field = null; - try { - field = tuple.get(splitFieldIndex); - } catch (ExecException exec) { - throw new IOException(exec); + List<String> fields = new ArrayList<String>(); + for (int splitFieldIndex : this.splitFieldIndices){ + try { + fields.add(String.valueOf(tuple.get(splitFieldIndex))); + } catch (ExecException exec) 
{ + throw new IOException(exec); + } } try { - writer.write(String.valueOf(field), tuple); + writer.write(fields, tuple); } catch (InterruptedException e) { throw new IOException(e); } @@ -153,6 +183,9 @@ public class MultiStorage extends StoreF public OutputFormat getOutputFormat() throws IOException { MultiStorageOutputFormat format = new MultiStorageOutputFormat(); format.setKeyValueSeparator(fieldDel); + if (this.isRemoveKeys){ + format.setSkipIndices(this.splitFieldIndices); + } return format; } @@ -174,27 +207,33 @@ public class MultiStorage extends StoreF FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } } - + + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return false; + } + //-------------------------------------------------------------------------- // Implementation of OutputFormat public static class MultiStorageOutputFormat extends - TextOutputFormat<String, Tuple> { + TextOutputFormat<List<String>, Tuple> { private String keyValueSeparator = "\\t"; private byte fieldDel = '\t'; - + private List<Integer> skipIndices = null; + @Override - public RecordWriter<String, Tuple> + public RecordWriter<List<String>, Tuple> getRecordWriter(TaskAttemptContext context ) throws IOException, InterruptedException { final TaskAttemptContext ctx = context; - return new RecordWriter<String, Tuple>() { + return new RecordWriter<List<String>, Tuple>() { - private Map<String, MyLineRecordWriter> storeMap = - new HashMap<String, MyLineRecordWriter>(); + private Map<List<String>, MyLineRecordWriter> storeMap = + new HashMap<List<String>, MyLineRecordWriter>(); private static final int BUFFER_SIZE = 1024; @@ -202,7 +241,7 @@ public class MultiStorage extends StoreF new ByteArrayOutputStream(BUFFER_SIZE); @Override - public void write(String key, Tuple val) throws IOException { + public void write(List<String> key, Tuple val) throws IOException { int sz = val.size(); for (int i = 0; i < sz; i++) { Object field; @@ -212,9 +251,13 @@ 
public class MultiStorage extends StoreF throw ee; } - StorageUtil.putField(mOut, field); + boolean skipCurrentField = skipIndices != null && skipIndices.contains(i); - if (i != sz - 1) { + if (!skipCurrentField) { + StorageUtil.putField(mOut, field); + } + + if (i != sz - 1 && !skipCurrentField) { mOut.write(fieldDel); } } @@ -231,17 +274,17 @@ public class MultiStorage extends StoreF } } - private MyLineRecordWriter getStore(String fieldValue) throws IOException { - MyLineRecordWriter store = storeMap.get(fieldValue); + private MyLineRecordWriter getStore(List<String> fieldValues) throws IOException { + MyLineRecordWriter store = storeMap.get(fieldValues); if (store == null) { - DataOutputStream os = createOutputStream(fieldValue); + DataOutputStream os = createOutputStream(fieldValues); store = new MyLineRecordWriter(os, keyValueSeparator); - storeMap.put(fieldValue, store); + storeMap.put(fieldValues, store); } return store; } - private DataOutputStream createOutputStream(String fieldValue) throws IOException { + private DataOutputStream createOutputStream(List<String> fieldValues) throws IOException { Configuration conf = ctx.getConfiguration(); TaskID taskId = ctx.getTaskAttemptID().getTaskID(); @@ -259,7 +302,21 @@ public class MultiStorage extends StoreF NumberFormat nf = NumberFormat.getInstance(); nf.setMinimumIntegerDigits(4); - Path path = new Path(fieldValue+extension, fieldValue + '-' + StringBuffer pathStringBuffer = new StringBuffer(); + for (String fieldValue : fieldValues){ + String safeFieldValue = fieldValue.replaceAll("\\/","-"); + pathStringBuffer.append(safeFieldValue); + pathStringBuffer.append("/"); + } + pathStringBuffer.deleteCharAt(pathStringBuffer.length()-1); + String pathString = pathStringBuffer.toString(); + String idString = pathString.replaceAll("\\/","-"); + + if (!Strings.isNullOrEmpty(extension)){ + pathString = pathString.replaceAll("\\/",extension+"\\/"); + } + + Path path = new Path(pathString+extension, idString + '-' + 
nf.format(taskId.getId())+extension); Path workOutputPath = ((FileOutputCommitter)getOutputCommitter(ctx)).getWorkPath(); Path file = new Path(workOutputPath, path); @@ -279,8 +336,12 @@ public class MultiStorage extends StoreF keyValueSeparator = sep; fieldDel = StorageUtil.parseFieldDel(keyValueSeparator); } - - //------------------------------------------------------------------------ + + public void setSkipIndices(List<Integer> skipIndices) { + this.skipIndices = skipIndices; + } + + //------------------------------------------------------------------------ // protected static class MyLineRecordWriter Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java Fri Feb 24 08:19:42 2017 @@ -18,12 +18,11 @@ package org.apache.pig.piggybank.evaluation; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.math.BigDecimal; import java.util.Iterator; -import java.util.List; import java.util.Random; import org.apache.pig.backend.executionengine.ExecException; @@ -34,8 +33,6 @@ import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; - -import org.junit.Before; import org.junit.Test; public class TestOver { @@ -66,11 +63,25 @@ public class TestOver { out = func.outputSchema(in); 
assertEquals("{org.apache.pig.piggybank.evaluation.over_3: {result: double}}", out.toString()); + // bigdecimal + func = new Over("BIGDECIMAL"); + in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER); + out = func.outputSchema(in); + assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {result: bigdecimal}}", out.toString()); + // named func = new Over("bob:chararray"); in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER); out = func.outputSchema(in); - assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {bob: chararray}}", out.toString()); + assertEquals("{org.apache.pig.piggybank.evaluation.over_5: {bob: chararray}}", out.toString()); + + + // Search inner alias and type + func = new Over("true"); + in = Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL); + in.getField(0).schema.getField(0).alias="test"; + out = func.outputSchema(in); + assertEquals("{org.apache.pig.piggybank.evaluation.over_6: {test_over: bigdecimal}}", out.toString()); } @Test @@ -397,6 +408,28 @@ public class TestOver { assertEquals(new Long(10), to.get(0)); } } + + @Test + public void testSumBigDecimal() throws Exception { + Over func = new Over(); + DataBag inbag = BagFactory.getInstance().newDefaultBag(); + for (int i = 0; i < 10; i++) { + Tuple t = TupleFactory.getInstance().newTuple(1); + t.set(0, new BigDecimal(1)); + inbag.add(t); + } + Tuple t = TupleFactory.getInstance().newTuple(4); + t.set(0, inbag); + t.set(1, "sum(bigdecimal)"); + t.set(2, -1); + t.set(3, -1); + DataBag outbag = func.exec(t); + assertEquals(10, outbag.size()); + for (Tuple to : outbag) { + assertEquals(1, to.size()); + assertEquals(new BigDecimal(10), to.get(0)); + } + } @Test public void testAvgDouble() throws Exception { @@ -509,6 +542,29 @@ public class TestOver { } @Test + public void testAvgBigDecimal() throws Exception { + Over func = new Over(); + DataBag inbag = BagFactory.getInstance().newDefaultBag(); + for (int i = 0; i < 10; i++) { + Tuple t = 
TupleFactory.getInstance().newTuple(1); + t.set(0, new BigDecimal(i)); + inbag.add(t); + } + Tuple t = TupleFactory.getInstance().newTuple(4); + t.set(0, inbag); + t.set(1, "avg(bigdecimal)"); + t.set(2, -1); + t.set(3, -1); + DataBag outbag = func.exec(t); + assertEquals(10, outbag.size()); + for (Tuple to : outbag) { + assertEquals(1, to.size()); + assertEquals(new BigDecimal(4.5), to.get(0)); + } + } + + + @Test public void testMinDouble() throws Exception { Over func = new Over(); DataBag inbag = BagFactory.getInstance().newDefaultBag(); @@ -627,6 +683,26 @@ public class TestOver { assertEquals("0", to.get(0)); } } + + @Test + public void testMinBigDecimal() throws Exception { + Over func = new Over(); + DataBag inbag = BagFactory.getInstance().newDefaultBag(); + for (int i = 0; i < 10; i++) { + Tuple t = TupleFactory.getInstance().newTuple(1); + t.set(0, new BigDecimal(i)); + inbag.add(t); + } + Tuple t = TupleFactory.getInstance().newTuple(2); + t.set(0, inbag); + t.set(1, "min(bigdecimal)"); + DataBag outbag = func.exec(t); + assertEquals(10, outbag.size()); + for (Tuple to : outbag) { + assertEquals(1, to.size()); + assertEquals(new BigDecimal(0), to.get(0)); + } + } @Test public void testMaxDouble() throws Exception { @@ -754,6 +830,28 @@ public class TestOver { assertEquals("9", to.get(0)); } } + + @Test + public void testMaxBigDecimal() throws Exception { + Over func = new Over(); + DataBag inbag = BagFactory.getInstance().newDefaultBag(); + for (int i = 0; i < 10; i++) { + Tuple t = TupleFactory.getInstance().newTuple(1); + t.set(0, new BigDecimal(i)); + inbag.add(t); + } + Tuple t = TupleFactory.getInstance().newTuple(2); + t.set(0, inbag); + t.set(1, "max(bigdecimal)"); + DataBag outbag = func.exec(t); + assertEquals(10, outbag.size()); + int count = 0; + for (Tuple to : outbag) { + assertEquals(1, to.size()); + assertEquals(new BigDecimal(count++), to.get(0)); + } + } + @Test public void testRowNumber() throws Exception { Added: 
pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java (added) +++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,95 @@ +package org.apache.pig.piggybank.test.evaluation; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import com.google.common.collect.Lists; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.piggybank.evaluation.MaxTupleBy1stField; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class TestMaxTupleBy1stField { + + private static List<Tuple> inputTuples = new ArrayList<>(); + private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance(); + private static final BagFactory BAG_FACTORY = BagFactory.getInstance(); + + @BeforeClass + public static void setup() throws Exception { + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(0L, "Fruit", "orange", 21F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(1L, "Fruit", "apple", 9.9F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(2L, "Vegetable", "paprika", 30F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(3L, "Fruit", "blueberry", 40F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(4L, "Vegetable", "carrot", 50F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(5L, "Fruit", "blueberry", 41F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(6L, "Vegetable", "paprika", 31F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(7L, "Fruit", "orange", 20.5F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(8L, "Fruit", "apple", 10.1F))); + inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(9L, "Fruit", "apple", 10.2F))); + } + + + @Test + public void testExecFunc() throws Exception { + MaxTupleBy1stField udf = new MaxTupleBy1stField(); + Tuple inputTuple = createTupleFromInputList(0,inputTuples.size()); + + Tuple result = udf.exec(inputTuple); + Assert.assertEquals("apple", result.get(2)); + Assert.assertEquals(10.2F, (Float) result.get(3), 1E-8); + } + 
+ @Test + public void testAccumulator() throws Exception { + MaxTupleBy1stField udf = new MaxTupleBy1stField(); + + Tuple inputTuple = createTupleFromInputList(0, 3); + udf.accumulate(inputTuple); + Tuple result = udf.getValue(); + Assert.assertEquals("paprika", result.get(2)); + Assert.assertEquals(30F, (Float) result.get(3), 1E-6); + + inputTuple = createTupleFromInputList(3, 6); + udf.accumulate(inputTuple); + result = udf.getValue(); + Assert.assertEquals("apple", result.get(2)); + Assert.assertEquals(10.1F, (Float) result.get(3), 1E-6); + + udf.cleanup(); + Assert.assertEquals(null,udf.getValue()); + } + + private static Tuple createTupleFromInputList(int offset, int length) { + DataBag inputBag = BAG_FACTORY.newDefaultBag(); + for (int i = offset; i < offset+length; ++i) { + inputBag.add(inputTuples.get(i)); + } + Tuple inputTuple = TUPLE_FACTORY.newTuple(); + inputTuple.append(inputBag); + return inputTuple; + } + +} Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java Fri Feb 24 08:19:42 2017 @@ -151,6 +151,27 @@ public class XPathTest { } @Test + public void testExecTupleWithDontIgnoreNamespace() throws Exception { + + final XPath xpath = new XPath(); + + final Tuple tuple = mock(Tuple.class); + + when(tuple.get(0)).thenReturn("<?xml version='1.0'?>\n" + + "<foo:document xmlns:foo=\"http://apache.org/foo\" xmlns:bar=\"http://apache.org/bar\">" + + 
"<bar:element>MyBar</bar:element>" + + "</foo:document>"); + + when(tuple.size()).thenReturn(4); + when(tuple.get(2)).thenReturn(true); + when(tuple.get(3)).thenReturn(false); + + when(tuple.get(1)).thenReturn("/foo:document/bar:element"); + assertEquals("MyBar", xpath.exec(tuple)); + + } + + @Test public void testExecTupleWithElementNodeWithComplexNameSpace() throws Exception { final XPath xpath = new XPath(); @@ -210,7 +231,31 @@ public class XPathTest { assertEquals("4 stars3.5 stars4 stars4.2 stars3.5 stars", xpath.exec(tuple)); } - + + @Test + public void testFunctionInXPath() throws Exception { + + final XPath xpath = new XPath(); + + final Tuple tuple = mock(Tuple.class); + + when(tuple.get(0)).thenReturn("<Aa name=\"test1\">" + + "<Bb Cc=\"1\"/>" + + "<Bb Cc=\"1\"/>" + + "<Bb Cc=\"1\"/>" + + "<Bb Cc=\"1\"/>" + + "<Dd>test2</Dd>" + + "</Aa>"); + + when(tuple.size()).thenReturn(4); + when(tuple.get(1)).thenReturn("sum(Aa/Bb/@Cc)"); + when(tuple.get(2)).thenReturn(true); + when(tuple.get(3)).thenReturn(true); + + assertEquals("4", xpath.exec(tuple)); + + } + @Ignore //--optional test @Test public void testCacheBenefit() throws Exception{ Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java Fri Feb 24 08:19:42 2017 @@ -218,7 +218,7 @@ public class TestCSVExcelStorage { Util.registerMultiLineQuery(pig, script); Iterator<Tuple> it = pig.openIterator("a"); 
Assert.assertEquals(Util.createTuple(new String[] {"foo,\"bar\",baz"}), it.next()); - Assert.assertEquals(Util.createTuple(new String[] {"\"\"\"\""}), it.next()); + Assert.assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next()); } // Handle newlines in fields Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java Fri Feb 24 08:19:42 2017 @@ -109,7 +109,7 @@ public class TestLogFormatLoader { Tuple actual = out.get(0); Tuple expected = tuple( "2001:980:91c0:1:8d31:a232:25e5:85d", - "[05/Sep/2010:11:27:50 +0200]", + "05/Sep/2010:11:27:50 +0200", "koken-pannen_303_hs-koken-pannen-afj-120601_B3_product_1_9200000002876066", map( "promo" , "koken-pannen_303_hs-koken-pannen-afj-120601_B3_product_1_9200000002876066", Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java Fri Feb 24 
08:19:42 2017 @@ -18,34 +18,41 @@ import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.test.MiniCluster; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.test.MiniGenericCluster; import org.apache.pig.test.Util; +import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; -import junit.framework.Assert; -import junit.framework.TestCase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -public class TestMultiStorage extends TestCase { +public class TestMultiStorage { private static final String INPUT_FILE = "MultiStorageInput.txt"; private PigServer pigServer; private PigServer pigServerLocal; - private MiniCluster cluster = MiniCluster.buildCluster(); + private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); - public TestMultiStorage() throws ExecException, IOException { - pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); - pigServerLocal = new PigServer(ExecType.LOCAL); + public TestMultiStorage() throws Exception { + pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + pigServerLocal = new PigServer(Util.getLocalTestMode()); } public static final PathFilter hiddenPathFilter = new PathFilter() { @@ -74,59 +81,83 @@ public class 
TestMultiStorage extends Te Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); } - @Override @Before public void setUp() throws Exception { createFile(); FileSystem fs = FileSystem.getLocal(new Configuration()); Path localOut = new Path("local-out"); - Path dummy = new Path("dummy"); if (fs.exists(localOut)) { fs.delete(localOut, true); } - if (fs.exists(dummy)) { - fs.delete(dummy, true); - } } - @Override @After public void tearDown() throws Exception { new File(INPUT_FILE).delete(); Util.deleteFile(cluster, INPUT_FILE); + } + + @AfterClass + public static void shutdown() { cluster.shutDown(); } enum Mode { local, cluster - }; + } @Test public void testMultiStorage() throws IOException { final String LOAD = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);"; final String MULTI_STORE_CLUSTER = "STORE A INTO 'mr-out' USING " + "org.apache.pig.piggybank.storage.MultiStorage('mr-out', '1');"; - final String MULTI_STORE_LOCAL = "STORE A INTO 'dummy' USING " + final String MULTI_STORE_LOCAL = "STORE A INTO 'local-out' USING " + "org.apache.pig.piggybank.storage.MultiStorage('local-out', '1');"; System.out.print("Testing in LOCAL mode: ..."); - //testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL); + testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL); System.out.println("Succeeded!"); - + System.out.print("Testing in CLUSTER mode: ..."); testMultiStorage( Mode.cluster, "mr-out", LOAD, MULTI_STORE_CLUSTER); System.out.println("Succeeded!"); - - } - /** - * The actual method that run the test in local or cluster mode. 
- * - * @param pigServer - * @param mode - * @param queries - * @throws IOException + @Test + public void testOutputStats() throws IOException { + FileSystem fs = cluster.getFileSystem(); + + pigServer.setBatchOn(); + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);"); + pigServer.registerQuery("B = FILTER A BY name == 'apple';"); + pigServer.registerQuery("STORE A INTO 'out1' USING org.apache.pig.piggybank.storage.MultiStorage('out1', '1');"); //153 bytes + pigServer.registerQuery("STORE B INTO 'out2' USING org.apache.pig.piggybank.storage.MultiStorage('out2', '1');"); // 45 bytes + + ExecJob job = pigServer.executeBatch().get(0); + + PigStats stats = job.getStatistics(); + PigStats.JobGraph jobGraph = stats.getJobGraph(); + JobStats jobStats = (JobStats) jobGraph.getSinks().get(0); + Map<String, Long> multiStoreCounters = jobStats.getMultiStoreCounters(); + List<OutputStats> outputStats = SimplePigStats.get().getOutputStats(); + OutputStats outputStats1 = "out1".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1); + OutputStats outputStats2 = "out2".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1); + + assertEquals(153 + 45, stats.getBytesWritten()); + assertEquals(2, outputStats.size()); // 2 split conditions + assertEquals(153, outputStats1.getBytes()); + assertEquals(45, outputStats2.getBytes()); + assertEquals(9, outputStats1.getRecords()); + assertEquals(3, outputStats2.getRecords()); + assertEquals(3L, multiStoreCounters.get("Output records in _1_out2").longValue()); + assertEquals(9L, multiStoreCounters.get("Output records in _0_out1").longValue()); + + fs.delete(new Path("out1"), true); + fs.delete(new Path("out2"), true); + } + + /** + * The actual method that run the test in local or cluster mode. */ private void testMultiStorage( Mode mode, String outPath, String... 
queries) throws IOException { @@ -142,42 +173,38 @@ public class TestMultiStorage extends Te /** * Test if records are split into directories corresponding to split field * values - * - * @param mode - * @throws IOException */ private void verifyResults(Mode mode, String outPath) throws IOException { FileSystem fs = (Mode.local == mode ? FileSystem .getLocal(new Configuration()) : cluster.getFileSystem()); Path output = new Path(outPath); - Assert.assertTrue("Output dir does not exists!", fs.exists(output) + assertTrue("Output dir does not exists!", fs.exists(output) && fs.getFileStatus(output).isDir()); Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter)); - Assert.assertTrue("Split field dirs not found!", paths != null); + assertTrue("Split field dirs not found!", paths != null); for (Path path : paths) { String splitField = path.getName(); Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter)); - Assert.assertTrue("No files found for path: " + path.toUri().getPath(), + assertTrue("No files found for path: " + path.toUri().getPath(), files != null); for (Path filePath : files) { - Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath)); - + assertTrue("This shouldn't be a directory", fs.isFile(filePath)); BufferedReader reader = new BufferedReader(new InputStreamReader(fs .open(filePath))); String line = ""; int count = 0; while ((line = reader.readLine()) != null) { String[] fields = line.split("\\t"); - Assert.assertEquals(fields.length, 3); - Assert.assertEquals("Unexpected field value in the output record", + assertEquals(fields.length, 3); + assertEquals("Unexpected field value in the output record", splitField, fields[1]); count++; System.out.println("field: " + fields[1]); - } + } reader.close(); - Assert.assertEquals(count, 3); + assertEquals(count, 3); } } } Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java (original) +++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java Fri Feb 24 08:19:42 2017 @@ -21,11 +21,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import junit.framework.TestCase; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.BZip2Codec; @@ -37,6 +40,10 @@ import org.apache.pig.backend.executione import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.test.Util; +import com.google.common.collect.Sets; + +import org.junit.Assert; + public class TestMultiStorageCompression extends TestCase { private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)"; @@ -59,8 +66,8 @@ public class TestMultiStorageCompression filesToDelete.add(outputPath); try { - runQuery(outputPath, type); - verifyResults(type, filesToDelete, outputPath); + runQuery(outputPath, "0", type); + verifyResults(type, outputPath); } finally { cleanUpDirs(filesToDelete); } @@ -77,22 +84,22 @@ public class TestMultiStorageCompression filesToDelete.add(outputPath); try { - runQuery(outputPath, type); - verifyResults(type, filesToDelete, outputPath); + runQuery(outputPath, "0", type); + verifyResults(type, outputPath); } finally { cleanUpDirs(filesToDelete); } } - private void cleanUpDirs(List<String> 
filesToDelete) { + private void cleanUpDirs(List<String> filesToDelete) throws IOException { // Delete files recursively Collections.reverse(filesToDelete); for (String string : filesToDelete) - new File(string).delete(); + FileUtils.deleteDirectory(new File(string)); } - private void verifyResults(String type, List<String> filesToDelete, + private void verifyResults(String type, String outputPath) throws IOException, FileNotFoundException { // Verify the output File outputDir = new File(outputPath); @@ -114,12 +121,10 @@ public class TestMultiStorageCompression continue; String topFolder = outputPath + File.separator + indexFolder; File indexFolderFile = new File(topFolder); - filesToDelete.add(topFolder); String[] list = indexFolderFile.list(); for (String outputFile : list) { String file = topFolder + File.separator + outputFile; - filesToDelete.add(file); // Skip off any file starting with . if (outputFile.startsWith(".")) @@ -159,7 +164,7 @@ public class TestMultiStorageCompression } } - private void runQuery(String outputPath, String compressionType) + private void runQuery(String outputPath, String keyColIndices, String compressionType) throws Exception, ExecException, IOException, FrontendException { // create a data file @@ -172,7 +177,7 @@ public class TestMultiStorageCompression String query2 = "STORE A INTO '" + Util.encodeEscape(outputPath) + "' USING org.apache.pig.piggybank.storage.MultiStorage" + "('" - + Util.encodeEscape(outputPath) + "','0', '" + compressionType + "', '\\t');"; + + Util.encodeEscape(outputPath) + "','"+keyColIndices+"', '" + compressionType + "', '\\t');"; // Run Pig pig.setBatchOn(); @@ -182,5 +187,32 @@ public class TestMultiStorageCompression pig.executeBatch(); } + public void testMultiStorageShouldSupportMultiLevelAndGz() throws Exception { + String type = "gz"; + String outputDir = "output001.multi." 
+ type; + List<String> filesToDelete = new ArrayList<String>(); + + String tmpDir = System.getProperty("java.io.tmpdir"); + String outputPath = tmpDir + File.separator + outputDir; + + filesToDelete.add(outputPath); + try { + runQuery(outputPath, "1,0", type); + Collection<File> fileList = FileUtils.listFiles(new File(outputPath),null,true); + Set<String> expectedPaths = Sets.newHashSet( "output001.multi.gz/a.gz/f1.gz/a-f1-0,000.gz", + "output001.multi.gz/b.gz/f2.gz/b-f2-0,000.gz", + "output001.multi.gz/c.gz/f3.gz/c-f3-0,000.gz", + "output001.multi.gz/d.gz/f4.gz/d-f4-0,000.gz"); + for (File file : fileList){ + String foundPath = file.getAbsolutePath().substring(file.getAbsolutePath().indexOf(outputDir)); + if (expectedPaths.contains(foundPath)){ + expectedPaths.remove(foundPath); + } + } + Assert.assertTrue(expectedPaths.isEmpty()); + } finally { + cleanUpDirs(filesToDelete); + } + } } Added: pig/branches/spark/dev-support/docker/Dockerfile URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/Dockerfile?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/dev-support/docker/Dockerfile (added) +++ pig/branches/spark/dev-support/docker/Dockerfile Fri Feb 24 08:19:42 2017 @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# Dockerfile for installing the necessary dependencies for building Apache Pig. +# See BUILDING.md. + +FROM ubuntu:trusty + +# Define working directory. +WORKDIR /root + +RUN apt-get update + +# Install dependencies from packages +RUN sed -i 's/# \(.*multiverse$\)/\1/g' /etc/apt/sources.list && \ + apt-get install -y build-essential && \ + apt-get install -y software-properties-common && \ + apt-get install --no-install-recommends -y \ + git subversion \ + byobu htop man unzip vim \ + cabal-install \ + curl wget \ + openjdk-7-jdk \ + ant ant-contrib ant-optional make maven \ + cmake gcc g++ protobuf-compiler \ + build-essential libtool \ + zlib1g-dev pkg-config libssl-dev \ + snappy libsnappy-dev \ + bzip2 libbz2-dev \ + libjansson-dev \ + fuse libfuse-dev \ + libcurl4-openssl-dev \ + python python2.7 && \ + rm -rf /var/lib/apt/lists/* + +# Define commonly used JAVA_HOME variable +ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64 + +# Fixing the Apache commons / Maven dependency problem under Ubuntu: +# See http://wiki.apache.org/commons/VfsProblems +RUN cd /usr/share/maven/lib && ln -s ../../java/commons-lang.jar . 
+ +# Avoid out of memory errors in builds +ENV MAVEN_OPTS -Xms256m -Xmx512m + +# Install findbugs +RUN mkdir -p /opt/findbugs && \ + wget http://sourceforge.net/projects/findbugs/files/findbugs/3.0.1/findbugs-noUpdateChecks-3.0.1.tar.gz/download \ + -O /opt/findbugs.tar.gz && \ + tar xzf /opt/findbugs.tar.gz --strip-components 1 -C /opt/findbugs +ENV FINDBUGS_HOME /opt/findbugs + +# Install Forrest in /usr/local/apache-forrest +# Screenscrape the download page for a local mirror URL +RUN cd /usr/local/ && \ + curl https://forrest.apache.org/mirrors.cgi | \ + fgrep href | fgrep apache-forrest-0.9 | \ + sed 's@^.*"\(http[^"]*apache-forrest-[^"]*.tar.gz\)".*@\1@' | \ + xargs -n1 -r wget + +# Unpack Apache Forrest +RUN cd /usr/local/ && \ + tar xzf apache-forrest-0.9-sources.tar.gz && \ + tar xzf apache-forrest-0.9-dependencies.tar.gz && \ + mv apache-forrest-0.9 apache-forrest +RUN cd /usr/local/apache-forrest/main && ./build.sh + +# The solution for https://issues.apache.org/jira/browse/PIG-3906 +RUN mkdir -p /usr/local/apache-forrest/plugins && chmod a+rwX -R /usr/local/apache-forrest/plugins +RUN mkdir -p /usr/local/apache-forrest/build/plugins && chmod a+rwX -R /usr/local/apache-forrest/build/plugins + +# Configure where forrest can be found +RUN echo 'forrest.home=/usr/local/apache-forrest' > build.properties +ENV FORREST_HOME /usr/local/apache-forrest + +# Add a welcome message and environment checks. 
+ADD build_env_checks.sh /root/build_env_checks.sh +RUN chmod 755 /root/build_env_checks.sh +ADD configure-for-user.sh /root/configure-for-user.sh +RUN chmod 755 /root/configure-for-user.sh +RUN echo '~/build_env_checks.sh' >> /root/.bashrc Added: pig/branches/spark/dev-support/docker/build_env_checks.sh URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/build_env_checks.sh?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/dev-support/docker/build_env_checks.sh (added) +++ pig/branches/spark/dev-support/docker/build_env_checks.sh Fri Feb 24 08:19:42 2017 @@ -0,0 +1,120 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ------------------------------------------------------- +function showWelcome { + +# http://patorjk.com/software/taag/#p=display&f=Doom&t=Pig%20Builder +cat <<Welcome-message + +______ _ ______ _ _ _ +| ___ (_) | ___ \\ (_) | | | +| |_/ /_ __ _ | |_/ /_ _ _| | __| | ___ _ __ +| __/| |/ _\` | | ___ \\ | | | | |/ _\` |/ _ \\ '__| +| | | | (_| | | |_/ / |_| | | | (_| | __/ |_ +\\_| |_|\\__, | \\____/ \\__,_|_|_|\\__,_|\\___|_| + __/ | + |___/ + +This is the standard Apache Pig Developer build environment. 
+This has all the right tools installed required to build +Pig from source. + +Welcome-message +} + +# ------------------------------------------------------- + +function showAbort { + cat <<Abort-message + + ___ _ _ _ + / _ \\| | | | (_) +/ /_\\ \\ |__ ___ _ __| |_ _ _ __ __ _ +| _ | '_ \\ / _ \\| '__| __| | '_ \\ / _\` | +| | | | |_) | (_) | | | |_| | | | | (_| | +\\_| |_/_.__/ \\___/|_| \\__|_|_| |_|\\__, | + __/ | + |___/ + +Abort-message +} + +# ------------------------------------------------------- + +function failIfUserIsRoot { + if [ "$(id -u)" -eq "0" ]; # If you are root then something went wrong. + then + cat <<End-of-message + +Apparently you are inside this docker container as the user root. +Putting it simply: + + This should not occur. + +Known possible causes of this are: +1) Running this script as the root user ( Just don't ) +2) Running an old docker version ( upgrade to 1.4.1 or higher ) + +End-of-message + + showAbort + + logout + + fi +} + +# ------------------------------------------------------- + +# Configurable low water mark in GiB +MINIMAL_MEMORY_GiB=2 + +function warnIfLowMemory { + MINIMAL_MEMORY=$((MINIMAL_MEMORY_GiB*1024*1024)) # Convert to KiB + INSTALLED_MEMORY=$(fgrep MemTotal /proc/meminfo | awk '{print $2}') + if [ $((INSTALLED_MEMORY)) -le $((MINIMAL_MEMORY)) ]; + then + cat <<End-of-message + + _ ___ ___ +| | | \\/ | +| | _____ __ | . . | ___ _ __ ___ ___ _ __ _ _ +| | / _ \\ \\ /\\ / / | |\\/| |/ _ \\ '_ \` _ \\ / _ \\| '__| | | | +| |___| (_) \\ V V / | | | | __/ | | | | | (_) | | | |_| | +\\_____/\\___/ \\_/\\_/ \\_| |_/\\___|_| |_| |_|\\___/|_| \\__, | + __/ | + |___/ + +Your system is running on very little memory. +This means it may work but it wil most likely be slower than needed. 
+ +If you are running this via boot2docker you can simply increase +the available memory to atleast ${MINIMAL_MEMORY_GiB} GiB (you have $((INSTALLED_MEMORY/(1024*1024))) GiB ) + +End-of-message + fi +} + +# ------------------------------------------------------- + +showWelcome +warnIfLowMemory +failIfUserIsRoot + +# ------------------------------------------------------- Added: pig/branches/spark/dev-support/docker/configure-for-user.sh URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/configure-for-user.sh?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/dev-support/docker/configure-for-user.sh (added) +++ pig/branches/spark/dev-support/docker/configure-for-user.sh Fri Feb 24 08:19:42 2017 @@ -0,0 +1,40 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script is used to tweak the environment at the moment we know +# the real username of the person using this. +# By making this script a part of the image we can extend and update +# it to fit future needs more easily. 
+
+# Native Linux (direct or via sudo). Args: $1 user name, $2 user id, $3 group id, $4 optional /etc/group line for vboxsf.
+USER_NAME=$1
+USER_ID=$2
+GROUP_ID=$3
+
+groupadd --non-unique -g ${GROUP_ID} ${USER_NAME}
+useradd -g ${GROUP_ID} -u ${USER_ID} -k /root -m ${USER_NAME}
+echo "export HOME=/home/${USER_NAME}" >> ~/.bashrc
+echo "export USER=${USER_NAME}" >> ~/.bashrc
+
+VBOXSF_GROUP_LINE=$4
+if [ -n "${VBOXSF_GROUP_LINE}" ]; # quoted: unquoted and empty this collapses to `[ -n ]`, which is always true
+then
+  echo "${VBOXSF_GROUP_LINE}" >> /etc/group # quoted so the group line is appended verbatim (no word splitting/globbing)
+  usermod -aG vboxsf ${USER_NAME}
+fi
+
+echo "${USER_NAME} ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers.d/${USER_NAME}