Author: hashutosh
Date: Wed Jul 28 00:36:56 2010
New Revision: 979917

URL: http://svn.apache.org/viewvc?rev=979917&view=rev
Log: PIG-1229: allow pig to write output into a JDBC db
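Usage, for reference: the new StoreFunc is invoked from a Pig Latin STORE clause, as the test in this commit does. A minimal sketch, assuming a reachable JDBC database whose table columns line up positionally with the tuple fields; the driver, URL, and insert statement below are the values used by the test and stand in for real deployments:

    A = LOAD 'datafile.txt' AS (id:int, fruit:chararray, ratio:double);
    STORE A INTO 'dummy' USING org.apache.pig.piggybank.storage.DBStorage(
        'org.hsqldb.jdbcDriver',
        'jdbc:hsqldb:file:/tmp/batchtest',
        'insert into ttt (id, name, ratio) values (?,?,?)');

The store location ('dummy') is ignored, since setStoreLocation is a no-op; each tuple field is bound, in order, to the corresponding ? placeholder of the prepared insert statement.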
Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java
Modified:
    hadoop/pig/trunk/contrib/CHANGES.txt
    hadoop/pig/trunk/contrib/piggybank/java/build.xml
    hadoop/pig/trunk/ivy.xml
    hadoop/pig/trunk/ivy/libraries.properties

Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=979917&r1=979916&r2=979917&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Wed Jul 28 00:36:56 2010
@@ -24,6 +24,8 @@
 INCOMPATIBLE CHANGES

 IMPROVEMENTS

+PIG-1229 allow pig to write output into a JDBC db (ankur via hashutosh)
+
 PIG-1385 UDF to create tuples and bags (hcbusy via gates)

 PIG-1331 Add Owl as a contrib project (ajaykidave via gates)

Modified: hadoop/pig/trunk/contrib/piggybank/java/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/build.xml?rev=979917&r1=979916&r2=979917&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/build.xml (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/build.xml Wed Jul 28 00:36:56 2010
@@ -37,6 +37,7 @@
     <property name="pigtest" value="../../../build/test/classes" />
     <property name="udfjar" value="piggybank.jar" />
     <property name="src.dir" value="src/main/java/org/apache/pig/piggybank" />
+    <property name="hsqldb.jar" value="../../../build/ivy/lib/Pig/hsqldb-1.8.0.10.jar"/>

     <!-- jar properties -->
     <property name=".javadoc" value="${build.docs}/api" />
@@ -70,6 +71,7 @@
         <pathelement location="${hadoopjar}"/>
         <pathelement location="${pigtest}"/>
         <pathelement location="${hive_execjar}"/>
+        <pathelement location="${hsqldb.jar}"/>
     </path>

     <path id="test.classpath">

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java?rev=979917&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java Wed Jul 28 00:36:56 2010
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import java.io.IOException;
+import java.sql.*;
+
+public class DBStorage extends StoreFunc {
+  private final Log log = LogFactory.getLog(getClass());
+
+  private PreparedStatement ps;
+  private Connection con;
+  private String jdbcURL;
+  private String user;
+  private String pass;
+  private int batchSize;
+  private int count = 0;
+  private String insertQuery;
+
+  public DBStorage(String driver, String jdbcURL, String insertQuery) {
+    this(driver, jdbcURL, null, null, insertQuery, "100");
+  }
+
+  public DBStorage(String driver, String jdbcURL, String user, String pass,
+      String insertQuery) throws SQLException {
+    this(driver, jdbcURL, user, pass, insertQuery, "100");
+  }
+
+  public DBStorage(String driver, String jdbcURL, String user, String pass,
+      String insertQuery, String batchSize) throws RuntimeException {
+    log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX,"
+        + insertQuery + ")");
+    try {
+      Class.forName(driver);
+    } catch (ClassNotFoundException e) {
+      log.error("can't load DB driver:" + driver, e);
+      throw new RuntimeException("Can't load DB Driver", e);
+    }
+    this.jdbcURL = jdbcURL;
+    this.user = user;
+    this.pass = pass;
+    this.insertQuery = insertQuery;
+    this.batchSize = Integer.parseInt(batchSize);
+  }
+
+  /**
+   * Write the tuple to Database directly here.
+   */
+  public void putNext(Tuple tuple) throws IOException {
+    int sqlPos = 1;
+    try {
+      int size = tuple.size();
+      for (int i = 0; i < size; i++) {
+        try {
+          Object field = tuple.get(i);
+
+          switch (DataType.findType(field)) {
+          case DataType.NULL:
+            ps.setNull(sqlPos, java.sql.Types.VARCHAR);
+            sqlPos++;
+            break;
+
+          case DataType.BOOLEAN:
+            ps.setBoolean(sqlPos, (Boolean) field);
+            sqlPos++;
+            break;
+
+          case DataType.INTEGER:
+            ps.setInt(sqlPos, (Integer) field);
+            sqlPos++;
+            break;
+
+          case DataType.LONG:
+            ps.setLong(sqlPos, (Long) field);
+            sqlPos++;
+            break;
+
+          case DataType.FLOAT:
+            ps.setFloat(sqlPos, (Float) field);
+            sqlPos++;
+            break;
+
+          case DataType.DOUBLE:
+            ps.setDouble(sqlPos, (Double) field);
+            sqlPos++;
+            break;
+
+          case DataType.BYTEARRAY:
+            byte[] b = ((DataByteArray) field).get();
+            ps.setBytes(sqlPos, b);
+            sqlPos++;
+            break;
+
+          case DataType.CHARARRAY:
+            ps.setString(sqlPos, (String) field);
+            sqlPos++;
+            break;
+
+          case DataType.BYTE:
+            ps.setByte(sqlPos, (Byte) field);
+            sqlPos++;
+            break;
+
+          case DataType.MAP:
+          case DataType.TUPLE:
+          case DataType.BAG:
+            throw new RuntimeException("Cannot store a non-flat tuple "
+                + "using DBStorage");
+
+          default:
+            throw new RuntimeException("Unknown datatype "
+                + DataType.findType(field));
+
+          }
+
+        } catch (ExecException ee) {
+          throw new RuntimeException(ee);
+        }
+
+      }
+      ps.addBatch();
+      count++;
+      if (count > batchSize) {
+        count = 0;
+        ps.executeBatch();
+        ps.clearBatch();
+        ps.clearParameters();
+      }
+    } catch (SQLException e) {
+      try {
+        log.error("Unable to insert record:" + tuple.toDelimitedString("\t"),
+            e);
+      } catch (ExecException ee) {
+        // do nothing
+      }
+      if (e.getErrorCode() == 1366) {
+        // Ignore errors caused by UTF-8 character encoding (error code
+        // 1366). TODO: temporary fix; find a better way of handling these
+        // in the insert statement itself.
+      } else {
+        throw new RuntimeException("JDBC error", e);
+      }
+    }
+  }
+
+  class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> {
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException,
+        InterruptedException {
+      // IGNORE
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return new OutputCommitter() {
+
+        @Override
+        public void abortTask(TaskAttemptContext context) throws IOException {
+          try {
+            if (ps != null) {
+              ps.close();
+            }
+            if (con != null) {
+              con.rollback();
+              con.close();
+            }
+          } catch (SQLException sqe) {
+            throw new IOException(sqe);
+          }
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext context) throws IOException {
+          if (ps != null) {
+            try {
+              ps.executeBatch();
+              con.commit();
+              ps.close();
+              con.close();
+              ps = null;
+              con = null;
+            } catch (SQLException e) {
+              log.error("ps.close", e);
+              throw new IOException("JDBC Error", e);
+            }
+          }
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext context)
+            throws IOException {
+          return true;
+        }
+
+        @Override
+        public void cleanupJob(JobContext context) throws IOException {
+          // IGNORE
+        }
+
+        @Override
+        public void setupJob(JobContext context) throws IOException {
+          // IGNORE
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext context) throws IOException {
+          // IGNORE
+        }
+      };
+    }
+
+    @Override
+    public RecordWriter<NullWritable, NullWritable> getRecordWriter(
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      // We don't use a record writer to write to the database
+      return new RecordWriter<NullWritable, NullWritable>() {
+        @Override
+        public void close(TaskAttemptContext context) {
+          // Noop
+        }
+
+        @Override
+        public void write(NullWritable k, NullWritable v) {
+          // Noop
+        }
+      };
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public OutputFormat getOutputFormat()
+      throws IOException {
+    return new MyDBOutputFormat();
+  }
+
+  /**
+   * Initialise the database connection and prepared statement here.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void prepareToWrite(RecordWriter writer)
+      throws IOException {
+    ps = null;
+    con = null;
+    if (insertQuery == null) {
+      throw new IOException("SQL Insert command not specified");
+    }
+    try {
+      if (user == null || pass == null) {
+        con = DriverManager.getConnection(jdbcURL);
+      } else {
+        con = DriverManager.getConnection(jdbcURL, user, pass);
+      }
+      con.setAutoCommit(false);
+      ps = con.prepareStatement(insertQuery);
+    } catch (SQLException e) {
+      log.error("Unable to connect to JDBC @" + jdbcURL);
+      throw new IOException("JDBC Error", e);
+    }
+    count = 0;
+  }
+
+  @Override
+  public void setStoreLocation(String location, Job job) throws IOException {
+    // IGNORE since we are writing records to DB.
+  }
+}

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java?rev=979917&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java Wed Jul 28 00:36:56 2010
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.Util;
+import org.hsqldb.Server;
+import org.junit.After;
+import org.junit.Before;
+
+import junit.framework.TestCase;
+
+public class TestDBStorage extends TestCase {
+
+  private PigServer pigServer;
+  private MiniCluster cluster;
+  private Server dbServer;
+  private String driver = "org.hsqldb.jdbcDriver";
+  // private String url = "jdbc:hsqldb:mem:.";
+  private String dblocation = "/tmp/batchtest";
+  private String url = "jdbc:hsqldb:file:" + dblocation
+      + ";hsqldb.default_table_type=cached;hsqldb.cache_rows=100";
+  private String user = "sa";
+  private String password = "";
+
+  private static final String INPUT_FILE = "datafile.txt";
+
+  public TestDBStorage() throws ExecException, IOException {
+    // Initialise Pig server
+    cluster = MiniCluster.buildCluster();
+    pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    pigServer.getPigContext().getProperties()
+        .setProperty("mapred.map.max.attempts", "1");
+    pigServer.getPigContext().getProperties()
+        .setProperty("mapred.reduce.max.attempts", "1");
+    System.out.println("Pig server initialized successfully");
+    // Initialise DBServer
+    dbServer = new Server();
+    dbServer.setDatabaseName(0, "batchtest");
+    // dbServer.setDatabasePath(0, "mem:test;sql.enforce_strict_size=true");
+    dbServer.setDatabasePath(0,
+        "file:/tmp/batchtest;sql.enforce_strict_size=true");
+    dbServer.setLogWriter(null);
+    dbServer.setErrWriter(null);
+    try {
+      Class.forName(driver);
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.out.println(this + ".setUp() error: " + e.getMessage());
+    }
+    System.out.println("Database server initialized successfully");
+  }
+
+  private void createFile() throws IOException {
+    PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+    w.println("100\tapple\t1.0");
+    w.println("100\torange\t2.0");
+    w.println("100\tbanana\t1.1");
+    w.close();
+    Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+  }
+
+  private void createTable() throws IOException {
+    Connection con = null;
+    String sql = "create table ttt (id integer, name varchar(32), ratio double)";
+    try {
+      con = DriverManager.getConnection(url, user, password);
+    } catch (SQLException sqe) {
+      throw new IOException("Unable to obtain a connection to the database",
+          sqe);
+    }
+    try {
+      Statement st = con.createStatement();
+      st.executeUpdate(sql);
+      st.close();
+      con.close();
+    } catch (SQLException sqe) {
+      throw new IOException("Cannot create table", sqe);
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    createFile();
+    createTable();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    new File(INPUT_FILE).delete();
+    Util.deleteFile(cluster, INPUT_FILE);
+    pigServer.shutdown();
+    dbServer.stop();
+
+    File[] dbFiles = new File("/tmp").listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.startsWith("batchtest");
+      }
+    });
+    if (dbFiles != null) {
+      for (File file : dbFiles) {
+        file.delete();
+      }
+    }
+  }
+
+  public void testWriteToDB() throws IOException {
+    String insertQuery = "insert into ttt (id, name, ratio) values (?,?,?)";
+    String dbStore = "org.apache.pig.piggybank.storage.DBStorage('" + driver
+        + "', '" + url + "','" + insertQuery + "');";
+    pigServer.setBatchOn();
+    pigServer.registerQuery("A = LOAD '" + INPUT_FILE
+        + "' as (id:int, fruit:chararray, ratio:double);");
+    pigServer.registerQuery("STORE A INTO 'dummy' USING " + dbStore);
+    ExecJob job = pigServer.executeBatch().get(0);
+    try {
+      while (!job.hasCompleted()) {
+        Thread.sleep(1000);
+      }
+    } catch (InterruptedException ie) {
+      // ignore
+    }
+
+    assertNotSame("Failed: " + job.getException(), job.getStatus(),
+        ExecJob.JOB_STATUS.FAILED);
+
+    Connection con = null;
+    String selectQuery = "select id, name, ratio from ttt order by name";
+    try {
+      con = DriverManager.getConnection(url, user, password);
+    } catch (SQLException sqe) {
+      throw new IOException(
+          "Unable to obtain database connection for data verification", sqe);
+    }
+    try {
+      PreparedStatement ps = con.prepareStatement(selectQuery);
+      ResultSet rs = ps.executeQuery();
+
+      int expId = 100;
+      String[] expNames = { "apple", "banana", "orange" };
+      double[] expRatios = { 1.0, 1.1, 2.0 };
+      for (int i = 0; i < 3 && rs.next(); i++) {
+        assertEquals("Id mismatch", expId, rs.getInt(1));
+        assertEquals("Name mismatch", expNames[i], rs.getString(2));
+        assertEquals("Ratio mismatch", expRatios[i], rs.getDouble(3), 0.0001);
+      }
+    } catch (SQLException sqe) {
+      throw new IOException(
+          "Unable to read data from database for verification", sqe);
+    }
+  }
+}

Modified: hadoop/pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy.xml?rev=979917&r1=979916&r2=979917&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy.xml (original)
+++ hadoop/pig/trunk/ivy.xml Wed Jul 28 00:36:56 2010
@@ -74,5 +74,7 @@
     <dependency org="joda-time" name="joda-time" rev="${joda-time.version}" conf="compile->master"/>
     <dependency org="org.python" name="jython" rev="${jython.version}" conf="compile->master"/>
     <!--ATM hbase, hbase-test.jar, hadoop.jar are resolved from the lib dir-->
+    <dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}"
+      conf="test->default" />
   </dependencies>
 </ivy-module>

Modified: hadoop/pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/ivy/libraries.properties?rev=979917&r1=979916&r2=979917&view=diff
==============================================================================
--- hadoop/pig/trunk/ivy/libraries.properties (original)
+++ hadoop/pig/trunk/ivy/libraries.properties Wed Jul 28 00:36:56 2010
@@ -22,6 +22,8 @@
 commons-cli.version=1.0
 commons-logging.version=1.0.3
 checkstyle.version=4.2
+hsqldb.version=1.8.0.10
+
 ivy.version=2.0.0-rc2
 javacc.version=4.2
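A note on the other constructor overloads: DBStorage also accepts credentials, and optionally a batch size, between the JDBC URL and the insert statement. A sketch only, with a hypothetical server-mode URL and credentials (the batch size is passed as a string because Pig hands UDF constructor arguments through as strings):

    STORE A INTO 'dummy' USING org.apache.pig.piggybank.storage.DBStorage(
        'org.hsqldb.jdbcDriver',
        'jdbc:hsqldb:hsql://dbhost/batchtest',
        'sa', '',
        'insert into ttt (id, name, ratio) values (?,?,?)',
        '500');

Rows accumulate via PreparedStatement.addBatch() and are flushed with executeBatch() once the batch size is exceeded; the OutputCommitter returned by MyDBOutputFormat flushes the final partial batch and commits on task commit, and rolls back on task abort.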