Repository: flink Updated Branches: refs/heads/master d570d078a -> 30761572b
[FLINK-2005] Remove Record API from jdbc module This closes #982 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06b37bf5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06b37bf5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06b37bf5 Branch: refs/heads/master Commit: 06b37bf550315bd1d5be7dc3ed6638fd21768e1a Parents: d570d07 Author: zentol <s.mo...@web.de> Authored: Tue Aug 4 12:45:22 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Aug 4 18:13:30 2015 +0200 ---------------------------------------------------------------------- flink-staging/flink-jdbc/pom.xml | 12 - .../java/record/io/jdbc/JDBCInputFormat.java | 389 ------------------- .../java/record/io/jdbc/JDBCOutputFormat.java | 359 ----------------- .../record/io/jdbc/example/JDBCExample.java | 136 ------- .../java/record/io/jdbc/DevNullLogStream.java | 30 -- .../record/io/jdbc/JDBCInputFormatTest.java | 214 ---------- .../record/io/jdbc/JDBCOutputFormatTest.java | 225 ----------- 7 files changed, 1365 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/pom.xml b/flink-staging/flink-jdbc/pom.xml index 7b499a7..a3976c1 100644 --- a/flink-staging/flink-jdbc/pom.xml +++ b/flink-staging/flink-jdbc/pom.xml @@ -41,18 +41,6 @@ under the License. <artifactId>flink-java</artifactId> <version>${project.version}</version> </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients</artifactId> - <version>${project.version}</version> - </dependency> <dependency> <groupId>org.apache.derby</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java deleted file mode 100644 index 3cd295b..0000000 --- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java +++ /dev/null @@ -1,389 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.io.jdbc; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.io.NonParallelInput; -import org.apache.flink.api.java.record.io.GenericInputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.BooleanValue; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.FloatValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.ShortValue; -import org.apache.flink.types.StringValue; - -/** - * InputFormat to read data from a database and generate PactReords. - * The InputFormat has to be configured with the query, and either all - * connection parameters or a complete database URL.{@link Configuration} The position of a value inside a Record is - * determined by the table - * returned. - * - * @see Configuration - * @see Record - * @see DriverManager - */ -public class JDBCInputFormat extends GenericInputFormat implements NonParallelInput { - - private static final long serialVersionUID = 1L; - - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); - - - public final String DRIVER_KEY = "driver"; - public final String USERNAME_KEY = "username"; - public final String PASSWORD_KEY = "password"; - public final String URL_KEY = "url"; - public final String QUERY_KEY = "query"; - - - private String username; - private String password; - private String driverName; - private String dbURL; - private String query; - - - private transient Connection dbConn; - private transient Statement statement; - private transient ResultSet resultSet; - - - /** - * Creates a non-configured JDBCInputFormat. This format has to be - * configured using configure(configuration). - */ - public JDBCInputFormat() {} - - /** - * Creates a JDBCInputFormat and configures it. - * - * @param driverName - * JDBC-Drivename - * @param dbURL - * Formatted URL containing all connection parameters. - * @param username - * @param password - * @param query - * Query to execute. - */ - public JDBCInputFormat(String driverName, String dbURL, String username, String password, String query) { - this.driverName = driverName; - this.query = query; - this.dbURL = dbURL; - this.username = username; - this.password = password; - } - - /** - * Creates a JDBCInputFormat and configures it. - * - * @param driverName - * JDBC-Drivername - * @param dbURL - * Formatted URL containing all connection parameters. - * @param query - * Query to execute. - */ - public JDBCInputFormat(String driverName, String dbURL, String query) { - this(driverName, dbURL, "", "", query); - } - - /** - * Creates a JDBCInputFormat and configures it. - * - * @param parameters - * Configuration with all connection parameters. - * @param query - * Query to execute. - */ - public JDBCInputFormat(Configuration parameters, String query) { - this.driverName = parameters.getString(DRIVER_KEY, ""); - this.username = parameters.getString(USERNAME_KEY, ""); - this.password = parameters.getString(PASSWORD_KEY, ""); - this.dbURL = parameters.getString(URL_KEY, ""); - this.query = query; - } - - - /** - * Configures this JDBCInputFormat. This includes setting the connection - * parameters (if necessary), establishing the connection and executing the - * query. - * - * @param parameters - * Configuration containing all or no parameters. - */ - @Override - public void configure(Configuration parameters) { - boolean needConfigure = isFieldNullOrEmpty(this.query) || isFieldNullOrEmpty(this.dbURL); - if (needConfigure) { - this.driverName = parameters.getString(DRIVER_KEY, null); - this.username = parameters.getString(USERNAME_KEY, null); - this.password = parameters.getString(PASSWORD_KEY, null); - this.query = parameters.getString(QUERY_KEY, null); - this.dbURL = parameters.getString(URL_KEY, null); - } - - try { - prepareQueryExecution(); - } catch (SQLException e) { - throw new IllegalArgumentException("Configure failed:\t!", e); - } - } - - /** - * Enters data value from the current resultSet into a Record. - * - * @param pos - * Record position to be set. - * @param type - * SQL type of the resultSet value. - * @param record - * Target Record. - */ - private void retrieveTypeAndFillRecord(int pos, int type, Record record) throws SQLException, - NotTransformableSQLFieldException { - switch (type) { - case java.sql.Types.NULL: - record.setField(pos, NullValue.getInstance()); - break; - case java.sql.Types.BOOLEAN: - record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1))); - break; - case java.sql.Types.BIT: - record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1))); - break; - case java.sql.Types.CHAR: - record.setField(pos, new StringValue(resultSet.getString(pos + 1))); - break; - case java.sql.Types.NCHAR: - record.setField(pos, new StringValue(resultSet.getString(pos + 1))); - break; - case java.sql.Types.VARCHAR: - record.setField(pos, new StringValue(resultSet.getString(pos + 1))); - break; - case java.sql.Types.LONGVARCHAR: - record.setField(pos, new StringValue(resultSet.getString(pos + 1))); - break; - case java.sql.Types.LONGNVARCHAR: - record.setField(pos, new StringValue(resultSet.getString(pos + 1))); - break; - case java.sql.Types.TINYINT: - record.setField(pos, new ShortValue(resultSet.getShort(pos + 1))); - break; - case java.sql.Types.SMALLINT: - record.setField(pos, new ShortValue(resultSet.getShort(pos + 1))); - break; - case java.sql.Types.BIGINT: - record.setField(pos, new LongValue(resultSet.getLong(pos + 1))); - break; - case java.sql.Types.INTEGER: - record.setField(pos, new IntValue(resultSet.getInt(pos + 1))); - break; - case java.sql.Types.FLOAT: - record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1))); - break; - case java.sql.Types.REAL: - record.setField(pos, new FloatValue(resultSet.getFloat(pos + 1))); - break; - case java.sql.Types.DOUBLE: - record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1))); - break; - case java.sql.Types.DECIMAL: - record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue())); - break; - case java.sql.Types.NUMERIC: - record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue())); - break; - case java.sql.Types.DATE: - record.setField(pos, new StringValue(resultSet.getDate(pos + 1).toString())); - break; - case java.sql.Types.TIME: - record.setField(pos, new LongValue(resultSet.getTime(pos + 1).getTime())); - break; - case java.sql.Types.TIMESTAMP: - record.setField(pos, new StringValue(resultSet.getTimestamp(pos + 1).toString())); - break; - case java.sql.Types.SQLXML: - record.setField(pos, new StringValue(resultSet.getSQLXML(pos + 1).toString())); - break; - default: - throw new NotTransformableSQLFieldException("Unknown sql-type [" + type + "]on column [" + pos + "]"); - - // case java.sql.Types.BINARY: - // case java.sql.Types.VARBINARY: - // case java.sql.Types.LONGVARBINARY: - // case java.sql.Types.ARRAY: - // case java.sql.Types.JAVA_OBJECT: - // case java.sql.Types.BLOB: - // case java.sql.Types.CLOB: - // case java.sql.Types.NCLOB: - // case java.sql.Types.DATALINK: - // case java.sql.Types.DISTINCT: - // case java.sql.Types.OTHER: - // case java.sql.Types.REF: - // case java.sql.Types.ROWID: - // case java.sql.Types.STRUCT: - } - } - - private boolean isFieldNullOrEmpty(String field) { - return (field == null || field.length() == 0); - } - - private void prepareQueryExecution() throws SQLException { - setClassForDBType(); - prepareCredentialsAndExecute(); - } - - /** - * Loads appropriate JDBC driver. - * - * @param dbType - * Type of the database. - * @return boolean value, indication whether an appropriate driver could be - * found. - */ - private void setClassForDBType() { - try { - Class.forName(driverName); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("JDBC-Class not found:\t" + cnfe.getLocalizedMessage()); - } - } - - private void prepareCredentialsAndExecute() throws SQLException { - if (isFieldNullOrEmpty(username)) { - prepareConnection(dbURL); - } else { - prepareConnection(); - } - executeQuery(); - } - - /** - * Establishes a connection to a database. - * - * @param dbURL - * Assembled URL containing all connection parameters. - * @return boolean value, indicating whether a connection could be - * established - */ - private void prepareConnection(String dbURL) throws SQLException { - dbConn = DriverManager.getConnection(dbURL); - } - - /** - * Assembles the Database URL and establishes a connection. - * - * @param dbType - * Type of the database. - * @param username - * Login username. - * @param password - * Login password. - * @return boolean value, indicating whether a connection could be - * established - */ - private void prepareConnection() throws SQLException { - dbConn = DriverManager.getConnection(dbURL, username, password); - } - - private void executeQuery() throws SQLException { - statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); - resultSet = statement.executeQuery(this.query); - } - - /** - * Checks whether all data has been read. - * - * @return boolean value indication whether all data has been read. - */ - @Override - public boolean reachedEnd() { - try { - if (resultSet.isLast()) { - resultSet.close(); - statement.close(); - dbConn.close(); - return true; - } else { - return false; - } - } catch (SQLException e) { - throw new IllegalArgumentException("Couldn't evaluate reachedEnd():\t" + e.getMessage()); - } catch (NullPointerException e) { - throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage()); - } - } - - /** - * Stores the next resultSet row in a Record - * - * @param record - * target Record - * @return boolean value indicating that the operation was successful - */ - @Override - public Record nextRecord(Record record) { - try { - resultSet.next(); - ResultSetMetaData rsmd = resultSet.getMetaData(); - int column_count = rsmd.getColumnCount(); - record.setNumFields(column_count); - - for (int pos = 0; pos < column_count; pos++) { - int type = rsmd.getColumnType(pos + 1); - retrieveTypeAndFillRecord(pos, type, record); - } - return record; - } catch (SQLException e) { - throw new IllegalArgumentException("Couldn't read data:\t" + e.getMessage()); - } catch (NotTransformableSQLFieldException e) { - throw new IllegalArgumentException("Couldn't read data because of unknown column sql-type:\t" - + e.getMessage()); - } catch (NullPointerException e) { - throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage()); - } - } - - public static class NotTransformableSQLFieldException extends Exception { - - private static final long serialVersionUID = 1L; - - public NotTransformableSQLFieldException(String message) { - super(message); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java deleted file mode 100644 index 780001a..0000000 --- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.io.jdbc; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -import org.apache.flink.api.common.io.FileOutputFormat; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.record.operators.GenericDataSink; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.BooleanValue; -import org.apache.flink.types.ByteValue; -import org.apache.flink.types.CharValue; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.FloatValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.ShortValue; -import org.apache.flink.types.StringValue; -import org.apache.flink.types.Value; - -public class JDBCOutputFormat implements OutputFormat<Record> { - private static final long serialVersionUID = 1L; - - private static final int DEFAULT_BATCH_INTERVERAL = 5000; - - public static final String DRIVER_KEY = "driver"; - public static final String USERNAME_KEY = "username"; - public static final String PASSWORD_KEY = "password"; - public static final String URL_KEY = "url"; - public static final String QUERY_KEY = "query"; - public static final String FIELD_COUNT_KEY = "fields"; - public static final String FIELD_TYPE_KEY = "type"; - public static final String BATCH_INTERVAL = "batchInt"; - - private Connection dbConn; - private PreparedStatement upload; - - private String username; - private String password; - private String driverName; - private String dbURL; - - private String query; - private int fieldCount; - private Class<? extends Value>[] fieldClasses; - - /** - * Variable indicating the current number of insert sets in a batch. - */ - private int batchCount = 0; - - /** - * Commit interval of batches. - * High batch interval: faster inserts, more memory required (reduce if OutOfMemoryExceptions occur) - * low batch interval: slower inserts, less memory. - */ - private int batchInterval = DEFAULT_BATCH_INTERVERAL; - - - /** - * Configures this JDBCOutputFormat. - * - * @param parameters - * Configuration containing all parameters. - */ - @Override - public void configure(Configuration parameters) { - this.driverName = parameters.getString(DRIVER_KEY, null); - this.username = parameters.getString(USERNAME_KEY, null); - this.password = parameters.getString(PASSWORD_KEY, null); - this.dbURL = parameters.getString(URL_KEY, null); - this.query = parameters.getString(QUERY_KEY, null); - this.fieldCount = parameters.getInteger(FIELD_COUNT_KEY, 0); - this.batchInterval = parameters.getInteger(BATCH_INTERVAL, DEFAULT_BATCH_INTERVERAL); - - @SuppressWarnings("unchecked") - Class<Value>[] classes = new Class[this.fieldCount]; - this.fieldClasses = classes; - - ClassLoader cl = getClass().getClassLoader(); - - try { - for (int i = 0; i < this.fieldCount; i++) { - Class<? extends Value> clazz = parameters.<Value>getClass(FIELD_TYPE_KEY + i, null, cl); - if (clazz == null) { - throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: " - + "No type class for parameter " + i); - } - this.fieldClasses[i] = clazz; - } - } - catch (ClassNotFoundException e) { - throw new RuntimeException("Could not load data type classes.", e); - } - } - - /** - * Connects to the target database and initializes the prepared statement. - * - * @param taskNumber The number of the parallel instance. - * @throws IOException Thrown, if the output could not be opened due to an - * I/O problem. - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - try { - establishConnection(); - upload = dbConn.prepareStatement(query); - } catch (SQLException sqe) { - throw new IllegalArgumentException("open() failed:\t!", sqe); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe); - } - } - - private void establishConnection() throws SQLException, ClassNotFoundException { - Class.forName(driverName); - if (username == null) { - dbConn = DriverManager.getConnection(dbURL); - } else { - dbConn = DriverManager.getConnection(dbURL, username, password); - } - } - - /** - * Adds a record to the prepared statement. - * <p> - * When this method is called, the output format is guaranteed to be opened. - * - * @param record The records to add to the output. - * @throws IOException Thrown, if the records could not be added due to an - * I/O problem. - */ - - @Override - public void writeRecord(Record record) throws IOException { - try { - for (int x = 0; x < record.getNumFields(); x++) { - Value temp = record.getField(x, fieldClasses[x]); - addValue(x + 1, temp); - } - upload.addBatch(); - batchCount++; - if(batchCount >= batchInterval) { - upload.executeBatch(); - batchCount = 0; - } - } catch (SQLException sqe) { - throw new IllegalArgumentException("writeRecord() failed:\t", sqe); - } catch (IllegalArgumentException iae) { - throw new IllegalArgumentException("writeRecord() failed:\t", iae); - } - } - - private enum pactType { - BooleanValue, - ByteValue, - CharValue, - DoubleValue, - FloatValue, - IntValue, - LongValue, - ShortValue, - StringValue - } - - private void addValue(int index, Value value) throws SQLException { - pactType type; - try { - type = pactType.valueOf(value.getClass().getSimpleName()); - } catch (IllegalArgumentException iae) { - throw new IllegalArgumentException("PactType not supported:\t", iae); - } - switch (type) { - case BooleanValue: - upload.setBoolean(index, ((BooleanValue) value).getValue()); - break; - case ByteValue: - upload.setByte(index, ((ByteValue) value).getValue()); - break; - case CharValue: - upload.setString(index, String.valueOf(((CharValue) value).getValue())); - break; - case DoubleValue: - upload.setDouble(index, ((DoubleValue) value).getValue()); - break; - case FloatValue: - upload.setFloat(index, ((FloatValue) value).getValue()); - break; - case IntValue: - upload.setInt(index, ((IntValue) value).getValue()); - break; - case LongValue: - upload.setLong(index, ((LongValue) value).getValue()); - break; - case ShortValue: - upload.setShort(index, ((ShortValue) value).getValue()); - break; - case StringValue: - upload.setString(index, ((StringValue) value).getValue()); - break; - } - } - - /** - * Executes prepared statement and closes all resources of this instance. - * - * @throws IOException Thrown, if the input could not be closed properly. - */ - @Override - public void close() throws IOException { - try { - upload.executeBatch(); - batchCount = 0; - upload.close(); - dbConn.close(); - } catch (SQLException sqe) { - throw new IllegalArgumentException("close() failed:\t", sqe); - } - } - - /** - * Creates a configuration builder that can be used to set the - * output format's parameters to the config in a fluent fashion. - * - * @return A config builder for setting parameters. - */ - public static ConfigBuilder configureOutputFormat(GenericDataSink target) { - return new ConfigBuilder(target.getParameters()); - } - - /** - * Abstract builder used to set parameters to the output format's - * configuration in a fluent way. - */ - protected static abstract class AbstractConfigBuilder<T> - extends FileOutputFormat.AbstractConfigBuilder<T> { - - /** - * Creates a new builder for the given configuration. - * - * @param config The configuration into which the parameters will be written. - */ - protected AbstractConfigBuilder(Configuration config) { - super(config); - } - - /** - * Sets the query field. - * @param value value to be set. - * @return The builder itself. - */ - public T setQuery(String value) { - this.config.setString(QUERY_KEY, value); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the url field. - * @param value value to be set. - * @return The builder itself. - */ - public T setUrl(String value) { - this.config.setString(URL_KEY, value); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the username field. - * @param value value to be set. - * @return The builder itself. - */ - public T setUsername(String value) { - this.config.setString(USERNAME_KEY, value); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the password field. - * @param value value to be set. - * @return The builder itself. - */ - public T setPassword(String value) { - this.config.setString(PASSWORD_KEY, value); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the driver field. - * @param value value to be set. - * @return The builder itself. - */ - public T setDriver(String value) { - this.config.setString(DRIVER_KEY, value); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the type of a column. - * Types are applied in the order they were set. - * @param type PactType to apply. - * @return The builder itself. - */ - public T setClass(Class<? extends Value> type) { - final int numYet = this.config.getInteger(FIELD_COUNT_KEY, 0); - this.config.setClass(FIELD_TYPE_KEY + numYet, type); - this.config.setInteger(FIELD_COUNT_KEY, numYet + 1); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - } - - /** - * A builder used to set parameters to the output format's configuration in a fluent way. - */ - public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> { - /** - * Creates a new builder for the given configuration. - * - * @param targetConfig The configuration into which the parameters will be written. - */ - protected ConfigBuilder(Configuration targetConfig) { - super(targetConfig); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java deleted file mode 100644 index 213fd6a..0000000 --- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.io.jdbc.example; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.Statement; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat; -import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat; -import org.apache.flink.api.java.record.operators.GenericDataSink; -import org.apache.flink.api.java.record.operators.GenericDataSource; -import org.apache.flink.client.LocalExecutor; -import org.apache.flink.types.FloatValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.StringValue; - -/** - * Stand-alone example for the JDBC connector. - * - * NOTE: To run this example, you need the apache derby code in your classpath. - * See the Maven file (pom.xml) for a reference to the derby dependency. You can - * simply Change the scope of the Maven dependency from test to compile. - */ -public class JDBCExample implements Program, ProgramDescription { - - @Override - public Plan getPlan(String[] args) { - /* - * In this example we use the constructor where the url contains all the settings that are needed. - * You could also use the default constructor and deliver a Configuration with all the needed settings. - * You also could set the settings to the source-instance. - */ - GenericDataSource<JDBCInputFormat> source = new GenericDataSource<JDBCInputFormat>( - new JDBCInputFormat( - "org.apache.derby.jdbc.EmbeddedDriver", - "jdbc:derby:memory:ebookshop", - "select * from books"), - "Data Source"); - - GenericDataSink sink = new GenericDataSink(new JDBCOutputFormat(), "Data Output"); - JDBCOutputFormat.configureOutputFormat(sink) - .setDriver("org.apache.derby.jdbc.EmbeddedDriver") - .setUrl("jdbc:derby:memory:ebookshop") - .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)") - .setClass(IntValue.class) - .setClass(StringValue.class) - .setClass(StringValue.class) - .setClass(FloatValue.class) - .setClass(IntValue.class); - - sink.addInput(source); - return new Plan(sink, "JDBC Example Job"); - } - - @Override - public String getDescription() { - return "Parameter:"; - } - - /* - * To run this example, you need the apache derby code in your classpath! - */ - public static void main(String[] args) throws Exception { - - prepareTestDb(); - JDBCExample tut = new JDBCExample(); - JobExecutionResult res = LocalExecutor.execute(tut, args); - System.out.println("runtime: " + res.getNetRuntime() + " ms"); - - System.exit(0); - } - - private static void prepareTestDb() throws Exception { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - Connection conn = DriverManager.getConnection(dbURL); - - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - - sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - - sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - - stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - - conn.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java deleted file mode 100644 index 172f585..0000000 --- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io.jdbc; - -import java.io.OutputStream; - -public class DevNullLogStream { - - public static final OutputStream DEV_NULL = new OutputStream() { - public void write(int b) {} - }; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java deleted file mode 100644 index 8e0a2c5..0000000 --- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.record.io.jdbc; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; - -import org.junit.Assert; - -import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.types.Value; -import org.junit.After; -import org.junit.BeforeClass; -import org.junit.Test; - -public class JDBCInputFormatTest { - JDBCInputFormat jdbcInputFormat; - Configuration config; - static Connection conn; - static final Value[][] dbData = { - {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)}, - {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)}, - {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)}, - {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)}, - {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}}; - - @BeforeClass - public static void setUpClass() { - try { - prepareDerbyDatabase(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void prepareDerbyDatabase() throws ClassNotFoundException { - System.setProperty("derby.stream.error.field","org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL"); - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - createConnection(dbURL); - } - - private static void cleanUpDerbyDatabases() { - try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE books"); - stat.close(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } - } - - /* - Loads JDBC derby driver ; creates(if necessary) and populates database. - */ - private static void createConnection(String dbURL) { - try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - createTable(); - insertDataToSQLTables(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void createTable() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - - sqlQueryBuilder = new StringBuilder("CREATE TABLE bookscontent ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("content BLOB(10K) DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - } - - private static void insertDataToSQLTables() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - - Statement stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - - sqlQueryBuilder = new StringBuilder("INSERT INTO bookscontent (id, title, content) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', CAST(X'7f454c4602' AS BLOB)),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', CAST(X'7f454c4602' AS BLOB)),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', CAST(X'7f454c4602' AS BLOB)),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', CAST(X'7f454c4602' AS BLOB)),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', CAST(X'7f454c4602' AS BLOB))"); - - stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - } - - - @After - public void tearDown() { - jdbcInputFormat = null; - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidConnection() { - jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:idontexist", "select * from books;"); - jdbcInputFormat.configure(null); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidQuery() { - jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "abc"); - jdbcInputFormat.configure(null); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidDBType() { - jdbcInputFormat = new JDBCInputFormat("idontexist.Driver", "jdbc:derby:memory:ebookshop", "select * from books;"); - jdbcInputFormat.configure(null); - } - - @Test(expected = IllegalArgumentException.class) - public void testUnsupportedSQLType() { - jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from bookscontent"); - jdbcInputFormat.configure(null); - jdbcInputFormat.nextRecord(new Record()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNotConfiguredFormatNext() { - jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books"); - jdbcInputFormat.nextRecord(new Record()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNotConfiguredFormatEnd() { - jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books"); - jdbcInputFormat.reachedEnd(); - } - - @Test - public void testJDBCInputFormat() throws IOException { - jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books"); - jdbcInputFormat.configure(null); - Record record = new Record(); - int recordCount = 0; - while (!jdbcInputFormat.reachedEnd()) { - jdbcInputFormat.nextRecord(record); - Assert.assertEquals(5, record.getNumFields()); - Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass()); - Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass()); - Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass()); - Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass()); - Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass()); - - int[] pos = {0, 1, 2, 3, 4}; - Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()}; - Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values)); - - recordCount++; - } - Assert.assertEquals(5, recordCount); - - cleanUpDerbyDatabases(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/06b37bf5/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java deleted file mode 100644 index c824ea1..0000000 --- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.io.jdbc; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; - -import org.junit.Assert; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.FloatValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.types.Value; -import org.junit.After; -import org.junit.BeforeClass; -import org.junit.Test; - -public class JDBCOutputFormatTest { - private JDBCInputFormat jdbcInputFormat; - private JDBCOutputFormat jdbcOutputFormat; - - private static Connection conn; - - static final Value[][] dbData = { - {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)}, - {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)}, - {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)}, - {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)}, - {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}}; - - @BeforeClass - public static void setUpClass() { - try { - System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL"); - prepareDerbyInputDatabase(); - prepareDerbyOutputDatabase(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void cleanUpDerbyDatabases() { - try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE books"); - stat.executeUpdate("DROP TABLE newbooks"); - stat.close(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void prepareDerbyInputDatabase() throws ClassNotFoundException { - try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - createTableBooks(); - insertDataToSQLTables(); - conn.close(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - Assert.fail(); - } catch (SQLException e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void prepareDerbyOutputDatabase() throws ClassNotFoundException { - try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - createTableNewBooks(); - conn.close(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - Assert.fail(); - } catch (SQLException e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void createTableBooks() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - } - - private static void createTableNewBooks() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - } - - private static void insertDataToSQLTables() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - - Statement stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - } - - - @After - public void tearDown() { - jdbcOutputFormat = null; - cleanUpDerbyDatabases(); - } - - @Test - public void testJDBCOutputFormat() throws IOException { - String sourceTable = "books"; - String targetTable = "newbooks"; - String driverPath = "org.apache.derby.jdbc.EmbeddedDriver"; - String dbUrl = "jdbc:derby:memory:ebookshop"; - - Configuration cfg = new Configuration(); - cfg.setString("driver", driverPath); - cfg.setString("url", dbUrl); - cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)"); - cfg.setInteger("fields", 5); - cfg.setClass("type0", IntValue.class); - cfg.setClass("type1", StringValue.class); - cfg.setClass("type2", StringValue.class); - cfg.setClass("type3", FloatValue.class); - cfg.setClass("type4", IntValue.class); - - jdbcOutputFormat = new JDBCOutputFormat(); - jdbcOutputFormat.configure(cfg); - jdbcOutputFormat.open(0,1); - - jdbcInputFormat = new JDBCInputFormat( - driverPath, - dbUrl, - "select * from " + sourceTable); - jdbcInputFormat.configure(null); - - Record record = new Record(); - while (!jdbcInputFormat.reachedEnd()) { - jdbcInputFormat.nextRecord(record); - jdbcOutputFormat.writeRecord(record); - } - - jdbcOutputFormat.close(); - jdbcInputFormat.close(); - - jdbcInputFormat = new JDBCInputFormat( - driverPath, - dbUrl, - "select * from " + targetTable); - jdbcInputFormat.configure(null); - - int recordCount = 0; - while (!jdbcInputFormat.reachedEnd()) { - jdbcInputFormat.nextRecord(record); - Assert.assertEquals(5, record.getNumFields()); - Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass()); - Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass()); - Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass()); - Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass()); - Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass()); - - int[] pos = {0, 1, 2, 3, 4}; - Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()}; - Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values)); - - recordCount++; - } - Assert.assertEquals(5, recordCount); - - jdbcInputFormat.close(); - } -}