Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java?rev=831443&r1=831442&r2=831443&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMergeJoin.java Fri Oct 30 20:23:54 2009 @@ -19,18 +19,23 @@ import java.io.IOException; import java.util.Iterator; +import java.util.Map; import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; import org.apache.pig.ExecType; +import org.apache.pig.IndexableLoadFunc; import org.apache.pig.PigException; import org.apache.pig.PigServer; +import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.LogUtils; @@ -49,6 +54,7 @@ public TestMergeJoin() throws ExecException, IOException{ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); +// pigServer = new PigServer(ExecType.LOCAL); } /** * @throws java.lang.Exception @@ -456,6 +462,58 @@ } @Test + public void testExpression() throws IOException{ + Util.createInputFile(cluster, "temp_file1", new String[]{"8", "9"}); + Util.createInputFile(cluster, "temp_file2", new String[]{"10", "11"}); + Util.createInputFile(cluster, "temp_file3", new String[]{"20"}); + Util.createInputFile(cluster, "leftinput", new 
String[] { "9", "11"}); + pigServer.registerQuery("A = LOAD 'leftinput' as (a:int);"); + pigServer.registerQuery("B = LOAD 'temp_file*' as (a:int);"); + DataBag dbmrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("C = join A by $0 + 10, B by $0 + 10 using \"merge\";"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbmrj.add(iter.next()); + } + } + { + pigServer.registerQuery("C = join A by $0 + 10, B by $0 + 10;"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + } + Util.deleteFile(cluster, "temp_file1"); + Util.deleteFile(cluster, "temp_file2"); + Util.deleteFile(cluster, "temp_file3"); + Util.deleteFile(cluster, "leftinput"); + Assert.assertEquals(dbmrj.size(),dbshj.size()); + Assert.assertEquals(true, TestHelper.compareBags(dbmrj, dbshj)); + } + + @Test + public void testExpressionFail() throws IOException{ + pigServer.registerQuery("A = LOAD 'leftinput' as (a:int);"); + pigServer.registerQuery("B = LOAD 'temp_file*' using " + + DummyIndexableLoader.class.getName() + "() as (a:int);"); + boolean exceptionThrown = false; + try { + pigServer.registerQuery("C = join A by $0 + 10, B by $0 + 10 using \"merge\";"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + }catch (Exception e) { + e.printStackTrace(); + PigException pe = LogUtils.getPigException(e); + Assert.assertEquals(1106, pe.getErrorCode()); + exceptionThrown = true; + } + Assert.assertEquals(true, exceptionThrown); + } + + @Test public void testMergeJoinSch1() throws IOException{ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); @@ -478,4 +536,158 @@ shjSch = pigServer.dumpSchema("C"); Assert.assertTrue(shjSch == null); } + + + /** + * A dummy loader which implements {@link IndexableLoadFunc} to test + * that 
expressions are not allowed as merge join keys when the right input's + * loader implements {@link IndexableLoadFunc} + */ + public static class DummyIndexableLoader implements IndexableLoadFunc { + + /** + * + */ + public DummyIndexableLoader() { + // TODO Auto-generated constructor stub + } + + /* (non-Javadoc) + * @see org.apache.pig.IndexableLoadFunc#close() + */ + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.pig.IndexableLoadFunc#seekNear(org.apache.pig.data.Tuple) + */ + @Override + public void seekNear(Tuple keys) throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, org.apache.pig.impl.io.BufferedPositionedInputStream, long, long) + */ + @Override + public void bindTo(String fileName, BufferedPositionedInputStream is, + long offset, long end) throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToBag(byte[]) + */ + @Override + public DataBag bytesToBag(byte[] b) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToCharArray(byte[]) + */ + @Override + public String bytesToCharArray(byte[] b) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToDouble(byte[]) + */ + @Override + public Double bytesToDouble(byte[] b) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToFloat(byte[]) + */ + @Override + public Float bytesToFloat(byte[] b) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToInteger(byte[]) + */ + @Override + public Integer bytesToInteger(byte[] b) throws 
IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToLong(byte[]) + */ + @Override + public Long bytesToLong(byte[] b) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToMap(byte[]) + */ + @Override + public Map<String, Object> bytesToMap(byte[] b) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#bytesToTuple(byte[]) + */ + @Override + public Tuple bytesToTuple(byte[] b) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage) + */ + @Override + public Schema determineSchema(String fileName, ExecType execType, + DataStorage storage) throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema) + */ + @Override + public void fieldsToRead(Schema schema) { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getNext() + */ + @Override + public Tuple getNext() throws IOException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.IndexableLoadFunc#initialize(org.apache.hadoop.conf.Configuration) + */ + @Override + public void initialize(Configuration conf) throws IOException { + // TODO Auto-generated method stub + + } + + } }
