Author: rohini Date: Fri Apr 13 22:01:58 2018 New Revision: 1829112 URL: http://svn.apache.org/viewvc?rev=1829112&view=rev Log: PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)
Modified: pig/trunk/CHANGES.txt pig/trunk/ivy.xml pig/trunk/ivy/libraries.properties pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java pig/trunk/test/org/apache/pig/test/TestSplitCombine.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1829112&r1=1829111&r2=1829112&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Fri Apr 13 22:01:58 2018 @@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley IMPROVEMENTS +PIG-4092: Predicate pushdown for Parquet (nkollar via rohini) + PIG-5317: Upgrade old dependencies: commons-lang, hsqldb, commons-logging (nkollar via rohini) PIG-5322: ConstantCalculator optimizer is not applied for split (rohini) Modified: pig/trunk/ivy.xml URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1829112&r1=1829111&r2=1829112&view=diff ============================================================================== --- pig/trunk/ivy.xml (original) +++ pig/trunk/ivy.xml Fri Apr 13 22:01:58 2018 @@ -390,7 +390,7 @@ <dependency org="org.mockito" name="mockito-all" rev="${mockito.version}" conf="test->default"/> - <dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/> + <dependency org="org.apache.parquet" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/> <!-- for Spark 1.x integration --> <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark1.version}" conf="spark1->default"> Modified: pig/trunk/ivy/libraries.properties URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1829112&r1=1829111&r2=1829112&view=diff ============================================================================== --- pig/trunk/ivy/libraries.properties (original) +++ pig/trunk/ivy/libraries.properties Fri Apr 13 22:01:58 2018 @@ 
-87,7 +87,7 @@ jansi.version=1.9 asm.version=3.3.1 snappy-java.version=1.1.1.3 tez.version=0.7.0 -parquet-pig-bundle.version=1.2.3 +parquet-pig-bundle.version=1.9.0 snappy.version=0.2 leveldbjni.version=1.8 curator.version=2.6.0 Modified: pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java?rev=1829112&r1=1829111&r2=1829112&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java (original) +++ pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java Fri Apr 13 22:01:58 2018 @@ -20,8 +20,10 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.Expression; import org.apache.pig.LoadFuncMetadataWrapper; import org.apache.pig.LoadMetadata; +import org.apache.pig.LoadPredicatePushdown; import org.apache.pig.LoadPushDown; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.JarManager; @@ -29,7 +31,7 @@ import org.apache.pig.impl.util.JarManag /** * Wrapper class which will delegate calls to parquet.pig.ParquetLoader */ -public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown { +public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown, LoadPredicatePushdown { public ParquetLoader() throws FrontendException { this(null); @@ -37,12 +39,12 @@ public class ParquetLoader extends LoadF public ParquetLoader(String requestedSchemaStr) throws FrontendException { try { - init(new parquet.pig.ParquetLoader(requestedSchemaStr)); + init(new org.apache.parquet.pig.ParquetLoader(requestedSchemaStr)); } // if compile time dependency not found at runtime catch (NoClassDefFoundError e) { throw new FrontendException(String.format("Cannot instantiate class %s (%s)", - getClass().getName(), "parquet.pig.ParquetLoader"), 2259, e); + getClass().getName(), 
"org.apache.parquet.pig.ParquetLoader"), 2259, e); } } @@ -52,7 +54,7 @@ public class ParquetLoader extends LoadF @Override public void setLocation(String location, Job job) throws IOException { - JarManager.addDependencyJars(job, parquet.Version.class); + JarManager.addDependencyJars(job, org.apache.parquet.Version.class); super.setLocation(location, job); } @@ -66,5 +68,19 @@ public class ParquetLoader extends LoadF throws FrontendException { return ((LoadPushDown)super.loadFunc()).pushProjection(requiredFieldList); } - + + @Override + public List<String> getPredicateFields(String location, Job job) throws IOException { + return ((LoadPredicatePushdown)super.loadFunc()).getPredicateFields(location, job); + } + + @Override + public List<Expression.OpType> getSupportedExpressionTypes() { + return ((LoadPredicatePushdown)super.loadFunc()).getSupportedExpressionTypes(); + } + + @Override + public void setPushdownPredicate(Expression predicate) throws IOException { + ((LoadPredicatePushdown)super.loadFunc()).setPushdownPredicate(predicate); + } } Modified: pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java?rev=1829112&r1=1829111&r2=1829112&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java (original) +++ pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java Fri Apr 13 22:01:58 2018 @@ -31,12 +31,12 @@ public class ParquetStorer extends Store public ParquetStorer() throws FrontendException { try { - init(new parquet.pig.ParquetStorer()); + init(new org.apache.parquet.pig.ParquetStorer()); } // if compile time dependency not found at runtime catch (NoClassDefFoundError e) { throw new FrontendException(String.format("Cannot instantiate class %s (%s)", - getClass().getName(), "parquet.pig.ParquetStorer"), 2259, e); + getClass().getName(), "org.apache.parquet.pig.ParquetStorer"), 2259, 
e); } } @@ -49,7 +49,7 @@ public class ParquetStorer extends Store */ @Override public void setStoreLocation(String location, Job job) throws IOException { - JarManager.addDependencyJars(job, parquet.Version.class); + JarManager.addDependencyJars(job, org.apache.parquet.Version.class); super.setStoreLocation(location, job); } Modified: pig/trunk/test/org/apache/pig/test/TestSplitCombine.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java?rev=1829112&r1=1829111&r2=1829112&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestSplitCombine.java (original) +++ pig/trunk/test/org/apache/pig/test/TestSplitCombine.java Fri Apr 13 22:01:58 2018 @@ -41,8 +41,8 @@ import org.apache.pig.impl.plan.Operator import org.junit.Before; import org.junit.Test; -import parquet.hadoop.ParquetInputSplit; -import parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.ParquetInputSplit; +import org.apache.parquet.hadoop.metadata.BlockMetaData; public class TestSplitCombine { private Configuration conf; @@ -527,7 +527,7 @@ public class TestSplitCombine { // first split is parquetinputsplit rawSplits.add(new ParquetInputSplit(new Path("path1"), 0, 100, new String[] { "l1", "l2", "l3" }, - new ArrayList<BlockMetaData>(), "", "", + new ArrayList<BlockMetaData>(), "message dummy {}", "", new HashMap<String, String>(), new HashMap<String, String>())); // second split is file split rawSplits.add(new FileSplit(new Path("path2"), 0, 400, new String[] { @@ -559,7 +559,7 @@ public class TestSplitCombine { Assert.assertEquals(500, anotherSplit.getLength()); Assert.assertEquals(2, anotherSplit.getNumPaths()); - Assert.assertEquals("parquet.hadoop.ParquetInputSplit", + Assert.assertEquals("org.apache.parquet.hadoop.ParquetInputSplit", (anotherSplit.getWrappedSplit(0).getClass().getName())); Assert.assertEquals( "org.apache.hadoop.mapreduce.lib.input.FileSplit",