Author: gates Date: Wed Mar 17 16:26:59 2010 New Revision: 924355 URL: http://svn.apache.org/viewvc?rev=924355&view=rev Log: PIG-1284 Added XMLLoader to piggybank.
Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Modified: hadoop/pig/trunk/contrib/CHANGES.txt Modified: hadoop/pig/trunk/contrib/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=924355&r1=924354&r2=924355&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/CHANGES.txt Wed Mar 17 16:26:59 2010 @@ -1,10 +1,111 @@ -PIG-1126: updated fieldsToRead function (olgan) +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +Pig Change Log + +Trunk (unreleased changes) + +INCOMPATIBLE CHANGES + +IMPROVEMENTS + +PIG-1284 Added XMLLoader to piggybank (aloknsingh via gates) + +OPTIMIZATIONS + +BUG FIXES + +Release 0.6.0 + +INCOMPATIBLE CHANGES + +PIG-1126: updated fieldsToRead function for piggybank loaders (olgan) + +IMPROVEMENTS + PIG-1015: [piggybank] DateExtractor should take into account timezones (dryaboy via olgan) -PIG-911: Added SequenceFileLoader (dryaboy via gates) + +OPTIMIZATIONS + +BUG FIXES + +Release 0.5.0 - Released + +INCOMPATIBLE CHANGES + +IMPROVEMENTS + +PIG-911: Added SequenceFileLoader to piggybank (dryaboy via gates) + +OPTIMIZATIONS + +BUG FIXES + +Release 0.4.0 - Released + +INCOMPATIBLE CHANGES + +IMPROVEMENTS + PIG-885: New UDFs for piggybank (Bin, Decode, LookupInFiles, RegexExtract, RegexMatch, HashFVN, DiffDate) (daijy) -PIG-868: added strin manipulation functions (bennies via olgan) -PIG-273: addition of Top and SearchQuery UDFs (ankur via olgan) + +PIG-868: added strin manipulation functions to piggybank (bennies via olgan) + +OPTIMIZATIONS + +BUG FIXES + +Release 0.3.0 - Released + +INCOMPATIBLE CHANGES + +IMPROVEMENTS + +PIG-732: addition of Top and SearchQuery UDFs to piggybank (ankur via olgan) + +OPTIMIZATIONS + +BUG FIXES + +Release 0.2.0 - Released + +INCOMPATIBLE CHANGES + +IMPROVEMENTS + +OPTIMIZATIONS + +BUG FIXES + +Release 0.1.0 - Released + +INCOMPATIBLE CHANGES + +IMPROVEMENTS + PIG-246: created UDF repository (olgan) -PIG-245: UDF wrappers for Java Math functions (ajaygarg via olgan) -PIG-277: UDF for computing correlation and covariance between data sets (ajaygarg via olgan) + +PIG-245: UDF wrappers for Java Math functions added to piggybank (ajaygarg via olgan) + +PIG-277: UDF for computing correlation and covariance between data sets added to piggybank (ajaygarg via olgan) + +OPTIMIZATIONS + +BUG FIXES + Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=924355&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Wed Mar 17 16:26:59 2010 @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.piggybank.storage; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.io.BufferedPositionedInputStream; + + + +/** + * A <code>XMLLoaderBufferedPositionedInputStream</code> is the package class and is the + * decorater overthe BufferedPositionedInputStream which in turn decorate + * BufferedInputStream. It contains <code>BufferedPositionedInputStream<code> + * input stream, which it uses as + * its basic source of data, possibly reading or providing additional + * functionality. The class <code>XMLLoaderBufferedPositionedInputStream</code> + * itself simply overrides the necessary medthod for reading i.e + * <code>read</code> <code>getPosition<code> with versions that + * pass all requests to the contained input + * stream or do some special processing. Subclasses of <code>XMLLoaderBufferedPositionedInputStream</code> + * may further override some of these methods + * and may also provide additional methods + * and fields. + * It also provides additional method <code>collectTag<collect> which will give the byte + * array between the tag which is a xml record. i.e <tag> .*</tag> will be returned + * + * @note we can't use the standard SAX or STAX parser as for a big xml + * the intermetant hadoop block may not be the valid xml and hence those + * parser may create pb. + * + * @since pig 2.0 + */ + +class XMLLoaderBufferedPositionedInputStream extends BufferedPositionedInputStream { + + public final static int S_START = 0; + public final static int S_MATCH_PREFIX = 1; + public final static int S_MATCH_TAG = 2; + + /** + * The input streamed to be filtered + */ + InputStream wrapperIn; + + /** + * The field to know if the underlying buffer contains any more bytes + */ + boolean _isReadable; + + /** + * Creates a <code>XMLLoaderBufferedPositionedInputStream</code> + * by assigning the argument <code>in</code> + * to the field <code>this.wrapperIn</code> so as + * to remember it for later use. + * + * @param in the underlying input stream, + */ + public XMLLoaderBufferedPositionedInputStream(InputStream in){ + super(in); + this.wrapperIn = in; + setReadable(true); + } + + /** + * Since the input stream is control by Pig or hadoop + * stream and there seems to be issue with multiple closing + * with hadoop and pig + * + * @exception IOException if an I/O error occurs. + */ + public void close() throws IOException { + // throw new IOException("Closing stream BAD"); + } + + /** + * Set the stream readable or non readable. This is needed + * to control the xml parsing. + * @param flag The boolean flag to be set + * @see XMLLoaderBufferedPositionedInputStream#isReadable + */ + private void setReadable(boolean flag) { + _isReadable = flag; + } + + /** + * See if the stream readable or non readable. This is needed + * to control the xml parsing. + * @return true if readable otherwise false + * @see XMLLoaderBufferedPositionedInputStream#setReadable + */ + public boolean isReadable() { + return _isReadable == true; + } + + /** + * @Override org.apache.pig.impl.io.BufferedPositionedInputStream.read + * It is just the wrapper for now. + * Reads the next byte of data from this input stream. The value + * byte is returned as an <code>int</code> in the range + * <code>0</code> to <code>255</code>. If no byte is available + * because the end of the stream has been reached, the value + * <code>-1</code> is returned. This method blocks until input data + * is available, the end of the stream is detected, or an exception + * is thrown. + * <p> + * This method + * simply performs <code>in.read()</code> and returns the result. + * + * @return the next byte of data, or <code>-1</code> if the end of the + * stream is reached. + * @exception IOException if an I/O error occurs. + * @see XMLLoaderBufferedPositionedInputStreamInputStream#wrapperIn + */ + public int read() throws IOException { + return wrapperIn.read(); + } + + /** + * This is collect the bytes from current position to the ending tag. + * This scans for the tags and do the pattern match byte by byte + * this must be used along with + * XMLLoaderBufferedPositionedInputStream#skipToTag + * + * @param tagName the end tag to search for + * + * @param limit the end pointer for the block for this mapper + * + * @return the byte array containing the documents until the end of tag + * + * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag + * + */ + private byte[] collectUntilEndTag(String tagName, long limit) { + + //@todo use the charset and get the charset encoding from the xml encoding. + byte[] tmp = tagName.getBytes(); + byte[] tag = new byte[tmp.length + 3]; + tag[0] = (byte)'<'; + tag[1] = (byte)'/'; + for (int i = 0; i < tmp.length; ++i) { + tag[2+i] = tmp[i]; + } + tag[tmp.length+2] = (byte)'>'; + // System.out.println("[collectUntilEndTag] TAG " + tag + tagName); // DEBUG + + ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024); + int idxTagChar = 0; + while (true) { + int b = -1; + try { + b = this.read(); + if (b == -1) { + collectBuf.reset(); + this.setReadable(false); + break; + } + collectBuf.write((byte)(b)); + + // start to match the target close tag + if (b == tag[idxTagChar]) { + ++idxTagChar; + if (idxTagChar == tag.length) { + break; + } + } else { + idxTagChar = 0; + } + } + catch (IOException e) { + this.setReadable(false); + return null; + } + } + // DEBUG + //System.out.println("Match = " + new String(collectBuf.toByteArray())); + return collectBuf.toByteArray(); + } + + /** + * This is collect the from the matching tag. + * This scans for the tags and do the pattern match byte by byte + * This returns a part doc. it must be used along with + * XMLLoaderBufferedPositionedInputStream#collectUntilEndTag + * + * @param tagName the start tag to search for + * + * @param limit the end pointer for the block for this mapper + * + * @return the byte array containing match of the tag. + * + * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag + * + */ + private byte[] skipToTag(String tagName, long limit) { + + //@todo use the charset and get the charset encoding from the xml encoding. + byte[] tmp = tagName.getBytes(); + byte[] tag = new byte[tmp.length + 1]; + tag[0] = (byte)'<'; + for (int i = 0; i < tmp.length; ++i) { + tag[1+i] = tmp[i]; + } + //System.out.println("[skipToTag] TAG " + tag + tagName); // DEBUG + + ByteArrayOutputStream matchBuf = new ByteArrayOutputStream(512); + int idxTagChar = 0; + int state = S_START; + while (true) { + int b = -1; + try { + b = this.read(); + if (b == -1) { + state = S_START; + matchBuf.reset(); + this.setReadable(false); + break; + } + switch (state) { + case S_START: + // start to match the target open tag + if (b == tag[idxTagChar]) { + ++idxTagChar; + matchBuf.write((byte)(b)); + if (idxTagChar == tag.length) { + state = S_MATCH_PREFIX; + } + } else { // mismatch + idxTagChar = 0; + matchBuf.reset(); + } + break; + case S_MATCH_PREFIX: + // tag match iff next character is whitespaces or close tag mark + if (b == ' ' || b == '\t' || b == '>') { + matchBuf.write((byte)(b)); + state = S_MATCH_TAG; + } else { + idxTagChar = 0; + matchBuf.reset(); + state = S_START; + } + break; + case S_MATCH_TAG: + // keep copy characters until we hit the close tag mark + matchBuf.write((byte)(b)); + break; + default: + throw new IllegalArgumentException("Invalid state: " + state); + } + if (state == S_MATCH_TAG && b == '>') { + break; + } + if (state != S_MATCH_TAG && this.getPosition() > limit) { + // need to break, no record in this block + break; + } + // DEBUG + /* + if (idxTagChar > 0) { + System.out.println("Match b='" + (char)b + "'" + + ", tag='" + (char)tag[idxTagChar-1] + "'" + + ", idxTagChar=" + (idxTagChar-1) + + ", tagLength=" + tag.length); + } else { + System.out.println("Mismatch b='" + (char)b + "'" + + ", tag='" + (char)tag[idxTagChar] + "'" + + ", idxTagChar=" + (idxTagChar) + + ", tagLength=" + tag.length); + } + */ + } + catch (IOException e) { + this.setReadable(false); + return null; + } + } + // DEBUG + //System.out.println("Match = " + new String(matchBuf.toByteArray())); + return matchBuf.toByteArray(); + } + + /** + * This is collect bytes from start and end tag both inclusive + * This scans for the tags and do the pattern match byte by byte + * + * @param tagName the start tag to search for + * + * @param limit the end pointer for the block for this mapper + * + * @return the byte array containing match of the <code><tag>.*</tag><code>. + * + * @see loader.XMLLoaderBufferedPositionedInputStream.skipToTag + * + * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag + * + */ + byte[] collectTag(String tagName, long limit) throws IOException { + ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024); + byte[] beginTag = skipToTag(tagName, limit); + byte[] untilTag = collectUntilEndTag(tagName, limit); + + if (beginTag.length > 0 && untilTag.length > 0) { + for (byte b: beginTag) { + collectBuf.write(b); + } + for (byte b: untilTag) { + collectBuf.write(b); + } + } + return collectBuf.toByteArray(); + } + +} + + +/** + * The load function to load the XML file + * This implements the LoadFunc interface which is used to parse records + * from a dataset. The various helper adaptor function is extended from loader.Utf8StorageConverter + * which included various functions to cast raw byte data into various datatypes. + * other sections of the code can call back to the loader to do the cast. + * This takes a xmlTag as the arg which it will use to split the inputdataset into + * multiple records. + * <code> + * + * For example if the input xml (input.xml) is like this + * <configuration> + * <property> + * <name> foobar </name> + * <value> barfoo </value> + * </property> + * <ignoreProperty> + * <name> foo </name> + * </ignoreProperty> + * <property> + * <name> justname </name> + * </property> + * </configuration> + * + * And your pig script is like this + * + * --load the jar files + * register /homes/aloks/pig/udfLib/loader.jar; + * -- load the dataset using XMLLoader + * -- A is the bag containing the tuple which contains one atom i.e doc see output + * A = load '/user/aloks/pig/input.xml using loader.XMLLoader('property') as (doc:chararray); + * --dump the result + * dump A; + * + * + * Then you will get the output + * + * (<property> + * <name> foobar </name> + * <value> barfoo </value> + * </property>) + * (<property> + * <name> justname </name> + * </property>) + * + * + * Where each () indicate one record + * + * + * </code> + */ + +public class XMLLoader extends LoadFunc { + + /** + * logger from pig + */ + protected final Log mLog = LogFactory.getLog(getClass()); + + private XMLFileRecordReader reader = null; + + + /** + * the tuple content which is used while returning + */ + private ArrayList<Object> mProtoTuple = null; + + /** + * The record seperated. The default value is 'document' + */ + public String recordIdentifier = "document"; + + + public XMLLoader() { + + } + + /** + * Constructs a Pig loader that uses specified string as the record seperater + * for example if the recordIdentifier is document. It will consider the record as + * <document> .* </document> + * + * @param recordIdentifier the xml tag which is used to pull records + * + */ + public XMLLoader(String recordIdentifier) { + this(); + this.recordIdentifier = recordIdentifier; + } + + /** + * Retrieves the next tuple to be processed. + * @return the next tuple to be processed or null if there are no more tuples + * to be processed. + * @throws IOException + */ + @Override + public Tuple getNext() throws IOException { + + boolean next = false; + + try { + next = reader.nextKeyValue(); + } catch (InterruptedException e) { + throw new IOException(e); + } + + if (!next) return null; + + Tuple t = null; + + try { + byte[] tagContent = (byte[]) reader.getCurrentValue(); + t = createTuple(tagContent); + } catch (Exception e) { + throw new IOException(e); + } + + return t; + + } + + public Tuple createTuple(byte[] content) throws Exception { + if (mProtoTuple == null) { + mProtoTuple = new ArrayList<Object>(); + } + if (content.length > 0) { + mProtoTuple.add(new DataByteArray(content)); + } + Tuple t = TupleFactory.getInstance().newTupleNoCopy(mProtoTuple); + mProtoTuple = null; + + return t; + } + + /** + * to check for equality + * @param object + */ + public boolean equals(Object obj) { + return equals((XMLLoader)obj); + } + + /** + * to check for equality + * @param XMLLoader object + */ + public boolean equals(XMLLoader other) { + return this.recordIdentifier.equals(other.recordIdentifier); + } + + @SuppressWarnings("unchecked") + @Override + public InputFormat getInputFormat() throws IOException { + return new XMLFileInputFormat(recordIdentifier); + } + + @SuppressWarnings("unchecked") + @Override + public void prepareToRead(RecordReader reader, PigSplit split) + throws IOException { + this.reader = (XMLFileRecordReader) reader; + } + + @Override + public void setLocation(String location, Job job) throws IOException { + FileInputFormat.setInputPaths(job, location); + } + + //------------------------------------------------------------------------ + // Implementation of InputFormat + + public static class XMLFileInputFormat extends FileInputFormat { + + private String recordIdentifier; + + public XMLFileInputFormat(String recordIdentifier) { + this.recordIdentifier = recordIdentifier; + } + + @SuppressWarnings("unchecked") + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, + InterruptedException { + + return new XMLFileRecordReader(recordIdentifier); + } + } + + //------------------------------------------------------------------------ + // Implementation of RecordReader + + public static class XMLFileRecordReader extends RecordReader { + + private long start; + private long end; + private String recordIdentifier; + + /* + * xmlloader input stream which has the ability to split the input + * dataset into records by the specified tag + */ + private XMLLoaderBufferedPositionedInputStream xmlLoaderBPIS = null; + + public XMLFileRecordReader(String recordIdentifier) { + this.recordIdentifier = recordIdentifier; + } + + @Override + public void initialize(InputSplit genericSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + FileSplit split = (FileSplit) genericSplit; + Configuration job = context.getConfiguration(); + + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + + // open the file and seek to the start of the split + FileSystem fs = file.getFileSystem(job); + FSDataInputStream fileIn = fs.open(split.getPath()); + + this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn); + } + + + @Override + public void close() throws IOException { + xmlLoaderBPIS.close(); + } + + @Override + public Object getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override + public Object getCurrentValue() throws IOException, + InterruptedException { + return xmlLoaderBPIS.collectTag(recordIdentifier, end); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + + return 0; + } + + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return xmlLoaderBPIS.isReadable(); + } + + } +} Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=924355&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Wed Mar 17 16:26:59 2010 @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the + * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package org.apache.pig.piggybank.test.storage; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.regex.Pattern; + +import junit.framework.TestCase; + +import org.apache.pig.PigServer; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.Tuple; +import static org.apache.pig.ExecType.LOCAL; + +public class TestXMLLoader extends TestCase { + private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)"; + private final static Pattern pattern = Pattern.compile(patternString); + public static ArrayList<String[]> data = new ArrayList<String[]>(); + static { + data.add(new String[] { "<configuration>"}); + data.add(new String[] { "<property>"}); + data.add(new String[] { "<name> foobar </name>"}); + data.add(new String[] { "<value> barfoo </value>"}); + data.add(new String[] { "</property>"}); + data.add(new String[] { "<ignoreProperty>"}); + data.add(new String[] { "<name> foo </name>"}); + data.add(new String[] { "</ignoreProperty>"}); + data.add(new String[] { "<property>"}); + data.add(new String[] { "<name> justname </name>"}); + data.add(new String[] { "</property>"}); + data.add(new String[] { "</configuration>"}); + } + + public void testLoadXMLLoader() throws Exception { + //ArrayList<DataByteArray[]> expected = TestHelper.getExpected(data, pattern); + String filename = TestHelper.createTempFile(data, ""); + PigServer pig = new PigServer(LOCAL); + filename = filename.replace("\\", "\\\\"); + patternString = patternString.replace("\\", "\\\\"); + String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);"; + pig.registerQuery(query); + Iterator<?> it = pig.openIterator("A"); + int tupleCount = 0; + while (it.hasNext()) { + Tuple tuple = (Tuple) it.next(); + if (tuple == null) + break; + else { + //TestHelper.examineTuple(expected, tuple, tupleCount); + if (tuple.size() > 0) { + tupleCount++; + //System.out.println("tuple=" + tuple+":"+tuple.size()); + } + } + } + assertEquals(3, tupleCount); // pig adds extra + } +}