Repository: flink
Updated Branches:
  refs/heads/master d570d078a -> 30761572b


[FLINK-2005] Remove Record API from jdbc module

This closes #982


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06b37bf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06b37bf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06b37bf5

Branch: refs/heads/master
Commit: 06b37bf550315bd1d5be7dc3ed6638fd21768e1a
Parents: d570d07
Author: zentol <s.mo...@web.de>
Authored: Tue Aug 4 12:45:22 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Aug 4 18:13:30 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-jdbc/pom.xml                |  12 -
 .../java/record/io/jdbc/JDBCInputFormat.java    | 389 -------------------
 .../java/record/io/jdbc/JDBCOutputFormat.java   | 359 -----------------
 .../record/io/jdbc/example/JDBCExample.java     | 136 -------
 .../java/record/io/jdbc/DevNullLogStream.java   |  30 --
 .../record/io/jdbc/JDBCInputFormatTest.java     | 214 ----------
 .../record/io/jdbc/JDBCOutputFormatTest.java    | 225 -----------
 7 files changed, 1365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/pom.xml b/flink-staging/flink-jdbc/pom.xml
index 7b499a7..a3976c1 100644
--- a/flink-staging/flink-jdbc/pom.xml
+++ b/flink-staging/flink-jdbc/pom.xml
@@ -41,18 +41,6 @@ under the License.
                        <artifactId>flink-java</artifactId>
                        <version>${project.version}</version>
                </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-                               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-clients</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
 
                <dependency>
                        <groupId>org.apache.derby</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index 3cd295b..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * InputFormat to read data from a database and generate PactReords.
