Author: pradeepkth
Date: Thu Oct 29 18:00:26 2009
New Revision: 831051
URL: http://svn.apache.org/viewvc?rev=831051&view=rev
Log:
added missing files from previous commit for PIG-953
Added:
hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java
hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java
hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java
hadoop/pig/trunk/src/org/apache/pig/SortInfo.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
Added: hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/CommittableStoreFunc.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+
+/**
+ * A storefunc which has an extra commit() method which is called
+ * when all mappers (when the storefunc is part of map) or reducers (when the
+ * storefunc is part of reduce) are finished. Currently this will allow storefuncs
+ * to do any cleanup/finalizing activities knowing that all the maps/reducers
+ * have finished - one such use case is for zebra storage to build an index
+ * for sorted files once all writes are done.
+ */
+public interface CommittableStoreFunc extends StoreFunc {
+ /**
+ * This method is called when all mappers (when the storefunc is part of
+ * map) or reducers (when the storefunc is part of reduce) are finished.
+ * This allows the storeFunc to do any global commit actions - only called
+ * when all mappers/reducers successfully complete.
+ *
+ * If the StoreFunc needs to get hold of StoreConfig object for the store
+ * it can call {@link MapRedUtil#getStoreConfig(org.apache.hadoop.mapred.JobConf)}
+ * where conf is the Configuration object passed in the commit() call.
+ *
+ * @param conf Configuration object for the job
+ */
+ public void commit(Configuration conf) throws IOException;
+}
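For illustration, a store function might opt into the new hook as sketched below. IndexedStorage is hypothetical; it leans on PigStorage for the StoreFunc methods and only adds the commit() callback, recovering its StoreConfig the way the javadoc above suggests:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.CommittableStoreFunc;
    import org.apache.pig.StoreConfig;
    import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
    import org.apache.pig.builtin.PigStorage;

    // Hypothetical store function: PigStorage supplies the StoreFunc methods;
    // commit() runs once after all mappers/reducers have finished successfully.
    public class IndexedStorage extends PigStorage implements CommittableStoreFunc {
        public void commit(Configuration conf) throws IOException {
            // Recover this store's StoreConfig from the conf passed to commit()
            StoreConfig storeConfig = MapRedUtil.getStoreConfig(new JobConf(conf));
            // e.g. build an index over the sorted output now that all writes are done
        }
    }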
Added: hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This interface is intended for use by LoadFunc implementations
+ * which have an internal index for sorted data and can use the index
+ * to support merge join in pig. Interaction with the index
+ * is abstracted away by the methods in this interface which the pig
+ * runtime will call in a particular sequence to get the records it
+ * needs to perform the merge based join.
+ *
+ * The sequence of calls made from the pig runtime are:
+ *
+ * {@link IndexableLoadFunc#initialize(Configuration)}
+ * IndexableLoadFunc.bindTo(filename, bufferedPositionedInputStream, 0, Long.MAX_VALUE);
+ * (the bufferedPositionedInputStream is a decorator around the underlying
+ * DFS input stream)
+ * IndexableLoadFunc.seekNear(keys);
+ * A series of IndexableLoadFunc.getNext(); calls to perform the join
+ * IndexableLoadFunc.close();
+ *
+ */
+public interface IndexableLoadFunc extends LoadFunc {
+
+ /**
+ * This method is called by pig run time to allow the
+ * IndexableLoadFunc to perform any initialization actions
+ * @param conf The job configuration object
+ */
+ public void initialize(Configuration conf) throws IOException;
+
+ /**
+ * This method is called by the pig runtime to indicate
+ * to the LoadFunc to position its underlying input stream
+ * near the keys supplied as the argument. Specifically:
+ * 1) if the keys are present in the input stream, the loadfunc
+ * implementation should position its read position to
+ * a record where the key(s) is/are the biggest key(s) less than
+ * the key(s) supplied in the argument OR to the record with the
+ * first occurrence of the key(s) supplied.
+ * 2) if the key(s) are absent in the input stream, the implementation
+ * should position its read position to a record where the key(s)
+ * is/are the biggest key(s) less than the key(s) supplied OR to the
+ * first record where the key(s) is/are the smallest key(s) greater
+ * than the key(s) supplied.
+ * The description above holds for descending order data in
+ * a similar manner with "biggest" and "less than" replaced with
+ * "smallest" and "greater than" and vice versa.
+ *
+ * @param keys Tuple with join keys (which are a prefix of the sort
+ * keys of the input data). For example if the data is sorted on
+ * columns in position 2,4,5 any of the following Tuples are
+ * valid as an argument value:
+ * (fieldAt(2))
+ * (fieldAt(2), fieldAt(4))
+ * (fieldAt(2), fieldAt(4), fieldAt(5))
+ *
+ * The following are some invalid cases:
+ * (fieldAt(4))
+ * (fieldAt(2), fieldAt(5))
+ * (fieldAt(4), fieldAt(5))
+ *
+ * @throws IOException When the loadFunc is unable to position
+ * to the required point in its input stream
+ */
+ public void seekNear(Tuple keys) throws IOException;
+
+
+ /**
+ * A method called by the pig runtime to give an opportunity
+ * for implementations to perform cleanup actions like closing
+ * the underlying input stream. This is necessary since while
+ * performing a join the pig runtime may determine that no further
+ * join is possible with remaining records and may indicate to the
+ * IndexableLoadFunc to clean up by calling this method.
+ *
+ * @throws IOException if the loadfunc is unable to perform
+ * its close actions.
+ */
+ public void close() throws IOException;
+}
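The documented sequence, written out as an illustrative driver (not the actual runtime code); the configuration, stream, file name and key tuple are assumed to come from the merge-join machinery:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.IndexableLoadFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.io.BufferedPositionedInputStream;

    public class MergeJoinRightSide {
        // Sketch of the calls pig makes against an IndexableLoadFunc during merge join.
        static void readNear(IndexableLoadFunc loadFunc, Configuration conf,
                String fileName, BufferedPositionedInputStream in, Tuple keys)
                throws IOException {
            loadFunc.initialize(conf);
            loadFunc.bindTo(fileName, in, 0, Long.MAX_VALUE);
            loadFunc.seekNear(keys);          // position near the left-side join keys
            Tuple t;
            while ((t = loadFunc.getNext()) != null) {
                // join t against the buffered left-side tuples; stop once the
                // right-side keys move past the left key
            }
            loadFunc.close();                 // may also be called early by the runtime
        }
    }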
Added: hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/SortColInfo.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.Serializable;
+
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * A class representing information about a sort column to pass
+ * in {@link SortInfo} to storage functions in {@link StoreConfig}
+ */
+public class SortColInfo implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ // name of sort column
+ private String colName;
+
+ // index position (0 based) of sort column
+ private int colIndex;
+
+ public enum Order { ASCENDING, DESCENDING }
+
+ private Order sortOrder;
+
+
+ /**
+ * @param colName sort column name
+ * @param colIndex index position (0 based) of sort column
+ * @param orderingType whether the column is sorted ascending or descending
+ */
+ public SortColInfo(String colName, int colIndex, Order orderingType) {
+ this.colName = colName;
+ this.colIndex = colIndex;
+ this.sortOrder = orderingType;
+ }
+
+ /**
+ * @return the sort column name - could be null or empty string if
+ * column name could not be determined either because of the absence of
+ * a schema or because the schema had the column name as null or empty
+ * string - caller should check for these conditions.
+ */
+ public String getColName() {
+ return colName;
+ }
+
+ /**
+ * @return index position (0 based) of sort column
+ */
+ public int getColIndex() {
+ return colIndex;
+ }
+
+ /**
+ * @return whether the column is sorted ascending or descending
+ */
+ public Order getSortOrder() {
+ return sortOrder;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((colName == null) ? 0 : colName.hashCode());
+ result = prime * result + colIndex;
+ result = prime * result + ((sortOrder == Order.ASCENDING) ? 1 : 2);
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(!Utils.checkNullAndClass(this, obj)) {
+ return false;
+ }
+ SortColInfo other = (SortColInfo)obj;
+ return Utils.checkNullEquals(this.colName, other.colName, true) &&
+ this.colIndex == other.colIndex &&
+ this.sortOrder == other.sortOrder;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "colname:" + colName +",colindex:" + colIndex +
",orderingType:"
+ + (sortOrder == Order.ASCENDING ? "ascending" : "descending");
+ }
+
+
+
+
+}
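Given a SortColInfo pulled out of a SortInfo, a storage function should honor the getColName() caveat above; a small sketch (the fallback naming is made up):

    SortColInfo col = sortInfo.getSortColInfoList().get(0);
    String name = col.getColName();
    if (name == null || name.length() == 0) {
        // no usable column name in the schema - fall back to the position
        name = "$" + col.getColIndex();
    }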
Added: hadoop/pig/trunk/src/org/apache/pig/SortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SortInfo.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/SortInfo.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/SortInfo.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Class to communicate sort column information based on
+ * order by statement's sort columns and schema
+ */
+public class SortInfo implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ boolean isGloballySorted = true; // in pig this is the default
+
+ List<SortColInfo> sortColInfoList;
+
+ /**
+ * @param sortColInfoList
+ */
+ public SortInfo(List<SortColInfo> sortColInfoList){
+ this.sortColInfoList = sortColInfoList;
+ }
+
+ /**
+ * @return the sortColInfoList
+ */
+ public List<SortColInfo> getSortColInfoList() {
+ return new ArrayList<SortColInfo>(sortColInfoList);
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((sortColInfoList == null) ? 0 :
+ sortColInfoList.hashCode());
+ result = prime * result + (isGloballySorted ? 1: 0);
+ return result;
+ }
+
+ /**
+ * @return the isGloballySorted
+ */
+ public boolean isGloballySorted() {
+ return isGloballySorted;
+ }
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(!Utils.checkNullAndClass(this, obj)) {
+ return false;
+ }
+ SortInfo other = (SortInfo)obj;
+ return (
+ isGloballySorted == other.isGloballySorted &&
+ Utils.checkNullEquals(sortColInfoList, other.sortColInfoList, true));
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "GlobalSort:" + isGloballySorted +", sort column info list:" +
sortColInfoList;
+ }
+
+
+
+}
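A hedged example of assembling this metadata for data sorted ascending on columns name (position 0) and age (position 3); the column names and positions are invented:

    List<SortColInfo> cols = new ArrayList<SortColInfo>();
    cols.add(new SortColInfo("name", 0, SortColInfo.Order.ASCENDING));
    cols.add(new SortColInfo("age", 3, SortColInfo.Order.ASCENDING));
    SortInfo sortInfo = new SortInfo(cols);  // isGloballySorted() defaults to true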
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.pig.CommittableStoreFunc;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * A specialization of the default FileOutputCommitter to allow
+ * pig to call commit() on the StoreFunc's associated with the stores
+ * in a job IF the StoreFunc's are CommittableStoreFunc's
+ */
+@SuppressWarnings("deprecation")
+public class PigOutputCommitter extends FileOutputCommitter {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.FileOutputCommitter#cleanupJob(org.apache.hadoop.mapred.JobContext)
+ */
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ // the following is needed to correctly deserialize udfs in
+ // the map and reduce plans below
+ PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.
+ deserialize(conf.get("udf.import.list")));
+ super.cleanupJob(context);
+
+
+ // call commit() on the StoreFunc's associated with the stores
+ // in the job IF the StoreFunc's are CommittableStoreFunc's
+ // look for storeFuncs in the conf - there are two cases
+ // 1) Job with single store - in this case, there would be storefunc
+ // stored in the conf which we can use
+ // 2) Multi store case - in this case, there is no single storefunc
+ // in the conf - instead we would need to look at the
+ // map and reduce plans and get the POStores out of it and then get hold
+ // of the respective StoreFuncs
+ String sFuncString = conf.get("pig.storeFunc");
+ PhysicalPlan mp = (PhysicalPlan) ObjectSerializer.deserialize(
+ conf.get("pig.mapPlan"));
+ List<POStore> mapStores = PlanHelper.getStores(mp);
+ PhysicalPlan rp = (PhysicalPlan) ObjectSerializer.deserialize(
+ conf.get("pig.reducePlan"));
+ List<POStore> reduceStores = new ArrayList<POStore>();
+ if(rp != null) {
+ reduceStores = PlanHelper.getStores(rp);
+ }
+ // In single store case, we would have removed the store from the
+ // plan in JobControlCompiler
+ if(sFuncString != null && (mapStores.size() + reduceStores.size() == 0)) {
+ // single store case
+ StoreFunc sFunc = MapRedUtil.getStoreFunc(new JobConf(conf));
+ commit(sFunc, conf, conf.get(JobControlCompiler.PIG_STORE_CONFIG),
+ sFuncString);
+ } else {
+ // multi store case
+ commitStores(mapStores, conf);
+ commitStores(reduceStores, conf);
+
+ }
+ }
+
+ private void commit(StoreFunc sFunc, Configuration conf,
+ StoreConfig storeConfig, String sFuncString) throws IOException {
+ if(sFunc != null && CommittableStoreFunc.class.isAssignableFrom(
+ sFunc.getClass())) {
+ CommittableStoreFunc csFunc = (CommittableStoreFunc)sFunc;
+ // make a copy of the conf since we may be committing multiple
+ // stores and set storeFunc and StoreConfig
+ // pertaining to this store in the copy and use it
+ Configuration confCopy = new Configuration(conf);
+ confCopy.set("pig.storeFunc", ObjectSerializer.serialize(
+ sFuncString));
+ confCopy.set(JobControlCompiler.PIG_STORE_CONFIG,
+ ObjectSerializer.serialize(storeConfig));
+
+ csFunc.commit(confCopy);
+ }
+ }
+
+ private void commit(StoreFunc sFunc, Configuration conf,
+ String storeConfigSerializedString, String sFuncString) throws IOException {
+ if(sFunc != null && CommittableStoreFunc.class.isAssignableFrom(
+ sFunc.getClass())) {
+ CommittableStoreFunc csFunc = (CommittableStoreFunc)sFunc;
+ // make a copy of the conf since we may be committing multiple
+ // stores and set storeFunc and StoreConfig
+ // pertaining to this store in the copy and use it
+ Configuration confCopy = new Configuration(conf);
+ confCopy.set("pig.storeFunc", ObjectSerializer.serialize(
+ sFuncString));
+ confCopy.set(JobControlCompiler.PIG_STORE_CONFIG,
+ storeConfigSerializedString);
+
+ csFunc.commit(confCopy);
+ }
+ }
+
+ private void commitStores(List<POStore> stores, Configuration conf)
+ throws IOException {
+ for (POStore store : stores) {
+ StoreFunc sFunc = (StoreFunc)PigContext.instantiateFuncFromSpec(
+ store.getSFile().getFuncSpec());
+ StoreConfig storeConfig = new StoreConfig(store.getSFile().
+ getFileName(), store.getSchema(), store.getSortInfo());
+ commit(sFunc, conf, storeConfig,
+ store.getSFile().getFuncSpec().toString());
+ }
+ }
+}
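A job would pick this committer up through the old mapred API; a minimal sketch of the wiring (normally done by pig's job compiler, not by hand):

    JobConf jobConf = new JobConf();
    jobConf.setOutputCommitter(PigOutputCommitter.class);
    // cleanupJob() then runs once per successful job, calling commit() on
    // every store whose StoreFunc is a CommittableStoreFunc.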
Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+
+/**
+ * Default implementation of {@link IndexableLoadFunc} used in merge join:
+ * reads a pre-built index of (keys..., filename, offset) entries and uses
+ * it to position the underlying loader near the requested join keys.
+ */
+public class DefaultIndexableLoader implements IndexableLoadFunc {
+
+
+ // Name of index file which will be read from HDFS.
+ private String indexFile;
+ private String indexFileLoadFuncSpec;
+
+ private LoadFunc loader;
+ // Index is modeled as FIFO queue and LinkedList implements java Queue interface.
+ private LinkedList<Tuple> index;
+ private FuncSpec rightLoaderFuncSpec;
+ private PigContext pc;
+ private String scope;
+ private Tuple dummyTuple = null;
+ private transient TupleFactory mTupleFactory;
+ private InputStream is;
+ private String currentFileName;
+
+ public DefaultIndexableLoader(String loaderFuncSpec, String indexFile, String indexFileLoadFuncSpec, String scope) {
+ this.rightLoaderFuncSpec = new FuncSpec(loaderFuncSpec);
+ this.indexFile = indexFile;
+ this.indexFileLoadFuncSpec = indexFileLoadFuncSpec;
+ this.scope = scope;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void seekNear(Tuple keys) throws IOException{
+
+ // some setup
+ mTupleFactory = TupleFactory.getInstance();
+
+ /* Currently the whole of the index is read into memory. Typically the index is
+ small - usually a few KBs in size - so this should not be an issue.
+ However, reading the whole index at startup time is not required, so
+ this can be improved upon.
+ Assumption: the index being read is sorted on keys, followed by
+ filename, followed by offset.
+ */
+
+ // Index is modeled as a FIFO queue, which frees us from keeping track of
+ // which index entry should be read next.
+
+ // the keys are sent in a tuple. If there is really only
+ // 1 join key, it would be the first field of the tuple. If
+ // there are multiple Join keys, the tuple itself represents
+ // the join key
+ Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
+
+ POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)), false);
+ try {
+ pc = (PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
+ } catch (IOException e) {
+ int errCode = 2094;
+ String msg = "Unable to deserialize pig context.";
+ throw new ExecException(msg,errCode,e);
+ }
+ pc.connect();
+ ld.setPc(pc);
+ index = new LinkedList<Tuple>();
+ for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple))
+ index.offer((Tuple) res.result);
+
+ Tuple prevIdxEntry = null;
+ Tuple matchedEntry;
+
+ // When the first call is made, we need to seek into right input at correct offset.
+ while(true){
+ // Keep looping till we find first entry in index >= left key
+ // then return the prev idx entry.
+
+ Tuple curIdxEntry = index.poll();
+ if(null == curIdxEntry){
+ // It's possible that we hit the end of the index and still don't
+ // encounter an idx entry >= left key; in that case return the last index entry.
+ matchedEntry = prevIdxEntry;
+ break;
+ }
+ Object extractedKey = extractKeysFromIdxTuple(curIdxEntry);
+ if(extractedKey == null){
+ prevIdxEntry = curIdxEntry;
+ continue;
+ }
+
+ if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
+ if(null == prevIdxEntry) // very first entry in index.
+ matchedEntry = curIdxEntry;
+ else{
+ matchedEntry = prevIdxEntry;
+ index.addFirst(curIdxEntry); // We need to add back the current index entry because we are reading ahead.
+ }
+ break;
+ }
+ else
+ prevIdxEntry = curIdxEntry;
+ }
+
+ if(matchedEntry == null){
+
+ int errCode = 2165;
+ String errMsg = "Problem in index construction.";
+ throw new ExecException(errMsg,errCode,PigException.BUG);
+ }
+
+ Object extractedKey = extractKeysFromIdxTuple(matchedEntry);
+
+ if(extractedKey != null){
+ Class idxKeyClass = extractedKey.getClass();
+ if( ! firstLeftKey.getClass().equals(idxKeyClass)){
+
+ // This check should really be done at compile time. But to be
+ // on the safe side, we do it at runtime also.
+ int errCode = 2166;
+ String errMsg = "Key type mismatch. Found key of type "
+ + firstLeftKey.getClass().getCanonicalName() + " on left side. But, found key of type "
+ + idxKeyClass.getCanonicalName() + " in index built for right side.";
+ throw new ExecException(errMsg,errCode,PigException.BUG);
+ }
+ }
+ initRightLoader(matchedEntry);
+ }
+
+ private void initRightLoader(Tuple idxEntry) throws IOException{
+
+ // bind loader to file pointed by this index Entry.
+ int keysCnt = idxEntry.size();
+ Long offset = (Long)idxEntry.get(keysCnt-1);
+ if(offset > 0)
+ // Loader will throw away one tuple if we are in the middle of the block. We don't want that.
+ offset -= 1 ;
+ FileSpec lFile = new FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec);
+ currentFileName = lFile.getFileName();
+ loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
+ is = FileLocalizer.open(currentFileName, offset, pc);
+ if (currentFileName.endsWith(".bz") || currentFileName.endsWith(".bz2")) {
+ is = new CBZip2InputStream((SeekableInputStream)is, 9);
+ } else if (currentFileName.endsWith(".gz")) {
+ is = new GZIPInputStream(is);
+ }
+
+
+ loader.bindTo(currentFileName, new BufferedPositionedInputStream(is), offset, Long.MAX_VALUE);
+ }
+
+ private Object extractKeysFromIdxTuple(Tuple idxTuple) throws ExecException{
+
+ int idxTupSize = idxTuple.size();
+
+ if(idxTupSize == 3)
+ return idxTuple.get(0);
+
+ List<Object> list = new ArrayList<Object>(idxTupSize-2);
+ for(int i=0; i<idxTupSize-2;i++)
+ list.add(idxTuple.get(i));
+
+ return mTupleFactory.newTupleNoCopy(list);
+ }
+
+ private OperatorKey genKey(){
+ return new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope));
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, org.apache.pig.impl.io.BufferedPositionedInputStream, long, long)
+ */
+ @Override
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToBag(byte[])
+ */
+ @Override
+ public DataBag bytesToBag(byte[] b) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToCharArray(byte[])
+ */
+ @Override
+ public String bytesToCharArray(byte[] b) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToDouble(byte[])
+ */
+ @Override
+ public Double bytesToDouble(byte[] b) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToFloat(byte[])
+ */
+ @Override
+ public Float bytesToFloat(byte[] b) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToInteger(byte[])
+ */
+ @Override
+ public Integer bytesToInteger(byte[] b) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToLong(byte[])
+ */
+ @Override
+ public Long bytesToLong(byte[] b) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToMap(byte[])
+ */
+ @Override
+ public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#bytesToTuple(byte[])
+ */
+ @Override
+ public Tuple bytesToTuple(byte[] b) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+ */
+ @Override
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ throw new IOException("Method not implemented by design");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
+ */
+ @Override
+ public void fieldsToRead(Schema schema) {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#getNext()
+ */
+ @Override
+ public Tuple getNext() throws IOException {
+ Tuple t = loader.getNext();
+ if(t == null) {
+ while(true){ // But next file may be same as previous one, because index may contain multiple entries for same file.
+ Tuple idxEntry = index.poll();
+ if(null == idxEntry) { // Index is finished too. Right stream is finished. No more tuples.
+ return null;
+ } else {
+ if(currentFileName.equals((String)idxEntry.get(idxEntry.size()-2))) {
+ continue;
+ } else {
+ initRightLoader(idxEntry); // bind loader to file and get tuple from it.
+ return loader.getNext();
+ }
+ }
+ }
+ }
+ return t;
+ }
+
+ @Override
+ public void close() throws IOException {
+ is.close();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.IndexableLoadFunc#initialize(org.apache.hadoop.conf.Configuration)
+ */
+ @Override
+ public void initialize(Configuration conf) throws IOException {
+ // nothing to do
+
+ }
+
+}
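The loader assumes index entries shaped as (key1, ..., keyN, fileName, offset) - keys first, then the file holding that key range, then a byte offset into it (see extractKeysFromIdxTuple() and initRightLoader()). An illustrative single-key entry, with made-up values:

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    static Tuple exampleIndexEntry() throws ExecException {
        TupleFactory tf = TupleFactory.getInstance();
        Tuple idxEntry = tf.newTuple(3);
        idxEntry.set(0, "apple");          // join key
        idxEntry.set(1, "part-00000");     // file containing this key range
        idxEntry.set(2, Long.valueOf(0L)); // byte offset into that file
        return idxEntry;
    }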
Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=831051&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Oct 29 18:00:26 2009
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+/**
+ * Class with utility static methods
+ */
+public class Utils {
+
+ /**
+ * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}.
+ * It checks if two objects are equal - two levels of checks are
+ * made: first, whether both are null or not null. If either is null,
+ * the two are considered equal only if both are null.
+ * If both are non null, object equality is also checked if so indicated.
+ * @param obj1 first object to be compared
+ * @param obj2 second object to be compared
+ * @param checkEquality flag to indicate whether object equality should
+ * be checked if obj1 and obj2 are non-null
+ * @return true if the two objects are equal
+ * false otherwise
+ */
+ public static boolean checkNullEquals(Object obj1, Object obj2, boolean checkEquality) {
+ if(obj1 == null || obj2 == null) {
+ return obj1 == obj2;
+ }
+ if(checkEquality) {
+ if(!obj1.equals(obj2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}.
+ * The method checks whether the two arguments are both null or both not null and
+ * whether they are of the same class
+ * @param obj1 first object to compare
+ * @param obj2 second object to compare
+ * @return true if both objects are null or both are not null
+ * and if both are of the same class if not null
+ * false otherwise
+ */
+ public static boolean checkNullAndClass(Object obj1, Object obj2) {
+ if(checkNullEquals(obj1, obj2, false)) {
+ if(obj1 != null) {
+ return obj1.getClass() == obj2.getClass();
+ } else {
+ return true; // both obj1 and obj2 should be null
+ }
+ } else {
+ return false;
+ }
+ }
+
+
+}
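The intended use of the two helpers, mirroring SortColInfo.equals() above; Widget and its fields are hypothetical:

    import org.apache.pig.impl.util.Utils;

    public class Widget {
        private String name;
        private int id;

        @Override
        public boolean equals(Object obj) {
            // null/class checks first, then field-level comparison
            if (!Utils.checkNullAndClass(this, obj)) {
                return false;
            }
            Widget other = (Widget) obj;
            return Utils.checkNullEquals(this.name, other.name, true)
                    && this.id == other.id;
        }

        @Override
        public int hashCode() {
            return 31 * (name == null ? 0 : name.hashCode()) + id;
        }
    }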