Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by AntonioMagnaghi:
http://wiki.apache.org/pig/PigAbstractionHadoopBackEnd

New page:
= Data Storage Hadoop Back-End =

The following code fragments show new classes that implement the Data Storage API on top of the Hadoop file system.

{{{
package org.apache.pig.hadoop.fs;

import org.apache.pig.datastorage.*;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;
import java.io.IOException;

public class HadoopFileSystem implements DataStorage {

        protected FileSystem hfs;  
        protected HadoopDataStorageConfiguration conf;
        
        public HadoopFileSystem(URI uri, HadoopDataStorageConfiguration configuration)
                        throws IOException {
                conf = configuration;
                hfs = FileSystem.get(uri, configuration.getHadoopConf());
        }
        
        public void init() {
         ...
        }

}
}}}
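
For illustration, a client might be wired up as sketched below. This is hypothetical: the no-argument HadoopDataStorageConfiguration constructor and its package are assumptions, since only its getHadoopConf() accessor appears above.

{{{
import java.net.URI;

import org.apache.pig.datastorage.DataStorage;
import org.apache.pig.hadoop.fs.HadoopDataStorageConfiguration;
import org.apache.pig.hadoop.fs.HadoopFileSystem;

public class StorageExample {

        public static void main(String[] args) throws Exception {
                // Assumption: HadoopDataStorageConfiguration has a no-argument
                // constructor; only getHadoopConf() is shown in the fragment above.
                HadoopDataStorageConfiguration conf =
                        new HadoopDataStorageConfiguration();

                // Bind the Data Storage API to an HDFS instance (URI made up).
                DataStorage storage =
                        new HadoopFileSystem(new URI("hdfs://localhost:9000/"), conf);
                storage.init();
        }
}
}}}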

{{{
package org.apache.pig.hadoop.fs;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.datastorage.*;

public abstract class HadoopPath implements DataStorageElementDescriptor {

        protected Path path;
        protected FileSystem fs;

        public int compareTo(Object obj) {
                // Compare by underlying path; obj may be any HadoopPath
                // subclass (file or directory), so cast to the base class.
                return path.compareTo(((HadoopPath) obj).path);
        }

        public boolean exists() {
                try {
                        return fs.exists(path);
                }
                catch (IOException e) {
                        return false;
                }
        }
      
      ...
}
}}}

{{{
package org.apache.pig.hadoop.fs;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;

import org.apache.pig.datastorage.*;


public class HadoopFile extends HadoopPath
                        implements DataStorageElementDescriptor {

        public HadoopFile(FileSystem fs, String parent, String child) {
                this.fs = fs;
                path = new Path(parent, child);
        }
        
        public HadoopFile(FileSystem fs, HadoopDirectory parent, String child) {
                this.fs = fs;
                path = new Path(parent.toString(), child);
        }
        
        public HadoopFile(FileSystem fs, String pathString) {
                this.fs = fs;
                path = new Path(pathString);
        }
        
        public DataStorageProperties getConfiguration() {
                // TODO: file specific conf goes here
                return null;
        }
        
        public DataStorageProperties getStatistics() {
                // TODO: file specific stats go here
                return null;
        }

      ...
}
}}}
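
A hypothetical sketch of the three constructors in use, given a Hadoop FileSystem handle; the path names are made up for illustration.

{{{
import org.apache.hadoop.fs.FileSystem;

import org.apache.pig.hadoop.fs.HadoopDirectory;
import org.apache.pig.hadoop.fs.HadoopFile;

public class FileDescriptorExample {

        // Illustrative only: paths are invented for the example.
        public static void describe(FileSystem fs) {
                HadoopDirectory dir = new HadoopDirectory(fs, "/user/pig/data");

                HadoopFile a = new HadoopFile(fs, "/user/pig/data", "part-00000"); // parent/child strings
                HadoopFile b = new HadoopFile(fs, dir, "part-00001");              // directory parent
                HadoopFile c = new HadoopFile(fs, "/user/pig/data/part-00002");    // full path string

                System.out.println(a.exists()); // exists() is inherited from HadoopPath
        }
}
}}}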

{{{
package org.apache.pig.hadoop.fs;

import java.util.Iterator;
import java.util.Vector;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;

import org.apache.pig.datastorage.*;

public class HadoopDirectory extends HadoopPath
                             implements DataStorageContainerDescriptor {
        
        public HadoopDirectory(FileSystem fs, String parent, String child) {
                this.fs = fs;
                path = new Path(parent, child);
        }
        
        public HadoopDirectory(FileSystem fs, HadoopDirectory parent, String child) {
                this.fs = fs;
                path = new Path(parent.toString(), child);
        }
        
        public HadoopDirectory(FileSystem fs, String pathString) {
                this.fs = fs;
                path = new Path(pathString);
        }


        public Iterator<DataStorageElementDescriptor> iterator() {
                Vector<DataStorageElementDescriptor> descriptors =
                        new Vector<DataStorageElementDescriptor>();

                try {
                        Path[] allPaths = fs.listPaths(path);

                        // Wrap each entry in a file or directory descriptor.
                        for (int i = 0; i < allPaths.length; ++i) {
                                if (fs.isFile(allPaths[i])) {
                                        descriptors.add(
                                                new HadoopFile(fs, allPaths[i].toString()));
                                }
                                else {
                                        descriptors.add(
                                                new HadoopDirectory(fs, allPaths[i].toString()));
                                }
                        }
                }
                catch (IOException e) {
                        // On error, fall through and return an empty iterator.
                }

                return descriptors.iterator();
        }
                                
      ...
}
}}}
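
Finally, a hypothetical sketch of walking a directory through the iterator above; since both descriptor classes extend HadoopPath, an instanceof check distinguishes files from subdirectories.

{{{
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;

import org.apache.pig.datastorage.DataStorageElementDescriptor;
import org.apache.pig.hadoop.fs.HadoopDirectory;

public class ListingExample {

        // List the contents of a directory, noting which entries are directories.
        public static void list(FileSystem fs, String dirName) {
                HadoopDirectory dir = new HadoopDirectory(fs, dirName);

                Iterator<DataStorageElementDescriptor> it = dir.iterator();
                while (it.hasNext()) {
                        DataStorageElementDescriptor elem = it.next();
                        System.out.println(elem + " directory? "
                                + (elem instanceof HadoopDirectory));
                }
        }
}
}}}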
