SQOOP-1496: Sqoop2: Revisit/Refactor the SubmissionEngine/ExecutionEngine APIs
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3d539dd4 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3d539dd4 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3d539dd4 Branch: refs/heads/sqoop2 Commit: 3d539dd4d7477324dfe62a4e57f684351769b000 Parents: af25bcc Author: Abraham Elmahrek <abra...@elmahrek.com> Authored: Fri Sep 19 16:24:59 2014 -0700 Committer: Abraham Elmahrek <abra...@elmahrek.com> Committed: Thu Oct 9 17:58:18 2014 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/json/SubmissionBean.java | 20 +- .../org/apache/sqoop/model/MSubmission.java | 31 +- .../apache/sqoop/json/TestSubmissionBean.java | 12 +- .../sqoop/connector/jdbc/TestToInitializer.java | 1 - .../idf/CSVIntermediateDataFormat.java | 4 - .../connector/idf/IntermediateDataFormat.java | 4 - .../idf/CSVIntermediateDataFormatTest.java | 222 ------ .../idf/TestCSVIntermediateDataFormat.java | 222 ++++++ .../sqoop/connector/ConnectorManager.java | 5 +- .../apache/sqoop/framework/ExecutionEngine.java | 20 +- .../org/apache/sqoop/framework/JobManager.java | 447 ++++++------ .../org/apache/sqoop/framework/JobRequest.java | 356 ++++++++++ .../sqoop/framework/SubmissionEngine.java | 7 +- .../sqoop/framework/SubmissionRequest.java | 361 ---------- .../sqoop/framework/TestFrameworkValidator.java | 182 +++-- .../apache/sqoop/framework/TestJobManager.java | 173 +++++ .../apache/sqoop/framework/TestJobRequest.java | 71 ++ .../sqoop/framework/TestSubmissionRequest.java | 71 -- .../sqoop/repository/TestJdbcRepository.java | 694 +++++++++---------- .../sqoop/execution/mapreduce/MRJobRequest.java | 102 +++ .../mapreduce/MRSubmissionRequest.java | 102 --- .../mapreduce/MapreduceExecutionEngine.java | 51 +- .../apache/sqoop/job/mr/ConfigurationUtils.java | 3 - .../apache/sqoop/job/mr/ProgressRunnable.java | 4 +- .../sqoop/job/mr/SqoopDestroyerExecutor.java | 7 +- 
.../sqoop/job/mr/SqoopFileOutputFormat.java | 4 +- .../org/apache/sqoop/job/mr/SqoopMapper.java | 50 +- .../job/mr/SqoopOutputFormatLoadExecutor.java | 28 +- .../sqoop/shell/utils/SubmissionDisplayer.java | 8 +- .../org/apache/sqoop/job/etl/CallbackBase.java | 49 -- .../java/org/apache/sqoop/job/etl/From.java | 2 +- .../main/java/org/apache/sqoop/job/etl/To.java | 2 +- .../org/apache/sqoop/job/etl/Transferable.java | 51 ++ .../org/apache/sqoop/validation/Validator.java | 1 - .../mapreduce/MapreduceSubmissionEngine.java | 12 +- 35 files changed, 1780 insertions(+), 1599 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java index 61d6576..9b1ae74 100644 --- a/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java +++ b/common/src/main/java/org/apache/sqoop/json/SubmissionBean.java @@ -52,8 +52,8 @@ public class SubmissionBean implements JsonBean { private static final String EXCEPTION_TRACE = "exception-trace"; private static final String PROGRESS = "progress"; private static final String COUNTERS = "counters"; - private static final String CONNECTOR_SCHEMA = "schema-connector"; - private static final String HIO_SCHEMA = "schema-hio"; + private static final String FROM_SCHEMA = "schema-from"; + private static final String TO_SCHEMA = "schema-to"; private List<MSubmission> submissions; @@ -116,11 +116,11 @@ public class SubmissionBean implements JsonBean { if(submission.getCounters() != null) { object.put(COUNTERS, extractCounters(submission.getCounters())); } - if(submission.getConnectorSchema() != null) { - object.put(CONNECTOR_SCHEMA, extractSchema(submission.getConnectorSchema())); + 
if(submission.getFromSchema() != null) { + object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema())); } - if(submission.getHioSchema() != null) { - object.put(HIO_SCHEMA, extractSchema(submission.getHioSchema())); + if(submission.getToSchema() != null) { + object.put(TO_SCHEMA, extractSchema(submission.getToSchema())); } array.add(object); @@ -188,11 +188,11 @@ public class SubmissionBean implements JsonBean { if(object.containsKey(COUNTERS)) { submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS))); } - if(object.containsKey(CONNECTOR_SCHEMA)) { - submission.setConnectorSchema(restoreSchemna((JSONObject) object.get(CONNECTOR_SCHEMA))); + if(object.containsKey(FROM_SCHEMA)) { + submission.setFromSchema(restoreSchemna((JSONObject) object.get(FROM_SCHEMA))); } - if(object.containsKey(HIO_SCHEMA)) { - submission.setHioSchema(restoreSchemna((JSONObject) object.get(HIO_SCHEMA))); + if(object.containsKey(TO_SCHEMA)) { + submission.setToSchema(restoreSchemna((JSONObject) object.get(TO_SCHEMA))); } this.submissions.add(submission); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/main/java/org/apache/sqoop/model/MSubmission.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MSubmission.java b/common/src/main/java/org/apache/sqoop/model/MSubmission.java index 1edd6ee..ca21135 100644 --- a/common/src/main/java/org/apache/sqoop/model/MSubmission.java +++ b/common/src/main/java/org/apache/sqoop/model/MSubmission.java @@ -100,20 +100,21 @@ public class MSubmission extends MAccountableEntity { String exceptionStackTrace; /** - * Schema that was reported by the connector. + * Schema for the FROM part of the job submission * * This property is required. */ - Schema connectorSchema; + Schema fromSchema; /** + * Schema for the TO part of the job submission * Optional schema that reported by the underlying I/O implementation. 
Please - * note that this property might be empty and in such case the connector - * schema will use also on Hadoop I/O side. + * note that this property might be empty and in such case use the FROM schema + * on the TO side. * * This property is optional. */ - Schema hioSchema; + Schema toSchema; public MSubmission() { status = SubmissionStatus.UNKNOWN; @@ -219,20 +220,20 @@ public class MSubmission extends MAccountableEntity { this.setExceptionStackTrace(writer.toString()); } - public Schema getConnectorSchema() { - return connectorSchema; + public Schema getFromSchema() { + return fromSchema; } - public void setConnectorSchema(Schema connectorSchema) { - this.connectorSchema = connectorSchema; + public void setFromSchema(Schema connectorSchema) { + this.fromSchema = connectorSchema; } - public Schema getHioSchema() { - return hioSchema; + public Schema getToSchema() { + return toSchema; } - public void setHioSchema(Schema hioSchema) { - this.hioSchema = hioSchema; + public void setToSchema(Schema hioSchema) { + this.toSchema = hioSchema; } @Override @@ -248,8 +249,8 @@ public class MSubmission extends MAccountableEntity { ", externalLink='" + externalLink + '\'' + ", exceptionInfo='" + exceptionInfo + '\'' + ", exceptionStackTrace='" + exceptionStackTrace + '\'' + - ", connectorSchema='" + connectorSchema + '\'' + - ", hioSchema='" + hioSchema + '\'' + + ", fromSchema='" + fromSchema + '\'' + + ", toSchema='" + toSchema + '\'' + '}'; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java index d87655e..518c9cb 100644 --- a/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java +++ b/common/src/test/java/org/apache/sqoop/json/TestSubmissionBean.java @@ -405,20 
+405,20 @@ public class TestSubmissionBean extends TestCase { assertEquals(222222, counter.getValue()); } - public void testTransferConnectorSchema() { + public void testTransferFromSchema() { MSubmission source = new MSubmission(); - source.setConnectorSchema(getSchema()); + source.setFromSchema(getSchema()); - Schema target = transfer(source).getConnectorSchema(); + Schema target = transfer(source).getFromSchema(); assertNotNull(target); assertEquals(getSchema(), target); } - public void testTransferHioSchema() { + public void testTransferToSchema() { MSubmission source = new MSubmission(); - source.setHioSchema(getSchema()); + source.setToSchema(getSchema()); - Schema target = transfer(source).getHioSchema(); + Schema target = transfer(source).getToSchema(); assertNotNull(target); assertEquals(getSchema(), target); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java index eb6fcf1..4767215 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java @@ -26,7 +26,6 @@ import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; import org.apache.sqoop.validation.Status; -import org.apache.sqoop.validation.Validation; import org.apache.sqoop.validation.ValidationResult; import org.apache.sqoop.validation.ValidationRunner; 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index 1e8ab52..df5cb9c 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -21,7 +21,6 @@ package org.apache.sqoop.connector.idf; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.Column; @@ -38,7 +37,6 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.regex.Matcher; -import java.util.regex.Pattern; public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { @@ -46,8 +44,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { public static final char ESCAPE_CHARACTER = '\\'; public static final char QUOTE_CHARACTER = '\''; - private static final Logger LOG = Logger.getLogger - (CSVIntermediateDataFormat.class); private static final char[] originals = { 0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27 http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index 91b594e..66d46a3 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -19,14 +19,10 @@ package org.apache.sqoop.connector.idf; import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.Column; -import org.apache.sqoop.schema.type.Type; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; /** * Abstract class representing a pluggable intermediate data format the Sqoop http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java deleted file mode 100644 index df6d30f..0000000 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sqoop.connector.idf; - -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.Binary; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.Text; -import org.junit.Before; -import org.junit.Test; - -import java.io.UnsupportedEncodingException; -import java.util.Arrays; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class CSVIntermediateDataFormatTest { - - private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; - - private IntermediateDataFormat<?> data; - - @Before - public void setUp() { - data = new CSVIntermediateDataFormat(); - } - - private String getByteFieldString(byte[] byteFieldData) { - try { - return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString(); - } catch(UnsupportedEncodingException e) { - // Should never get to this point because ISO-8859-1 is a standard codec. 
- return null; - } - } - - @Test - public void testStringInStringOut() { - String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) - + ",'" + String.valueOf(0x0A) + "'"; - data.setTextData(testData); - assertEquals(testData, data.getTextData()); - } - - @Test - public void testNullStringInObjectOut() { - Schema schema = new Schema("test"); - schema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new Text("3")) - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); - data.setSchema(schema); - data.setTextData(null); - - Object[] out = data.getObjectData(); - - assertNull(out); - } - - @Test(expected=SqoopException.class) - public void testEmptyStringInObjectOut() { - Schema schema = new Schema("test"); - schema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new Text("3")) - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); - data.setSchema(schema); - data.setTextData(""); - - data.getObjectData(); - } - - @Test - public void testStringInObjectOut() { - - //byte[0] = -112, byte[1] = 54 - 2's complements - String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) - + ",'\\n'"; - Schema schema = new Schema("test"); - schema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new Text("3")) - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); - data.setSchema(schema); - data.setTextData(testData); - - Object[] out = data.getObjectData(); - - assertEquals(new Long(10),out[0]); - assertEquals(new Long(34),out[1]); - assertEquals("54",out[2]); - assertEquals("random data",out[3]); - assertEquals(-112, ((byte[])out[4])[0]); - assertEquals(54, ((byte[])out[4])[1]); - assertEquals("\n", out[5].toString()); - } - - @Test - public void testObjectInStringOut() { - Schema schema = new Schema("test"); 
- schema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new Text("3")) - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); - data.setSchema(schema); - - byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; - Object[] in = new Object[6]; - in[0] = new Long(10); - in[1] = new Long(34); - in[2] = "54"; - in[3] = "random data"; - in[4] = byteFieldData; - in[5] = new String(new char[] { 0x0A }); - - data.setObjectData(in); - - //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements - String testData = "10,34,'54','random data'," + - getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; - assertEquals(testData, data.getTextData()); - } - - @Test - public void testObjectInObjectOut() { - //Test escapable sequences too. - //byte[0] = -112, byte[1] = 54 - 2's complements - Schema schema = new Schema("test"); - schema.addColumn(new FixedPoint("1")) - .addColumn(new FixedPoint("2")) - .addColumn(new Text("3")) - .addColumn(new Text("4")) - .addColumn(new Binary("5")) - .addColumn(new Text("6")); - data.setSchema(schema); - - Object[] in = new Object[6]; - in[0] = new Long(10); - in[1] = new Long(34); - in[2] = "54"; - in[3] = "random data"; - in[4] = new byte[] { (byte) -112, (byte) 54}; - in[5] = new String(new char[] { 0x0A }); - Object[] inCopy = new Object[6]; - System.arraycopy(in,0,inCopy,0,in.length); - - // Modifies the input array, so we use the copy to confirm - data.setObjectData(in); - - assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); - } - - @Test - public void testStringFullRangeOfCharacters() { - Schema schema = new Schema("test"); - schema.addColumn(new Text("1")); - data.setSchema(schema); - - char[] allCharArr = new char[256]; - for(int i = 0; i < allCharArr.length; ++i) { - allCharArr[i] = (char)i; - } - String strData = new String(allCharArr); - - Object[] in = {strData}; - Object[] inCopy = new Object[1]; - 
System.arraycopy(in,0,inCopy,0,in.length); - - // Modifies the input array, so we use the copy to confirm - data.setObjectData(in); - - assertEquals(strData, data.getObjectData()[0]); - assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); - } - - @Test - public void testByteArrayFullRangeOfCharacters() { - Schema schema = new Schema("test"); - schema.addColumn(new Binary("1")); - data.setSchema(schema); - - byte[] allCharByteArr = new byte[256]; - for(int i = 0; i < allCharByteArr.length; ++i) { - allCharByteArr[i] = (byte)i; - } - - Object[] in = {allCharByteArr}; - Object[] inCopy = new Object[1]; - System.arraycopy(in,0,inCopy,0,in.length); - - // Modifies the input array, so we use the copy to confirm - data.setObjectData(in); - assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java new file mode 100644 index 0000000..8c83a71 --- /dev/null +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.idf; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Binary; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.Text; +import org.junit.Before; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestCSVIntermediateDataFormat { + + private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; + + private IntermediateDataFormat<?> data; + + @Before + public void setUp() { + data = new CSVIntermediateDataFormat(); + } + + private String getByteFieldString(byte[] byteFieldData) { + try { + return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString(); + } catch(UnsupportedEncodingException e) { + // Should never get to this point because ISO-8859-1 is a standard codec. 
+ return null; + } + } + + @Test + public void testStringInStringOut() { + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'" + String.valueOf(0x0A) + "'"; + data.setTextData(testData); + assertEquals(testData, data.getTextData()); + } + + @Test + public void testNullStringInObjectOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(null); + + Object[] out = data.getObjectData(); + + assertNull(out); + } + + @Test(expected=SqoopException.class) + public void testEmptyStringInObjectOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(""); + + data.getObjectData(); + } + + @Test + public void testStringInObjectOut() { + + //byte[0] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'\\n'"; + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(testData); + + Object[] out = data.getObjectData(); + + assertEquals(new Long(10),out[0]); + assertEquals(new Long(34),out[1]); + assertEquals("54",out[2]); + assertEquals("random data",out[3]); + assertEquals(-112, ((byte[])out[4])[0]); + assertEquals(54, ((byte[])out[4])[1]); + assertEquals("\n", out[5].toString()); + } + + @Test + public void testObjectInStringOut() { + Schema schema = new Schema("test"); 
+ schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + + byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; + Object[] in = new Object[6]; + in[0] = new Long(10); + in[1] = new Long(34); + in[2] = "54"; + in[3] = "random data"; + in[4] = byteFieldData; + in[5] = new String(new char[] { 0x0A }); + + data.setObjectData(in); + + //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,'54','random data'," + + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; + assertEquals(testData, data.getTextData()); + } + + @Test + public void testObjectInObjectOut() { + //Test escapable sequences too. + //byte[0] = -112, byte[1] = 54 - 2's complements + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + + Object[] in = new Object[6]; + in[0] = new Long(10); + in[1] = new Long(34); + in[2] = "54"; + in[3] = "random data"; + in[4] = new byte[] { (byte) -112, (byte) 54}; + in[5] = new String(new char[] { 0x0A }); + Object[] inCopy = new Object[6]; + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } + + @Test + public void testStringFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Text("1")); + data.setSchema(schema); + + char[] allCharArr = new char[256]; + for(int i = 0; i < allCharArr.length; ++i) { + allCharArr[i] = (char)i; + } + String strData = new String(allCharArr); + + Object[] in = {strData}; + Object[] inCopy = new Object[1]; + 
System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + + assertEquals(strData, data.getObjectData()[0]); + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } + + @Test + public void testByteArrayFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Binary("1")); + data.setSchema(schema); + + byte[] allCharByteArr = new byte[256]; + for(int i = 0; i < allCharByteArr.length; ++i) { + allCharByteArr[i] = (byte)i; + } + + Object[] in = {allCharByteArr}; + Object[] inCopy = new Object[1]; + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java index db6f579..c87df84 100644 --- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java +++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java @@ -114,10 +114,9 @@ public class ConnectorManager implements Reconfigurable { return bundles; } - public ResourceBundle getResourceBundle(long connectorId, - Locale locale) { + public ResourceBundle getResourceBundle(long connectorId, Locale locale) { ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId)); - return handler.getConnector().getBundle(locale); + return handler.getConnector().getBundle(locale); } public MConnector getConnectorMetadata(long connectorId) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java index 96ec148..75b570d 100644 --- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java +++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java @@ -18,12 +18,11 @@ package org.apache.sqoop.framework; import org.apache.sqoop.common.ImmutableContext; -import org.apache.sqoop.connector.spi.SqoopConnector; -import org.apache.sqoop.model.MSubmission; /** - * Execution engine drive execution of sqoop submission (job). It's responsible + * Execution engine drives execution of sqoop job. It's responsible * for executing all defined steps in the import/export workflow. + * A successful job execution will be recorded in the job submission entity */ public abstract class ExecutionEngine { @@ -31,6 +30,7 @@ public abstract class ExecutionEngine { * Initialize execution engine * * @param context Configuration context + * @param prefix Execution engine prefix */ public void initialize(ImmutableContext context, String prefix) { } @@ -42,19 +42,19 @@ public abstract class ExecutionEngine { } /** - * Return new SubmissionRequest class or any subclass if it's needed by + * Return new JobRequest class or any subclass if it's needed by * execution and submission engine combination. * - * @return New Submission request object + * @return new JobRequest object */ - public SubmissionRequest createSubmissionRequest() { - return new SubmissionRequest(); + public JobRequest createJobRequest() { + return new JobRequest(); } /** - * Prepare given submission request. + * Prepare given job request. 
* - * @param request Submission request + * @param request JobRequest */ - public abstract void prepareSubmission(SubmissionRequest request); + public abstract void prepareJob(JobRequest request); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index b1b37f6..8149d1c 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -1,3 +1,5 @@ + + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -266,264 +268,228 @@ public class JobManager implements Reconfigurable { } public MSubmission submit(long jobId, HttpEventContext ctx) { - String username = ctx.getUsername(); - - Repository repository = RepositoryManager.getInstance().getRepository(); - - MJob job = repository.findJob(jobId); - if (job == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0004, - "Unknown job id " + jobId); - } - if (!job.getEnabled()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0009, - "Job id: " + job.getPersistenceId()); + MSubmission mSubmission = createJobSubmission(ctx, jobId); + JobRequest jobRequest = createJobRequest(jobId, mSubmission); + // Bootstrap job to execute + prepareJob(jobRequest); + // Make sure that this job id is not currently running and submit the job + // only if it's not. 
+ synchronized (getClass()) { + MSubmission lastSubmission = RepositoryManager.getInstance().getRepository() + .findSubmissionLastForJob(jobId); + if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { + throw new SqoopException(FrameworkError.FRAMEWORK_0002, "Job with id " + jobId); + } + // TODO(Abe): Call multiple destroyers. + // TODO(jarcec): We might need to catch all exceptions here to ensure + // that Destroyer will be executed in all cases. + // NOTE: the following is a blocking call + boolean success = submissionEngine.submit(jobRequest); + if (!success) { + destroySubmission(jobRequest); + mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT); + } + RepositoryManager.getInstance().getRepository().createSubmission(mSubmission); } + return mSubmission; + } - MConnection fromConnection = repository.findConnection(job.getConnectionId(Direction.FROM)); - MConnection toConnection = repository.findConnection(job.getConnectionId(Direction.TO)); + private JobRequest createJobRequest(long jobId, MSubmission submission) { + // get job + MJob job = getJob(jobId); - if (!fromConnection.getEnabled()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0010, - "Connection id: " + fromConnection.getPersistenceId()); - } + // get from/to connections for the job + MConnection fromConnection = getConnection(job.getConnectionId(Direction.FROM)); + MConnection toConnection = getConnection(job.getConnectionId(Direction.TO)); - if (!toConnection.getEnabled()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0010, - "Connection id: " + toConnection.getPersistenceId()); - } + // get from/to connectors for the connection + SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId()); + validateSupportedDirection(fromConnector, Direction.FROM); + SqoopConnector toConnector = getConnector(toConnection.getConnectorId()); + validateSupportedDirection(toConnector, Direction.TO); - SqoopConnector fromConnector = - 
ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.FROM)); - SqoopConnector toConnector = - ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.TO)); + // Transform config to fromConnector specific classes + Object fromConnectionConfig = ClassUtils.instantiate(fromConnector + .getConnectionConfigurationClass()); + FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig); - // Make sure that connectors support the directions they will be used from. - if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) { - throw new SqoopException(FrameworkError.FRAMEWORK_0011, - "Connector: " + fromConnector.getClass().getCanonicalName()); - } + // Transform config to toConnector specific classes + Object toConnectorConfig = ClassUtils + .instantiate(toConnector.getConnectionConfigurationClass()); + FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig); - if (!toConnector.getSupportedDirections().contains(Direction.TO)) { - throw new SqoopException(FrameworkError.FRAMEWORK_0011, - "Connector: " + toConnector.getClass().getCanonicalName()); - } + Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM)); + FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob); - // Transform forms to fromConnector specific classes - Object fromConnectorConnection = ClassUtils.instantiate( - fromConnector.getConnectionConfigurationClass()); - FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), - fromConnectorConnection); - - Object fromJob = ClassUtils.instantiate( - fromConnector.getJobConfigurationClass(Direction.FROM)); - FormUtils.fromForms( - job.getConnectorPart(Direction.FROM).getForms(), fromJob); - - // Transform forms to toConnector specific classes - Object toConnectorConnection = ClassUtils.instantiate( - toConnector.getConnectionConfigurationClass()); - 
FormUtils.fromForms(toConnection.getConnectorPart().getForms(), - toConnectorConnection); - - Object toJob = ClassUtils.instantiate( - toConnector.getJobConfigurationClass(Direction.TO)); + Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO)); FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob); - // Transform framework specific forms - Object fromFrameworkConnection = ClassUtils.instantiate( - FrameworkManager.getInstance().getConnectionConfigurationClass()); - Object toFrameworkConnection = ClassUtils.instantiate( - FrameworkManager.getInstance().getConnectionConfigurationClass()); - FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), - fromFrameworkConnection); - FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), - toFrameworkConnection); - - Object frameworkJob = ClassUtils.instantiate( - FrameworkManager.getInstance().getJobConfigurationClass()); + // Transform framework specific configs + // Q(VB) : Aren't the following 2 exactly the same? 
+ Object fromFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance() + .getConnectionConfigurationClass()); + FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromFrameworkConnection); + + Object toFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance() + .getConnectionConfigurationClass()); + FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toFrameworkConnection); + + Object frameworkJob = ClassUtils.instantiate(FrameworkManager.getInstance() + .getJobConfigurationClass()); FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob); - // Create request object - MSubmission summary = new MSubmission(jobId); - SubmissionRequest request = executionEngine.createSubmissionRequest(); - - summary.setCreationUser(username); - summary.setLastUpdateUser(username); - - // Save important variables to the submission request - request.setSummary(summary); - request.setConnector(Direction.FROM, fromConnector); - request.setConnector(Direction.TO, toConnector); - request.setConnectorConnectionConfig(Direction.FROM, fromConnectorConnection); - request.setConnectorConnectionConfig(Direction.TO, toConnectorConnection); - request.setConnectorJobConfig(Direction.FROM, fromJob); - request.setConnectorJobConfig(Direction.TO, toJob); - // @TODO(Abe): Should we actually have 2 different Framework Connection config objects? 
- request.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection); - request.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection); - request.setConfigFrameworkJob(frameworkJob); - request.setJobName(job.getName()); - request.setJobId(job.getPersistenceId()); - request.setNotificationUrl(notificationBaseUrl + jobId); + // Create a job request for submit/execution + JobRequest jobRequest = executionEngine.createJobRequest(); + // Save important variables to the job request + jobRequest.setSummary(submission); + jobRequest.setConnector(Direction.FROM, fromConnector); + jobRequest.setConnector(Direction.TO, toConnector); + jobRequest.setConnectorConnectionConfig(Direction.FROM, fromConnectionConfig); + jobRequest.setConnectorConnectionConfig(Direction.TO, toConnectorConfig); + jobRequest.setConnectorJobConfig(Direction.FROM, fromJob); + jobRequest.setConnectorJobConfig(Direction.TO, toJob); + // TODO(Abe): Should we actually have 2 different Framework Connection config objects? + jobRequest.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection); + jobRequest.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection); + jobRequest.setConfigFrameworkJob(frameworkJob); + jobRequest.setJobName(job.getName()); + jobRequest.setJobId(job.getPersistenceId()); + jobRequest.setNotificationUrl(notificationBaseUrl + jobId); Class<? 
extends IntermediateDataFormat<?>> dataFormatClass = fromConnector.getIntermediateDataFormat(); - request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat()); - // Create request object + jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat()); + + + jobRequest.setFrom(fromConnector.getFrom()); + jobRequest.setTo(toConnector.getTo()); + + addStandardJars(jobRequest); + addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass); + addConnectorInitializerJars(jobRequest, Direction.FROM); + addConnectorInitializerJars(jobRequest, Direction.TO); + + Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM); + Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO); + + // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378 + if (fromSchema != null) { + jobRequest.getSummary().setFromSchema(fromSchema); + } + else { + jobRequest.getSummary().setFromSchema(toSchema); + } + LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo()); + return jobRequest; + } + + private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector, + SqoopConnector toConnector, Class<? 
extends IntermediateDataFormat<?>> dataFormatClass) { + jobRequest.addJarForClass(fromConnector.getClass()); + jobRequest.addJarForClass(toConnector.getClass()); + jobRequest.addJarForClass(dataFormatClass); + } + private void addStandardJars(JobRequest jobRequest) { // Let's register all important jars // sqoop-common - request.addJarForClass(MapContext.class); + jobRequest.addJarForClass(MapContext.class); // sqoop-core - request.addJarForClass(FrameworkManager.class); + jobRequest.addJarForClass(FrameworkManager.class); // sqoop-spi - request.addJarForClass(SqoopConnector.class); + jobRequest.addJarForClass(SqoopConnector.class); // Execution engine jar - request.addJarForClass(executionEngine.getClass()); - // Connectors in use - request.addJarForClass(fromConnector.getClass()); - request.addJarForClass(toConnector.getClass()); - + jobRequest.addJarForClass(executionEngine.getClass()); // Extra libraries that Sqoop code requires - request.addJarForClass(JSONValue.class); - - // The IDF is used in the ETL process. - request.addJarForClass(dataFormatClass); - - - // Get callbacks - request.setFromCallback(fromConnector.getFrom()); - request.setToCallback(toConnector.getTo()); - LOG.debug("Using callbacks: " + request.getFromCallback() + ", " + request.getToCallback()); - - // Initialize submission from fromConnector perspective - CallbackBase[] baseCallbacks = { - request.getFromCallback(), - request.getToCallback() - }; + jobRequest.addJarForClass(JSONValue.class); + } - CallbackBase baseCallback; - Class<? extends Initializer> initializerClass; - Initializer initializer; - InitializerContext initializerContext; + MSubmission createJobSubmission(HttpEventContext ctx, long jobId) { + MSubmission summary = new MSubmission(jobId); + summary.setCreationUser(ctx.getUsername()); + summary.setLastUpdateUser(ctx.getUsername()); + return summary; + } - // Initialize From Connector callback. 
- baseCallback = request.getFromCallback(); + SqoopConnector getConnector(long connnectorId) { + return ConnectorManager.getInstance().getConnector(connnectorId); + } - initializerClass = baseCallback - .getInitializer(); - initializer = (Initializer) ClassUtils - .instantiate(initializerClass); + void validateSupportedDirection(SqoopConnector connector, Direction direction) { + // Make sure that connector supports the given direction + if (!connector.getSupportedDirections().contains(direction)) { + throw new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: " + + connector.getClass().getCanonicalName()); + } + } - if (initializer == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create initializer instance: " + initializerClass.getName()); + MConnection getConnection(long connectionId) { + MConnection connection = RepositoryManager.getInstance().getRepository() + .findConnection(connectionId); + if (!connection.getEnabled()) { + throw new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: " + + connection.getPersistenceId()); } + return connection; + } - // Initializer context - initializerContext = new InitializerContext(request.getConnectorContext(Direction.FROM)); + MJob getJob(long jobId) { + MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId); + if (job == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: " + jobId); + } - // Initialize submission from fromConnector perspective - initializer.initialize(initializerContext, - request.getConnectorConnectionConfig(Direction.FROM), - request.getConnectorJobConfig(Direction.FROM)); + if (!job.getEnabled()) { + throw new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: " + job.getPersistenceId()); + } + return job; + } + + private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) { - // Add job specific jars to - request.addJars(initializer.getJars(initializerContext, - 
request.getConnectorConnectionConfig(Direction.FROM), - request.getConnectorJobConfig(Direction.FROM))); + Initializer initializer = getConnectorInitializer(jobRequest, direction); - // @TODO(Abe): Alter behavior of Schema here. Need from Schema. + // Initializer context + InitializerContext initializerContext = getInitializerContext(jobRequest, direction); + // Initialize submission from the connector perspective + initializer.initialize(initializerContext, jobRequest.getConnectorConnectionConfig(direction), + jobRequest.getConnectorJobConfig(direction)); - Schema fromSchema = initializer.getSchema(initializerContext, - request.getConnectorConnectionConfig(Direction.FROM), - request.getConnectorJobConfig(Direction.FROM)); + // TODO(Abe): Alter behavior of Schema here. + return initializer.getSchema(initializerContext, + jobRequest.getConnectorConnectionConfig(direction), + jobRequest.getConnectorJobConfig(direction)); + } - // request.getSummary().setConnectorSchema(initializer.getSchema( - // initializerContext, - // request.getConnectorConnectionConfig(ConnectorType.FROM), - // request.getConnectorJobConfig(ConnectorType.FROM) - // )); + private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) { - // Initialize To Connector callback. - baseCallback = request.getToCallback(); + Initializer initializer = getConnectorInitializer(jobRequest, direction); + InitializerContext initializerContext = getInitializerContext(jobRequest, direction); + // Add job specific jars to + jobRequest.addJars(initializer.getJars(initializerContext, + jobRequest.getConnectorConnectionConfig(direction), + jobRequest.getConnectorJobConfig(direction))); + } - initializerClass = baseCallback - .getInitializer(); - initializer = (Initializer) ClassUtils - .instantiate(initializerClass); + private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) { + Transferable transferable = direction.equals(Direction.FROM) ? 
jobRequest.getFrom() : jobRequest.getTo(); + Class<? extends Initializer> initializerClass = transferable.getInitializer(); + Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass); if (initializer == null) { throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create initializer instance: " + initializerClass.getName()); - } - - // Initializer context - initializerContext = new InitializerContext(request.getConnectorContext(Direction.TO)); - - // Initialize submission from fromConnector perspective - initializer.initialize(initializerContext, - request.getConnectorConnectionConfig(Direction.TO), - request.getConnectorJobConfig(Direction.TO)); - - // Add job specific jars to - request.addJars(initializer.getJars(initializerContext, - request.getConnectorConnectionConfig(Direction.TO), - request.getConnectorJobConfig(Direction.TO))); - - // @TODO(Abe): Alter behavior of Schema here. Need To Schema. - - Schema toSchema = initializer.getSchema(initializerContext, - request.getConnectorConnectionConfig(Direction.TO), - request.getConnectorJobConfig(Direction.TO)); - - // Retrieve and persist the schema -// request.getSummary().setConnectorSchema(initializer.getSchema( -// initializerContext, -// request.getConnectorConnectionConfig(ConnectorType.TO), -// request.getConnectorJobConfig(ConnectorType.TO) -// )); - - //TODO: Need better logic here - if (fromSchema != null) - request.getSummary().setConnectorSchema(fromSchema); - else - request.getSummary().setConnectorSchema(toSchema); - - // Bootstrap job from framework perspective - prepareSubmission(request); - - // Make sure that this job id is not currently running and submit the job - // only if it's not. 
- synchronized (getClass()) { - MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId); - if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0002, - "Job with id " + jobId); - } - - // @TODO(Abe): Call multiple destroyers. - // TODO(jarcec): We might need to catch all exceptions here to ensure - // that Destroyer will be executed in all cases. - boolean submitted = submissionEngine.submit(request); - if (!submitted) { - destroySubmission(request); - summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT); - } - - repository.createSubmission(summary); + "Can't create connector initializer instance: " + initializerClass.getName()); } - - // Return job status most recent - return summary; + return initializer; } - private void prepareSubmission(SubmissionRequest request) { - JobConfiguration jobConfiguration = (JobConfiguration) request - .getConfigFrameworkJob(); + private InitializerContext getInitializerContext(JobRequest jobRequest, Direction direction) { + return new InitializerContext(jobRequest.getConnectorContext(direction)); + } + void prepareJob(JobRequest request) { + JobConfiguration jobConfiguration = (JobConfiguration) request.getConfigFrameworkJob(); // We're directly moving configured number of extractors and loaders to // underlying request object. In the future we might need to throttle this // count based on other running jobs to meet our SLAs. @@ -531,19 +497,19 @@ public class JobManager implements Reconfigurable { request.setLoaders(jobConfiguration.throttling.loaders); // Delegate rest of the job to execution engine - executionEngine.prepareSubmission(request); + executionEngine.prepareJob(request); } /** * Callback that will be called only if we failed to submit the job to the * remote cluster. 
*/ - private void destroySubmission(SubmissionRequest request) { - CallbackBase fromCallback = request.getFromCallback(); - CallbackBase toCallback = request.getToCallback(); + void destroySubmission(JobRequest request) { + Transferable from = request.getFrom(); + Transferable to = request.getTo(); - Class<? extends Destroyer> fromDestroyerClass = fromCallback.getDestroyer(); - Class<? extends Destroyer> toDestroyerClass = toCallback.getDestroyer(); + Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer(); + Class<? extends Destroyer> toDestroyerClass = to.getDestroyer(); Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass); Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass); @@ -557,15 +523,15 @@ public class JobManager implements Reconfigurable { "Can't create toDestroyer instance: " + toDestroyerClass.getName()); } - // @TODO(Abe): Update context to manage multiple connectors. As well as summary. + // TODO(Abe): Update context to manage multiple connectors. As well as summary. 
DestroyerContext fromDestroyerContext = new DestroyerContext( request.getConnectorContext(Direction.FROM), false, request.getSummary() - .getConnectorSchema()); + .getFromSchema()); DestroyerContext toDestroyerContext = new DestroyerContext( request.getConnectorContext(Direction.TO), false, request.getSummary() - .getConnectorSchema()); + .getToSchema()); - // Initialize submission from connector perspective + // destroy submission from connector perspective fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(Direction.FROM), request.getConnectorJobConfig(Direction.FROM)); toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(Direction.TO), @@ -573,42 +539,39 @@ public class JobManager implements Reconfigurable { } public MSubmission stop(long jobId, HttpEventContext ctx) { - String username = ctx.getUsername(); Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission submission = repository.findSubmissionLastForJob(jobId); + MSubmission mSubmission = repository.findSubmissionLastForJob(jobId); - if (submission == null || !submission.getStatus().isRunning()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0003, - "Job with id " + jobId + " is not running"); + if (mSubmission == null || !mSubmission.getStatus().isRunning()) { + throw new SqoopException(FrameworkError.FRAMEWORK_0003, "Job with id " + jobId + + " is not running"); } + submissionEngine.stop(mSubmission.getExternalId()); - String externalId = submission.getExternalId(); - submissionEngine.stop(externalId); - - submission.setLastUpdateUser(username); + mSubmission.setLastUpdateUser(ctx.getUsername()); // Fetch new information to verify that the stop command has actually worked - update(submission); + update(mSubmission); // Return updated structure - return submission; + return mSubmission; } public MSubmission status(long jobId) { Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission 
submission = repository.findSubmissionLastForJob(jobId); + MSubmission mSubmission = repository.findSubmissionLastForJob(jobId); - if (submission == null) { + if (mSubmission == null) { return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); } // If the submission is in running state, let's update it - if (submission.getStatus().isRunning()) { - update(submission); + if (mSubmission.getStatus().isRunning()) { + update(mSubmission); } - return submission; + return mSubmission; } private void update(MSubmission submission) { @@ -744,4 +707,4 @@ public class JobManager implements Reconfigurable { LOG.info("Ending submission manager update thread"); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/JobRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java new file mode 100644 index 0000000..1f77693 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.framework; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.DirectionError; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.Transferable; +import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.utils.ClassUtils; + +import java.util.LinkedList; +import java.util.List; + +/** + * Submission details class is used when creating new submission and contains + * all information that we need to create a new submission (including mappers, + * reducers, ...). + */ +public class JobRequest { + + /** + * Submission summary + */ + MSubmission summary; + + /** + * Original job name + */ + String jobName; + + /** + * Associated job (from metadata perspective) id + */ + long jobId; + + /** + * Connector instances associated with this submission request + */ + SqoopConnector fromConnector; + SqoopConnector toConnector; + + /** + * List of required local jars for the job + */ + List<String> jars; + + /** + * From entity + */ + Transferable from; + + /** + * To entity + */ + Transferable to; + + /** + * All configuration objects + */ + Object fromConnectorConnectionConfig; + Object toConnectorConnectionConfig; + Object fromConnectorJobConfig; + Object toConnectorJobConfig; + Object fromFrameworkConnectionConfig; + Object toFrameworkConnectionConfig; + Object configFrameworkJob; + + /** + * Connector context (submission specific configuration) + */ + MutableMapContext fromConnectorContext; + MutableMapContext toConnectorContext; + + /** + * Framework context (submission specific configuration) + */ + MutableMapContext frameworkContext; + + /** + * Optional notification URL for job progress + */ + String notificationUrl; + + /** + * Number of extractors + */ + Integer extractors; + + /** + * Number of loaders + */ + 
Integer loaders; + + /** + * The intermediate data format this submission should use. + */ + Class<? extends IntermediateDataFormat> intermediateDataFormat; + + public JobRequest() { + this.jars = new LinkedList<String>(); + this.fromConnectorContext = new MutableMapContext(); + this.toConnectorContext = new MutableMapContext(); + this.frameworkContext = new MutableMapContext(); + this.fromConnector = null; + this.toConnector = null; + this.fromConnectorConnectionConfig = null; + this.toConnectorConnectionConfig = null; + this.fromConnectorJobConfig = null; + this.toConnectorJobConfig = null; + this.fromFrameworkConnectionConfig = null; + this.toFrameworkConnectionConfig = null; + } + + public MSubmission getSummary() { + return summary; + } + + public void setSummary(MSubmission summary) { + this.summary = summary; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public long getJobId() { + return jobId; + } + + public void setJobId(long jobId) { + this.jobId = jobId; + } + + public SqoopConnector getConnector(Direction type) { + switch(type) { + case FROM: + return fromConnector; + + case TO: + return toConnector; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public void setConnector(Direction type, SqoopConnector connector) { + switch(type) { + case FROM: + fromConnector = connector; + break; + + case TO: + toConnector = connector; + break; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public List<String> getJars() { + return jars; + } + + public void addJar(String jar) { + if(!jars.contains(jar)) { + jars.add(jar); + } + } + + public void addJarForClass(Class klass) { + addJar(ClassUtils.jarForClass(klass)); + } + + public void addJars(List<String> jars) { + for(String j : jars) { + addJar(j); + } + } + + public Transferable getFrom() { + return from; + } + 
+ public void setFrom(Transferable from) { + this.from = from; + } + + public Transferable getTo() { + return to; + } + + public void setTo(Transferable to) { + this.to = to; + } + + public Object getConnectorConnectionConfig(Direction type) { + switch(type) { + case FROM: + return fromConnectorConnectionConfig; + + case TO: + return toConnectorConnectionConfig; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public void setConnectorConnectionConfig(Direction type, Object config) { + switch(type) { + case FROM: + fromConnectorConnectionConfig = config; + break; + case TO: + toConnectorConnectionConfig = config; + break; + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public Object getConnectorJobConfig(Direction type) { + switch(type) { + case FROM: + return fromConnectorJobConfig; + + case TO: + return toConnectorJobConfig; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public void setConnectorJobConfig(Direction type, Object config) { + switch(type) { + case FROM: + fromConnectorJobConfig = config; + break; + case TO: + toConnectorJobConfig = config; + break; + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public Object getFrameworkConnectionConfig(Direction type) { + switch(type) { + case FROM: + return fromFrameworkConnectionConfig; + + case TO: + return toFrameworkConnectionConfig; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public void setFrameworkConnectionConfig(Direction type, Object config) { + switch(type) { + case FROM: + fromFrameworkConnectionConfig = config; + break; + case TO: + toFrameworkConnectionConfig = config; + break; + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public Object getConfigFrameworkJob() 
{ + return configFrameworkJob; + } + + public void setConfigFrameworkJob(Object config) { + configFrameworkJob = config; + } + + public MutableMapContext getConnectorContext(Direction type) { + switch(type) { + case FROM: + return fromConnectorContext; + + case TO: + return toConnectorContext; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public MutableMapContext getFrameworkContext() { + return frameworkContext; + } + + public String getNotificationUrl() { + return notificationUrl; + } + + public void setNotificationUrl(String url) { + this.notificationUrl = url; + } + + public Integer getExtractors() { + return extractors; + } + + public void setExtractors(Integer extractors) { + this.extractors = extractors; + } + + public Integer getLoaders() { + return loaders; + } + + public void setLoaders(Integer loaders) { + this.loaders = loaders; + } + + public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() { + return intermediateDataFormat; + } + + public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) { + this.intermediateDataFormat = intermediateDataFormat; + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java index 3c0f6eb..732be3b 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java @@ -22,8 +22,8 @@ import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.submission.SubmissionStatus; /** - * Submission engine is capable of executing and getting information about - * submissions to remote (hadoop) cluster. 
+ * Submission engine is responsible in conveying the information about the + * job instances (submissions) to remote (hadoop) cluster. */ public abstract class SubmissionEngine { @@ -31,6 +31,7 @@ public abstract class SubmissionEngine { * Initialize submission engine * * @param context Configuration context + * @param prefix Submission engine prefix */ public void initialize(MapContext context, String prefix) { } @@ -57,7 +58,7 @@ public abstract class SubmissionEngine { * * @return Return true if we were able to submit job to remote cluster. */ - public abstract boolean submit(SubmissionRequest submission); + public abstract boolean submit(JobRequest submission); /** * Hard stop for given submission. http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java deleted file mode 100644 index bf3f785..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ /dev/null @@ -1,361 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.framework; - -import org.apache.sqoop.common.Direction; -import org.apache.sqoop.common.DirectionError; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.idf.IntermediateDataFormat; -import org.apache.sqoop.connector.spi.SqoopConnector; -import org.apache.sqoop.job.etl.CallbackBase; -import org.apache.sqoop.model.MSubmission; -import org.apache.sqoop.utils.ClassUtils; - -import java.util.LinkedList; -import java.util.List; - -/** - * Submission details class is used when creating new submission and contains - * all information that we need to create a new submission (including mappers, - * reducers, ...). - */ -public class SubmissionRequest { - - /** - * Submission summary - */ - MSubmission summary; - - /** - * Original job name - */ - String jobName; - - /** - * Associated job (from metadata perspective) id - */ - long jobId; - - /** - * Connector instances associated with this submission request - */ - SqoopConnector fromConnector; - SqoopConnector toConnector; - - /** - * List of required local jars for the job - */ - List<String> jars; - - /** - * From connector callback - */ - CallbackBase fromCallback; - - /** - * To connector callback - */ - CallbackBase toCallback; - - /** - * All configuration objects - */ - Object fromConnectorConnectionConfig; - Object toConnectorConnectionConfig; - Object fromConnectorJobConfig; - Object toConnectorJobConfig; - Object fromFrameworkConnectionConfig; - Object toFrameworkConnectionConfig; - Object configFrameworkJob; - - /** - * Connector context (submission specific configuration) - */ - MutableMapContext fromConnectorContext; - MutableMapContext toConnectorContext; - - /** - * Framework context (submission specific configuration) - */ - MutableMapContext frameworkContext; - - /** - * HDFS output 
directory - */ - String outputDirectory; - - /** - * Optional notification URL for job progress - */ - String notificationUrl; - - /** - * Number of extractors - */ - Integer extractors; - - /** - * Number of loaders - */ - Integer loaders; - - /** - * The intermediate data format this submission should use. - */ - Class<? extends IntermediateDataFormat> intermediateDataFormat; - - public SubmissionRequest() { - this.jars = new LinkedList<String>(); - this.fromConnectorContext = new MutableMapContext(); - this.toConnectorContext = new MutableMapContext(); - this.frameworkContext = new MutableMapContext(); - this.fromConnector = null; - this.toConnector = null; - this.fromConnectorConnectionConfig = null; - this.toConnectorConnectionConfig = null; - this.fromConnectorJobConfig = null; - this.toConnectorJobConfig = null; - this.fromFrameworkConnectionConfig = null; - this.toFrameworkConnectionConfig = null; - } - - public MSubmission getSummary() { - return summary; - } - - public void setSummary(MSubmission summary) { - this.summary = summary; - } - - public String getJobName() { - return jobName; - } - - public void setJobName(String jobName) { - this.jobName = jobName; - } - - public long getJobId() { - return jobId; - } - - public void setJobId(long jobId) { - this.jobId = jobId; - } - - public SqoopConnector getConnector(Direction type) { - switch(type) { - case FROM: - return fromConnector; - - case TO: - return toConnector; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public void setConnector(Direction type, SqoopConnector connector) { - switch(type) { - case FROM: - fromConnector = connector; - break; - - case TO: - toConnector = connector; - break; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public List<String> getJars() { - return jars; - } - - public void addJar(String jar) { - if(!jars.contains(jar)) { - jars.add(jar); - } - } - - 
public void addJarForClass(Class klass) { - addJar(ClassUtils.jarForClass(klass)); - } - - public void addJars(List<String> jars) { - for(String j : jars) { - addJar(j); - } - } - - public CallbackBase getFromCallback() { - return fromCallback; - } - - public void setFromCallback(CallbackBase fromCallback) { - this.fromCallback = fromCallback; - } - - public CallbackBase getToCallback() { - return toCallback; - } - - public void setToCallback(CallbackBase toCallback) { - this.toCallback = toCallback; - } - - public Object getConnectorConnectionConfig(Direction type) { - switch(type) { - case FROM: - return fromConnectorConnectionConfig; - - case TO: - return toConnectorConnectionConfig; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public void setConnectorConnectionConfig(Direction type, Object config) { - switch(type) { - case FROM: - fromConnectorConnectionConfig = config; - break; - case TO: - toConnectorConnectionConfig = config; - break; - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public Object getConnectorJobConfig(Direction type) { - switch(type) { - case FROM: - return fromConnectorJobConfig; - - case TO: - return toConnectorJobConfig; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public void setConnectorJobConfig(Direction type, Object config) { - switch(type) { - case FROM: - fromConnectorJobConfig = config; - break; - case TO: - toConnectorJobConfig = config; - break; - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public Object getFrameworkConnectionConfig(Direction type) { - switch(type) { - case FROM: - return fromFrameworkConnectionConfig; - - case TO: - return toFrameworkConnectionConfig; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public void 
setFrameworkConnectionConfig(Direction type, Object config) { - switch(type) { - case FROM: - fromFrameworkConnectionConfig = config; - break; - case TO: - toFrameworkConnectionConfig = config; - break; - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public Object getConfigFrameworkJob() { - return configFrameworkJob; - } - - public void setConfigFrameworkJob(Object config) { - configFrameworkJob = config; - } - - public MutableMapContext getConnectorContext(Direction type) { - switch(type) { - case FROM: - return fromConnectorContext; - - case TO: - return toConnectorContext; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public MutableMapContext getFrameworkContext() { - return frameworkContext; - } - - public String getNotificationUrl() { - return notificationUrl; - } - - public void setNotificationUrl(String url) { - this.notificationUrl = url; - } - - public Integer getExtractors() { - return extractors; - } - - public void setExtractors(Integer extractors) { - this.extractors = extractors; - } - - public Integer getLoaders() { - return loaders; - } - - public void setLoaders(Integer loaders) { - this.loaders = loaders; - } - - public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() { - return intermediateDataFormat; - } - - public void setIntermediateDataFormat(Class<? 
extends IntermediateDataFormat> intermediateDataFormat) { - this.intermediateDataFormat = intermediateDataFormat; - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java index 69c1b56..69dd028 100644 --- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java +++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java @@ -17,64 +17,140 @@ */ package org.apache.sqoop.framework; -import org.apache.sqoop.framework.configuration.ConnectionConfiguration; -import org.apache.sqoop.framework.configuration.JobConfiguration; -import org.apache.sqoop.validation.Status; -import org.apache.sqoop.validation.ValidationResult; -import org.apache.sqoop.validation.ValidationRunner; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** - * + * NOTE(VB): This test class will soon be removed with the Validator refactoring */ public class TestFrameworkValidator { - FrameworkValidator validator; - - @Before - public void setUp() { - validator = new FrameworkValidator(); - } - - @Test - public void testConnectionValidation() { - ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration(); - ValidationRunner runner = new ValidationRunner(); - ValidationResult result = runner.validate(connectionConfiguration); - assertEquals(Status.FINE, result.getStatus()); - assertEquals(0, result.getMessages().size()); - } - - @Test - public void testJobValidation() { - ValidationRunner runner = new ValidationRunner(); - ValidationResult result; - JobConfiguration configuration; - - // Empty form is allowed - configuration = new 
JobConfiguration(); - result = runner.validate(configuration); - assertEquals(Status.FINE, result.getStatus()); - - // Explicitly setting extractors and loaders - configuration = new JobConfiguration(); - configuration.throttling.extractors = 3; - configuration.throttling.loaders = 3; - result = runner.validate(configuration); - assertEquals(Status.FINE, result.getStatus()); - assertEquals(0, result.getMessages().size()); - - // Negative and zero values for extractors and loaders -// configuration = new JobConfiguration(); +// FrameworkValidator validator; +// +// @Before +// public void setUp() { +// validator = new FrameworkValidator(); +// } +// +// @Test +// public void testConnectionValidation() { +// ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration(); +// +// Validation validation = validator.validateConnection(connectionConfiguration); +// assertEquals(Status.FINE, validation.getStatus()); +// assertEquals(0, validation.getMessages().size()); +// } +// +// @Test +// public void testExportJobValidation() { +// ExportJobConfiguration configuration; +// Validation validation; +// +// // Empty form is not allowed +// configuration = new ExportJobConfiguration(); +// validation = validator.validateJob(MJob.Type.EXPORT, configuration); +// assertEquals(Status.UNACCEPTABLE, validation.getStatus()); +// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("input.inputDirectory"))); +// +// // Explicitly setting extractors and loaders +// configuration = new ExportJobConfiguration(); +// configuration.input.inputDirectory = "/czech/republic"; +// configuration.throttling.extractors = 3; +// configuration.throttling.loaders = 3; +// +// validation = validator.validateJob(MJob.Type.EXPORT, configuration); +// assertEquals(Status.FINE, validation.getStatus()); +// assertEquals(0, validation.getMessages().size()); +// +// // Negative and zero values for extractors and loaders +// configuration = new 
ExportJobConfiguration(); +// configuration.input.inputDirectory = "/czech/republic"; +// configuration.throttling.extractors = 0; +// configuration.throttling.loaders = -1; +// +// validation = validator.validateJob(MJob.Type.EXPORT, configuration); +// assertEquals(Status.UNACCEPTABLE, validation.getStatus()); +// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors"))); +// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders"))); +// } +// +// +// @Test +// public void testImportJobValidation() { +// ImportJobConfiguration configuration; +// Validation validation; +// +// // Empty form is not allowed +// configuration = new ImportJobConfiguration(); +// validation = validator.validateJob(MJob.Type.IMPORT, configuration); +// assertEquals(Status.UNACCEPTABLE, validation.getStatus()); +// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.outputDirectory"))); +// +// // Explicitly setting extractors and loaders +// configuration = new ImportJobConfiguration(); +// configuration.output.outputDirectory = "/czech/republic"; +// configuration.throttling.extractors = 3; +// configuration.throttling.loaders = 3; +// +// validation = validator.validateJob(MJob.Type.IMPORT, configuration); +// assertEquals(Status.FINE, validation.getStatus()); +// assertEquals(0, validation.getMessages().size()); +// +// // Negative and zero values for extractors and loaders +// configuration = new ImportJobConfiguration(); +// configuration.output.outputDirectory = "/czech/republic"; // configuration.throttling.extractors = 0; // configuration.throttling.loaders = -1; -// result = runner.validate(configuration); -// assertEquals(Status.FINE, result.getStatus()); -// assertTrue(result.getMessages().containsKey("throttling.extractors")); -// assertTrue(result.getMessages().containsKey("throttling.loaders")); - } +// +// validation = validator.validateJob(MJob.Type.IMPORT, 
configuration); +// assertEquals(Status.UNACCEPTABLE, validation.getStatus()); +// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors"))); +// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders"))); +// +// // specifying both compression as well as customCompression is +// // unacceptable +// configuration = new ImportJobConfiguration(); +// configuration.output.outputDirectory = "/czech/republic"; +// configuration.throttling.extractors = 2; +// configuration.throttling.loaders = 2; +// configuration.output.compression = OutputCompression.BZIP2; +// configuration.output.customCompression = "some.compression.codec"; +// +// validation = validator.validateJob(MJob.Type.IMPORT, configuration); +// assertEquals(Status.UNACCEPTABLE, validation.getStatus()); +// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression"))); +// +// // specifying a customCompression is fine +// configuration = new ImportJobConfiguration(); +// configuration.output.outputDirectory = "/czech/republic"; +// configuration.throttling.extractors = 2; +// configuration.throttling.loaders = 2; +// configuration.output.compression = OutputCompression.CUSTOM; +// configuration.output.customCompression = "some.compression.codec"; +// +// validation = validator.validateJob(MJob.Type.IMPORT, configuration); +// assertEquals(Status.FINE, validation.getStatus()); +// +// // specifying a customCompression without codec name is unacceptable +// configuration = new ImportJobConfiguration(); +// configuration.output.outputDirectory = "/czech/republic"; +// configuration.throttling.extractors = 2; +// configuration.throttling.loaders = 2; +// configuration.output.compression = OutputCompression.CUSTOM; +// configuration.output.customCompression = ""; +// +// validation = validator.validateJob(MJob.Type.IMPORT, configuration); +// assertEquals(Status.UNACCEPTABLE, validation.getStatus()); 
+// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression"))); +// +// configuration = new ImportJobConfiguration(); +// configuration.output.outputDirectory = "/czech/republic"; +// configuration.throttling.extractors = 2; +// configuration.throttling.loaders = 2; +// configuration.output.compression = OutputCompression.CUSTOM; +// configuration.output.customCompression = null; +// +// validation = validator.validateJob(MJob.Type.IMPORT, configuration); +// assertEquals(Status.UNACCEPTABLE, validation.getStatus()); +// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression"))); +// +// } }