- * The InputFormat has to be configured with the query, and either all
- * connection parameters or a complete database URL.{@link Configuration} The position of a value inside a Record is
- * determined by the table
- * returned.
- * 
- * @see Configuration
- * @see Record
- * @see DriverManager
- */
-public class JDBCInputFormat extends GenericInputFormat implements NonParallelInput {
-
-       private static final long serialVersionUID = 1L;
-       
-       @SuppressWarnings("unused")
-       private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
-       
-
-       public final String DRIVER_KEY = "driver";
-       public final String USERNAME_KEY = "username";
-       public final String PASSWORD_KEY = "password";
-       public final String URL_KEY = "url";
-       public final String QUERY_KEY = "query";
-
-
-       private String username;
-       private String password;
-       private String driverName;
-       private String dbURL;
-       private String query;
-
-       
-       private transient Connection dbConn;
-       private transient Statement statement;
-       private transient ResultSet resultSet;
-
-
-       /**
-        * Creates a non-configured JDBCInputFormat. This format has to be
-        * configured using configure(configuration).
-        */
-       public JDBCInputFormat() {}
-
-       /**
-        * Creates a JDBCInputFormat and configures it.
-        * 
-        * @param driverName
-        *        JDBC-Drivename
-        * @param dbURL
-        *        Formatted URL containing all connection parameters.
-        * @param username
-        * @param password
-        * @param query
-        *        Query to execute.
-        */
-       public JDBCInputFormat(String driverName, String dbURL, String username, String password, String query) {
-               this.driverName = driverName;
-               this.query = query;
-               this.dbURL = dbURL;
-               this.username = username;
-               this.password = password;
-       }
-
-       /**
-        * Creates a JDBCInputFormat and configures it.
-        * 
-        * @param driverName
-        *        JDBC-Drivername
-        * @param dbURL
-        *        Formatted URL containing all connection parameters.
-        * @param query
-        *        Query to execute.
-        */
-       public JDBCInputFormat(String driverName, String dbURL, String query) {
-               this(driverName, dbURL, "", "", query);
-       }
-
-       /**
-        * Creates a JDBCInputFormat and configures it.
-        * 
-        * @param parameters
-        *        Configuration with all connection parameters.
-        * @param query
-        *        Query to execute.
-        */
-       public JDBCInputFormat(Configuration parameters, String query) {
-               this.driverName = parameters.getString(DRIVER_KEY, "");
-               this.username = parameters.getString(USERNAME_KEY, "");
-               this.password = parameters.getString(PASSWORD_KEY, "");
-               this.dbURL = parameters.getString(URL_KEY, "");
-               this.query = query;
-       }
-
-       
-       /**
-        * Configures this JDBCInputFormat. This includes setting the connection
-        * parameters (if necessary), establishing the connection and executing the
-        * query.
-        * 
-        * @param parameters
-        *        Configuration containing all or no parameters.
-        */
-       @Override
-       public void configure(Configuration parameters) {
-               boolean needConfigure = isFieldNullOrEmpty(this.query) || isFieldNullOrEmpty(this.dbURL);
-               if (needConfigure) {
-                       this.driverName = parameters.getString(DRIVER_KEY, null);
-                       this.username = parameters.getString(USERNAME_KEY, null);
-                       this.password = parameters.getString(PASSWORD_KEY, null);
-                       this.query = parameters.getString(QUERY_KEY, null);
-                       this.dbURL = parameters.getString(URL_KEY, null);
-               }
-
-               try {
-                       prepareQueryExecution();
-               } catch (SQLException e) {
-                       throw new IllegalArgumentException("Configure failed:\t!", e);
-               }
-       }
-
-       /**
-        * Enters data value from the current resultSet into a Record.
-        * 
-        * @param pos
-        *        Record position to be set.
-        * @param type
-        *        SQL type of the resultSet value.
-        * @param record
-        *        Target Record.
-        */
-       private void retrieveTypeAndFillRecord(int pos, int type, Record record) throws SQLException,
-                       NotTransformableSQLFieldException {
-               switch (type) {
-               case java.sql.Types.NULL:
-                       record.setField(pos, NullValue.getInstance());
-                       break;
-               case java.sql.Types.BOOLEAN:
-                       record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
-                       break;
-               case java.sql.Types.BIT:
-                       record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
-                       break;
-               case java.sql.Types.CHAR:
-                       record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-                       break;
-               case java.sql.Types.NCHAR:
-                       record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-                       break;
-               case java.sql.Types.VARCHAR:
-                       record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-                       break;
-               case java.sql.Types.LONGVARCHAR:
-                       record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-                       break;
-               case java.sql.Types.LONGNVARCHAR:
-                       record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
-                       break;
-               case java.sql.Types.TINYINT:
-                       record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
-                       break;
-               case java.sql.Types.SMALLINT:
-                       record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
-                       break;
-               case java.sql.Types.BIGINT:
-                       record.setField(pos, new LongValue(resultSet.getLong(pos + 1)));
-                       break;
-               case java.sql.Types.INTEGER:
-                       record.setField(pos, new IntValue(resultSet.getInt(pos + 1)));
-                       break;
-               case java.sql.Types.FLOAT:
-                       record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
-                       break;
-               case java.sql.Types.REAL:
-                       record.setField(pos, new FloatValue(resultSet.getFloat(pos + 1)));
-                       break;
-               case java.sql.Types.DOUBLE:
-                       record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
-                       break;
-               case java.sql.Types.DECIMAL:
-                       record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
-                       break;
-               case java.sql.Types.NUMERIC:
-                       record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
-                       break;
-               case java.sql.Types.DATE:
-                       record.setField(pos, new StringValue(resultSet.getDate(pos + 1).toString()));
-                       break;
-               case java.sql.Types.TIME:
-                       record.setField(pos, new LongValue(resultSet.getTime(pos + 1).getTime()));
-                       break;
-               case java.sql.Types.TIMESTAMP:
-                       record.setField(pos, new StringValue(resultSet.getTimestamp(pos + 1).toString()));
-                       break;
-               case java.sql.Types.SQLXML:
-                       record.setField(pos, new StringValue(resultSet.getSQLXML(pos + 1).toString()));
-                       break;
-               default:
-                       throw new NotTransformableSQLFieldException("Unknown sql-type [" + type + "]on column [" + pos + "]");
-
-                       // case java.sql.Types.BINARY:
-                       // case java.sql.Types.VARBINARY:
-                       // case java.sql.Types.LONGVARBINARY:
-                       // case java.sql.Types.ARRAY:
-                       // case java.sql.Types.JAVA_OBJECT:
-                       // case java.sql.Types.BLOB:
-                       // case java.sql.Types.CLOB:
-                       // case java.sql.Types.NCLOB:
-                       // case java.sql.Types.DATALINK:
-                       // case java.sql.Types.DISTINCT:
-                       // case java.sql.Types.OTHER:
-                       // case java.sql.Types.REF:
-                       // case java.sql.Types.ROWID:
-                       // case java.sql.Types.STRUCT:
-               }
-       }
-
-       private boolean isFieldNullOrEmpty(String field) {
-               return (field == null || field.length() == 0);
-       }
-
-       private void prepareQueryExecution() throws SQLException {
-               setClassForDBType();
-               prepareCredentialsAndExecute();
-       }
-
-       /**
-        * Loads appropriate JDBC driver.
-        * 
-        * @param dbType
-        *        Type of the database.
-        * @return boolean value, indication whether an appropriate driver could be
-        *         found.
-        */
-       private void setClassForDBType() {
-               try {
-                       Class.forName(driverName);
-               } catch (ClassNotFoundException cnfe) {
-                       throw new IllegalArgumentException("JDBC-Class not found:\t" + cnfe.getLocalizedMessage());
-               }
-       }
-
-       private void prepareCredentialsAndExecute() throws SQLException {
-               if (isFieldNullOrEmpty(username)) {
-                       prepareConnection(dbURL);
-               } else {
-                       prepareConnection();
-               }
-               executeQuery();
-       }
-
-       /**
-        * Establishes a connection to a database.
-        * 
-        * @param dbURL
-        *        Assembled URL containing all connection parameters.
-        * @return boolean value, indicating whether a connection could be
-        *         established
-        */
-       private void prepareConnection(String dbURL) throws SQLException {
-               dbConn = DriverManager.getConnection(dbURL);
-       }
-
-       /**
-        * Assembles the Database URL and establishes a connection.
-        * 
-        * @param dbType
-        *        Type of the database.
-        * @param username
-        *        Login username.
-        * @param password
-        *        Login password.
-        * @return boolean value, indicating whether a connection could be
-        *         established
-        */
-       private void prepareConnection() throws SQLException {
-               dbConn = DriverManager.getConnection(dbURL, username, password);
-       }
-
-       private void executeQuery() throws SQLException {
-               statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-               resultSet = statement.executeQuery(this.query);
-       }
-
-       /**
-        * Checks whether all data has been read.
-        * 
-        * @return boolean value indication whether all data has been read.
-        */
-       @Override
-       public boolean reachedEnd() {
-               try {
-                       if (resultSet.isLast()) {
-                               resultSet.close();
-                               statement.close();
-                               dbConn.close();
-                               return true;
-                       } else {
-                               return false;
-                       }
-               } catch (SQLException e) {
-                       throw new IllegalArgumentException("Couldn't evaluate reachedEnd():\t" + e.getMessage());
-               } catch (NullPointerException e) {
-                       throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
-               }
-       }
-
-       /**
-        * Stores the next resultSet row in a Record
-        * 
-        * @param record
-        *        target Record
-        * @return boolean value indicating that the operation was successful
-        */
-       @Override
-       public Record nextRecord(Record record) {
-               try {
-                       resultSet.next();
-                       ResultSetMetaData rsmd = resultSet.getMetaData();
-                       int column_count = rsmd.getColumnCount();
-                       record.setNumFields(column_count);
-
-                       for (int pos = 0; pos < column_count; pos++) {
-                               int type = rsmd.getColumnType(pos + 1);
-                               retrieveTypeAndFillRecord(pos, type, record);
-                       }
-                       return record;
-               } catch (SQLException e) {
-                       throw new IllegalArgumentException("Couldn't read data:\t" + e.getMessage());
-               } catch (NotTransformableSQLFieldException e) {
-                       throw new IllegalArgumentException("Couldn't read data because of unknown column sql-type:\t"
-                               + e.getMessage());
-               } catch (NullPointerException e) {
-                       throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
-               }
-       }
-       
-       public static class NotTransformableSQLFieldException extends Exception {
-
-               private static final long serialVersionUID = 1L;
-
-               public NotTransformableSQLFieldException(String message) {
-                       super(message);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index 780001a..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-public class JDBCOutputFormat implements OutputFormat<Record> {
-       private static final long serialVersionUID = 1L;
-
-       private static final int DEFAULT_BATCH_INTERVERAL = 5000;
-       
-       public static final String DRIVER_KEY = "driver";
-       public static final String USERNAME_KEY = "username";
-       public static final String PASSWORD_KEY = "password";
-       public static final String URL_KEY = "url";
-       public static final String QUERY_KEY = "query";
-       public static final String FIELD_COUNT_KEY = "fields";
-       public static final String FIELD_TYPE_KEY = "type";
-       public static final String BATCH_INTERVAL = "batchInt";
-
-       private Connection dbConn;
-       private PreparedStatement upload;
-
-       private String username;
-       private String password;
-       private String driverName;
-       private String dbURL;
-
-       private String query;
-       private int fieldCount;
-       private Class<? extends Value>[] fieldClasses;
-       
-       /**
-        * Variable indicating the current number of insert sets in a batch.
-        */
-       private int batchCount = 0;
-       
-       /**
-        * Commit interval of batches.
-        * High batch interval: faster inserts, more memory required (reduce if OutOfMemoryExceptions occur)
-        * low batch interval: slower inserts, less memory.
-        */
-       private int batchInterval = DEFAULT_BATCH_INTERVERAL;
-       
-
-       /**
-        * Configures this JDBCOutputFormat.
-        * 
-        * @param parameters
-        *        Configuration containing all parameters.
-        */
-       @Override
-       public void configure(Configuration parameters) {
-               this.driverName = parameters.getString(DRIVER_KEY, null);
-               this.username = parameters.getString(USERNAME_KEY, null);
-               this.password = parameters.getString(PASSWORD_KEY, null);
-               this.dbURL = parameters.getString(URL_KEY, null);
-               this.query = parameters.getString(QUERY_KEY, null);
-               this.fieldCount = parameters.getInteger(FIELD_COUNT_KEY, 0);
-               this.batchInterval = parameters.getInteger(BATCH_INTERVAL, DEFAULT_BATCH_INTERVERAL);
-
-               @SuppressWarnings("unchecked")
-               Class<Value>[] classes = new Class[this.fieldCount];
-               this.fieldClasses = classes;
-               
-               ClassLoader cl = getClass().getClassLoader();
-
-               try {
-                       for (int i = 0; i < this.fieldCount; i++) {
-                               Class<? extends Value> clazz = parameters.<Value>getClass(FIELD_TYPE_KEY + i, null, cl);
-                               if (clazz == null) {
-                                       throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
-                                                       + "No type class for parameter " + i);
-                               }
-                               this.fieldClasses[i] = clazz;
-                       }
-               }
-               catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Could not load data type classes.", e);
-               }
-       }
-
-       /**
-        * Connects to the target database and initializes the prepared 
statement.
-        *
-        * @param taskNumber The number of the parallel instance.
-        * @throws IOException Thrown, if the output could not be opened due to 
an
-        * I/O problem.
-        */
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               try {
-                       establishConnection();
-                       upload = dbConn.prepareStatement(query);
-               } catch (SQLException sqe) {
-                       throw new IllegalArgumentException("open() failed:\t!", 
sqe);
-               } catch (ClassNotFoundException cnfe) {
-                       throw new IllegalArgumentException("JDBC-Class not 
found:\t", cnfe);
-               }
-       }
-
-       private void establishConnection() throws SQLException, 
ClassNotFoundException {
-               Class.forName(driverName);
-               if (username == null) {
-                       dbConn = DriverManager.getConnection(dbURL);
-               } else {
-                       dbConn = DriverManager.getConnection(dbURL, username, 
password);
-               }
-       }
-
-       /**
-        * Adds a record to the prepared statement.
-        * <p>
-        * When this method is called, the output format is guaranteed to be 
opened.
-        *
-        * @param record The records to add to the output.
-        * @throws IOException Thrown, if the records could not be added due to 
an
-        * I/O problem.
-        */
-       
-       @Override
-       public void writeRecord(Record record) throws IOException {
-               try {
-                       for (int x = 0; x < record.getNumFields(); x++) {
-                               Value temp = record.getField(x, 
fieldClasses[x]);
-                               addValue(x + 1, temp);
-                       }
-                       upload.addBatch();
-                       batchCount++;
-                       if(batchCount >= batchInterval) {
-                               upload.executeBatch();
-                               batchCount = 0;
-                       }
-               } catch (SQLException sqe) {
-                       throw new IllegalArgumentException("writeRecord() 
failed:\t", sqe);
-               } catch (IllegalArgumentException iae) {
-                       throw new IllegalArgumentException("writeRecord() 
failed:\t", iae);
-               }
-       }
-
-       private enum pactType {
-               BooleanValue,
-               ByteValue,
-               CharValue,
-               DoubleValue,
-               FloatValue,
-               IntValue,
-               LongValue,
-               ShortValue,
-               StringValue
-       }
-
-       private void addValue(int index, Value value) throws SQLException {
-               pactType type;
-               try {
-                       type = 
pactType.valueOf(value.getClass().getSimpleName());
-               } catch (IllegalArgumentException iae) {
-                       throw new IllegalArgumentException("PactType not 
supported:\t", iae);
-               }
-               switch (type) {
-                       case BooleanValue:
-                               upload.setBoolean(index, ((BooleanValue) 
value).getValue());
-                               break;
-                       case ByteValue:
-                               upload.setByte(index, ((ByteValue) 
value).getValue());
-                               break;
-                       case CharValue:
-                               upload.setString(index, 
String.valueOf(((CharValue) value).getValue()));
-                               break;
-                       case DoubleValue:
-                               upload.setDouble(index, ((DoubleValue) 
value).getValue());
-                               break;
-                       case FloatValue:
-                               upload.setFloat(index, ((FloatValue) 
value).getValue());
-                               break;
-                       case IntValue:
-                               upload.setInt(index, ((IntValue) 
value).getValue());
-                               break;
-                       case LongValue:
-                               upload.setLong(index, ((LongValue) 
value).getValue());
-                               break;
-                       case ShortValue:
-                               upload.setShort(index, ((ShortValue) 
value).getValue());
-                               break;
-                       case StringValue:
-                               upload.setString(index, ((StringValue) 
value).getValue());
-                               break;
-               }
-       }
-
-       /**
-        * Executes prepared statement and closes all resources of this 
instance.
-        *
-        * @throws IOException Thrown, if the input could not be closed 
properly.
-        */
-       @Override
-       public void close() throws IOException {
-               try {
-                       upload.executeBatch();
-                       batchCount = 0;
-                       upload.close();
-                       dbConn.close();
-               } catch (SQLException sqe) {
-                       throw new IllegalArgumentException("close() failed:\t", 
sqe);
-               }
-       }
-
-       /**
-        * Creates a configuration builder that can be used to set the 
-        * output format's parameters to the config in a fluent fashion.
-        * 
-        * @return A config builder for setting parameters.
-        */
-       public static ConfigBuilder configureOutputFormat(GenericDataSink 
target) {
-               return new ConfigBuilder(target.getParameters());
-       }
-
-       /**
-        * Abstract builder used to set parameters to the output format's 
-        * configuration in a fluent way.
-        */
-       protected static abstract class AbstractConfigBuilder<T>
-                       extends FileOutputFormat.AbstractConfigBuilder<T> {
-
-               /**
-                * Creates a new builder for the given configuration.
-                * 
-                * @param config The configuration into which the parameters 
will be written.
-                */
-               protected AbstractConfigBuilder(Configuration config) {
-                       super(config);
-               }
-
-               /**
-                * Sets the query field.
-                * @param value value to be set.
-                * @return The builder itself.
-                */
-               public T setQuery(String value) {
-                       this.config.setString(QUERY_KEY, value);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the url field.
-                * @param value value to be set.
-                * @return The builder itself.
-                */
-               public T setUrl(String value) {
-                       this.config.setString(URL_KEY, value);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the username field.
-                * @param value value to be set.
-                * @return The builder itself.
-                */
-               public T setUsername(String value) {
-                       this.config.setString(USERNAME_KEY, value);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the password field.
-                * @param value value to be set.
-                * @return The builder itself.
-                */
-               public T setPassword(String value) {
-                       this.config.setString(PASSWORD_KEY, value);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the driver field.
-                * @param value value to be set.
-                * @return The builder itself.
-                */
-               public T setDriver(String value) {
-                       this.config.setString(DRIVER_KEY, value);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the type of a column.
-                * Types are applied in the order they were set.
-                * @param type PactType to apply.
-                * @return The builder itself.
-                */
-               public T setClass(Class<? extends Value> type) {
-                       final int numYet = 
this.config.getInteger(FIELD_COUNT_KEY, 0);
-                       this.config.setClass(FIELD_TYPE_KEY + numYet, type);
-                       this.config.setInteger(FIELD_COUNT_KEY, numYet + 1);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-       }
-
-       /**
-        * A builder used to set parameters to the output format's 
configuration in a fluent way.
-        */
-       public static final class ConfigBuilder extends 
AbstractConfigBuilder<ConfigBuilder> {
-               /**
-                * Creates a new builder for the given configuration.
-                * 
-                * @param targetConfig The configuration into which the 
parameters will be written.
-                */
-               protected ConfigBuilder(Configuration targetConfig) {
-                       super(targetConfig);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 213fd6a..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc.example;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * Stand-alone example for the JDBC connector.
- *
- * NOTE: To run this example, you need the apache derby code in your classpath.
- * See the Maven file (pom.xml) for a reference to the derby dependency. You 
can
- * simply Change the scope of the Maven dependency from test to compile.
- */
-public class JDBCExample implements Program, ProgramDescription {
-
-       @Override
-       public Plan getPlan(String[] args) {
-               /*
-                * In this example we use the constructor where the url 
contains all the settings that are needed.
-                * You could also use the default constructor and deliver a 
Configuration with all the needed settings.
-                * You also could set the settings to the source-instance.
-                */
-               GenericDataSource<JDBCInputFormat> source = new 
GenericDataSource<JDBCInputFormat>(
-                               new JDBCInputFormat(
-                                               
"org.apache.derby.jdbc.EmbeddedDriver",
-                                               "jdbc:derby:memory:ebookshop",
-                                               "select * from books"),
-                               "Data Source");
-
-               GenericDataSink sink = new GenericDataSink(new 
JDBCOutputFormat(), "Data Output");
-               JDBCOutputFormat.configureOutputFormat(sink)
-                               
.setDriver("org.apache.derby.jdbc.EmbeddedDriver")
-                               .setUrl("jdbc:derby:memory:ebookshop")
-                               .setQuery("insert into newbooks 
(id,title,author,price,qty) values (?,?,?,?,?)")
-                               .setClass(IntValue.class)
-                               .setClass(StringValue.class)
-                               .setClass(StringValue.class)
-                               .setClass(FloatValue.class)
-                               .setClass(IntValue.class);
-
-               sink.addInput(source);
-               return new Plan(sink, "JDBC Example Job");
-       }
-
-       @Override
-       public String getDescription() {
-               return "Parameter:";
-       }
-
-       /*
-        * To run this example, you need the apache derby code in your 
classpath!
-        */
-       public static void main(String[] args) throws Exception {
-
-               prepareTestDb();
-               JDBCExample tut = new JDBCExample();
-               JobExecutionResult res = LocalExecutor.execute(tut, args);
-               System.out.println("runtime: " + res.getNetRuntime() + " ms");
-
-               System.exit(0);
-       }
-
-       private static void prepareTestDb() throws Exception {
-               String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-               Connection conn = DriverManager.getConnection(dbURL);
-
-               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
books (");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-               Statement stat = conn.createStatement();
-               stat.executeUpdate(sqlQueryBuilder.toString());
-               stat.close();
-
-               sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-               stat = conn.createStatement();
-               stat.executeUpdate(sqlQueryBuilder.toString());
-               stat.close();
-
-               sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, 
title, author, price, qty) VALUES ");
-               sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
-               sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
-               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
-               sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
-               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
-
-               stat = conn.createStatement();
-               stat.execute(sqlQueryBuilder.toString());
-               stat.close();
-               
-               conn.close();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
 
b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
deleted file mode 100644
index 172f585..0000000
--- 
a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.OutputStream;
-
-public class DevNullLogStream {
-
-       public static final OutputStream DEV_NULL = new OutputStream() {
-               public void write(int b) {}
-       };
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
 
b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index 8e0a2c5..0000000
--- 
a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCInputFormatTest {
-       JDBCInputFormat jdbcInputFormat;
-       Configuration config;
-       static Connection conn;
-       static final Value[][] dbData = {
-               {new IntValue(1001), new StringValue("Java for dummies"), new 
StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
-               {new IntValue(1002), new StringValue("More Java for dummies"), 
new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
-               {new IntValue(1003), new StringValue("More Java for more 
dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new 
IntValue(33)},
-               {new IntValue(1004), new StringValue("A Cup of Java"), new 
StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
-               {new IntValue(1005), new StringValue("A Teaspoon of Java"), new 
StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
-       @BeforeClass
-       public static void setUpClass() {
-               try {
-                       prepareDerbyDatabase();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               }
-       }
-
-       private static void prepareDerbyDatabase() throws 
ClassNotFoundException {
-               
System.setProperty("derby.stream.error.field","org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-               String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-               createConnection(dbURL);
-       }
-
-       private static void cleanUpDerbyDatabases() {
-               try {
-                               String dbURL = 
"jdbc:derby:memory:ebookshop;create=true";
-                               
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                               conn = DriverManager.getConnection(dbURL);
-                               Statement stat = conn.createStatement();
-                               stat.executeUpdate("DROP TABLE books");
-                               stat.close();
-                               conn.close();
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                               Assert.fail();
-                       } 
-       }
-       
-       /*
-        Loads JDBC derby driver ; creates(if necessary) and populates database.
-        */
-       private static void createConnection(String dbURL) {
-               try {
-                       Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                       conn = DriverManager.getConnection(dbURL);
-                       createTable();
-                       insertDataToSQLTables();
-                       conn.close();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               }
-       }
-
-       private static void createTable() throws SQLException {
-               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
books (");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-               Statement stat = conn.createStatement();
-               stat.executeUpdate(sqlQueryBuilder.toString());
-               stat.close();
-
-               sqlQueryBuilder = new StringBuilder("CREATE TABLE bookscontent 
(");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("content BLOB(10K) DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-               stat = conn.createStatement();
-               stat.executeUpdate(sqlQueryBuilder.toString());
-               stat.close();
-       }
-
-       private static void insertDataToSQLTables() throws SQLException {
-               StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
-               sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
-               sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
-               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
-               sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
-               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
-
-               Statement stat = conn.createStatement();
-               stat.execute(sqlQueryBuilder.toString());
-               stat.close();
-
-               sqlQueryBuilder = new StringBuilder("INSERT INTO bookscontent 
(id, title, content) VALUES ");
-               sqlQueryBuilder.append("(1001, 'Java for dummies', 
CAST(X'7f454c4602' AS BLOB)),");
-               sqlQueryBuilder.append("(1002, 'More Java for dummies', 
CAST(X'7f454c4602' AS BLOB)),");
-               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
CAST(X'7f454c4602' AS BLOB)),");
-               sqlQueryBuilder.append("(1004, 'A Cup of Java', 
CAST(X'7f454c4602' AS BLOB)),");
-               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 
CAST(X'7f454c4602' AS BLOB))");
-
-               stat = conn.createStatement();
-               stat.execute(sqlQueryBuilder.toString());
-               stat.close();
-       }
-
-
-       @After
-       public void tearDown() {
-               jdbcInputFormat = null;
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidConnection() {
-               jdbcInputFormat = new 
JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:idontexist", "select * from books;");
-               jdbcInputFormat.configure(null);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidQuery() {
-               jdbcInputFormat = new 
JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:ebookshop", "abc");
-               jdbcInputFormat.configure(null);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidDBType() {
-               jdbcInputFormat = new JDBCInputFormat("idontexist.Driver", 
"jdbc:derby:memory:ebookshop", "select * from books;");
-               jdbcInputFormat.configure(null);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testUnsupportedSQLType() {
-               jdbcInputFormat = new 
JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:ebookshop", "select * from bookscontent");
-               jdbcInputFormat.configure(null);
-               jdbcInputFormat.nextRecord(new Record());
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testNotConfiguredFormatNext() {
-               jdbcInputFormat = new 
JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:ebookshop", "select * from books");
-               jdbcInputFormat.nextRecord(new Record());
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testNotConfiguredFormatEnd() {
-               jdbcInputFormat = new 
JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:ebookshop", "select * from books");
-               jdbcInputFormat.reachedEnd();
-       }
-
-       @Test
-       public void testJDBCInputFormat() throws IOException {
-               jdbcInputFormat = new 
JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:ebookshop", "select * from books");
-               jdbcInputFormat.configure(null);
-               Record record = new Record();
-               int recordCount = 0;
-               while (!jdbcInputFormat.reachedEnd()) {
-                       jdbcInputFormat.nextRecord(record);
-                       Assert.assertEquals(5, record.getNumFields());
-                       Assert.assertEquals("Field 0 should be int", 
IntValue.class, record.getField(0, IntValue.class).getClass());
-                       Assert.assertEquals("Field 1 should be String", 
StringValue.class, record.getField(1, StringValue.class).getClass());
-                       Assert.assertEquals("Field 2 should be String", 
StringValue.class, record.getField(2, StringValue.class).getClass());
-                       Assert.assertEquals("Field 3 should be float", 
DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
-                       Assert.assertEquals("Field 4 should be int", 
IntValue.class, record.getField(4, IntValue.class).getClass());
-
-                       int[] pos = {0, 1, 2, 3, 4};
-                       Value[] values = {new IntValue(), new StringValue(), 
new StringValue(), new DoubleValue(), new IntValue()};
-                       Assert.assertTrue(record.equalsFields(pos, 
dbData[recordCount], values));
-
-                       recordCount++;
-               }
-               Assert.assertEquals(5, recordCount);
-               
-               cleanUpDerbyDatabases();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
 
b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index c824ea1..0000000
--- 
a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
-       private JDBCInputFormat jdbcInputFormat;
-       private JDBCOutputFormat jdbcOutputFormat;
-
-       private static Connection conn;
-
-       static final Value[][] dbData = {
-               {new IntValue(1001), new StringValue("Java for dummies"), new 
StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
-               {new IntValue(1002), new StringValue("More Java for dummies"), 
new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
-               {new IntValue(1003), new StringValue("More Java for more 
dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new 
IntValue(33)},
-               {new IntValue(1004), new StringValue("A Cup of Java"), new 
StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
-               {new IntValue(1005), new StringValue("A Teaspoon of Java"), new 
StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
-       @BeforeClass
-       public static void setUpClass() {
-               try {
-                       System.setProperty("derby.stream.error.field", 
"org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-                       prepareDerbyInputDatabase();
-                       prepareDerbyOutputDatabase();
-               } catch (ClassNotFoundException e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               }
-       }
-
-       private static void cleanUpDerbyDatabases() {
-                try {
-                        String dbURL = 
"jdbc:derby:memory:ebookshop;create=true";
-                        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                        conn = DriverManager.getConnection(dbURL);
-                        Statement stat = conn.createStatement();
-                        stat.executeUpdate("DROP TABLE books");
-                        stat.executeUpdate("DROP TABLE newbooks");
-                        stat.close();
-                        conn.close();
-                } catch (Exception e) {
-                        e.printStackTrace();
-                        Assert.fail();
-                } 
-       }
-       
-       private static void prepareDerbyInputDatabase() throws 
ClassNotFoundException {
-               try {
-                       String dbURL = 
"jdbc:derby:memory:ebookshop;create=true";
-                       Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                       conn = DriverManager.getConnection(dbURL);
-                       createTableBooks();
-                       insertDataToSQLTables();
-                       conn.close();
-               } catch (ClassNotFoundException e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               } catch (SQLException e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               }
-       }
-
-       private static void prepareDerbyOutputDatabase() throws 
ClassNotFoundException {
-               try {
-                       String dbURL = 
"jdbc:derby:memory:ebookshop;create=true";
-                       Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                       conn = DriverManager.getConnection(dbURL);
-                       createTableNewBooks();
-                       conn.close();
-               } catch (ClassNotFoundException e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               } catch (SQLException e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               }
-       }
-
-       private static void createTableBooks() throws SQLException {
-               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
books (");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-               Statement stat = conn.createStatement();
-               stat.executeUpdate(sqlQueryBuilder.toString());
-               stat.close();
-       }
-
-       private static void createTableNewBooks() throws SQLException {
-               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
newbooks (");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-               Statement stat = conn.createStatement();
-               stat.executeUpdate(sqlQueryBuilder.toString());
-               stat.close();
-       }
-
-       private static void insertDataToSQLTables() throws SQLException {
-               StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
-               sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
-               sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
-               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
-               sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
-               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
-
-               Statement stat = conn.createStatement();
-               stat.execute(sqlQueryBuilder.toString());
-               stat.close();
-       }
-
-
-       @After
-       public void tearDown() {
-               jdbcOutputFormat = null;
-               cleanUpDerbyDatabases();
-       }
-
-       @Test
-       public void testJDBCOutputFormat() throws IOException {
-               String sourceTable = "books";
-               String targetTable = "newbooks";
-               String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
-               String dbUrl = "jdbc:derby:memory:ebookshop";
-
-               Configuration cfg = new Configuration();
-               cfg.setString("driver", driverPath);
-               cfg.setString("url", dbUrl);
-               cfg.setString("query", "insert into " + targetTable + " (id, 
title, author, price, qty) values (?,?,?,?,?)");
-               cfg.setInteger("fields", 5);
-               cfg.setClass("type0", IntValue.class);
-               cfg.setClass("type1", StringValue.class);
-               cfg.setClass("type2", StringValue.class);
-               cfg.setClass("type3", FloatValue.class);
-               cfg.setClass("type4", IntValue.class);
-
-               jdbcOutputFormat = new JDBCOutputFormat();
-               jdbcOutputFormat.configure(cfg);
-               jdbcOutputFormat.open(0,1);
-
-               jdbcInputFormat = new JDBCInputFormat(
-                               driverPath,
-                               dbUrl,
-                               "select * from " + sourceTable);
-               jdbcInputFormat.configure(null);
-
-               Record record = new Record();
-               while (!jdbcInputFormat.reachedEnd()) {
-                       jdbcInputFormat.nextRecord(record);
-                       jdbcOutputFormat.writeRecord(record);
-               }
-
-               jdbcOutputFormat.close();
-               jdbcInputFormat.close();
-
-               jdbcInputFormat = new JDBCInputFormat(
-                               driverPath,
-                               dbUrl,
-                               "select * from " + targetTable);
-               jdbcInputFormat.configure(null);
-
-               int recordCount = 0;
-               while (!jdbcInputFormat.reachedEnd()) {
-                       jdbcInputFormat.nextRecord(record);
-                       Assert.assertEquals(5, record.getNumFields());
-                       Assert.assertEquals("Field 0 should be int", 
IntValue.class, record.getField(0, IntValue.class).getClass());
-                       Assert.assertEquals("Field 1 should be String", 
StringValue.class, record.getField(1, StringValue.class).getClass());
-                       Assert.assertEquals("Field 2 should be String", 
StringValue.class, record.getField(2, StringValue.class).getClass());
-                       Assert.assertEquals("Field 3 should be float", 
DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
-                       Assert.assertEquals("Field 4 should be int", 
IntValue.class, record.getField(4, IntValue.class).getClass());
-
-                       int[] pos = {0, 1, 2, 3, 4};
-                       Value[] values = {new IntValue(), new StringValue(), 
new StringValue(), new DoubleValue(), new IntValue()};
-                       Assert.assertTrue(record.equalsFields(pos, 
dbData[recordCount], values));
-
-                       recordCount++;
-               }
-               Assert.assertEquals(5, recordCount);
-
-               jdbcInputFormat.close();
-       }
-}

Reply via email to