Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The following page has been changed by AntonioMagnaghi: http://wiki.apache.org/pig/PigAbstractionFrontEnd New page: = Data-Storage Based Pig Front-End = This is a sample code fragment where `PigContext.java` has been adapted to use the Data Storage API defined above. {{{ - - // configuration for connecting to hadoop - transient private JobConf conf = null; @@ -79,16 +81,21 @@ + // configuration information for file system(s) + transient private DataStorageProperties fileSystemConf; + //main file system that jobs and shell commands access - transient private FileSystem dfs; + transient private DataStorage dfs; }}} {{{ @@ -195,21 +199,32 @@ } } - - lfs = FileSystem.getNamed("local", conf); - - mLogger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name")); - dfs = FileSystem.get(conf); + HadoopDataStorageConfiguration conf = new HadoopDataStorageConfiguration(); + fileSystemConf = conf; + + lfs = new HadoopFileSystem(new URI(...), conf); + + mLogger.info("Connecting to hadoop file system at: " + fileSystemConf.getValue("fs.default.name")); + + dfs = new HadoopFileSystem (new URI(...), conf); + + mLogger.info("Connecting to map-reduce job tracker at: " + conf.getValue("mapred.job.tracker")); + - mLogger.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker")); - jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), conf); - jobClient = new JobClient(conf); + HadoopExecutionEngineConfiguration execConf = new HadoopExecutionEngineConfiguration(); + backEndConf = execConf; + jobClient = new HadoopExecutionEngine(execConf); }else{ - conf = new JobConf(); - lfs = FileSystem.getNamed("local", conf); - dfs = lfs; // for local execution, the "dfs" is the local file system + HadoopDataStorageConfiguration conf = new HadoopDataStorageConfiguration(); + fileSystemConf = conf; + + lfs = new HadoopFileSystem(new URI(...), new 
HadoopDataStorageConfiguration(conf)); + + dfs = lfs; // for local execution, the "dfs" is the local file system } }catch (IOException e){ }}} These are sample code fragments from `PigServer.java`. Operations that previously utilized the Hadoop file system directly have now been adapted to use the Data Storage API defined above. {{{ @@ -485,37 +565,99 @@ * @return * @throws IOException */ - public long fileSize(String filename) throws IOException { - FileSystem dfs = pigContext.getDfs(); - Path p = new Path(filename); - long len = dfs.getLength(p); - long replication = dfs.getDefaultReplication(); - return len * replication; + public long fileSize(String name) throws IOException { + try { + DataStorage dfs = pigContext.getDfs(); + DataStorageElementDescriptor elem = dfs.asElement(name); + DataStorageProperties elemStats = elem.getStatistics(); + + long len = new Long(elemStats. getValue("length.bytes").toString()); + long replication = new Long(elemStats. getValue("replication").toString()); + + return len * replication; + } + catch (DataStorageException e) + { + return 0; + } } }}} {{{ - public boolean deleteFile(String filename) throws IOException { - return pigContext.getDfs().delete(new Path(filename)); + public boolean deleteFile(String name) throws IOException { + try { + DataStorage ds = pigContext.getDfs(); + DataStorageElementDescriptor elem = ds.asElement(name); + + return elem.delete(); + } + catch (DataStorageException e) + { + throw new IOException(e); + } } }}}