SQOOP-1574: Sqoop2: From/To: Rebase against Sqoop2 branch
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d7e04904 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d7e04904 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d7e04904 Branch: refs/heads/sqoop2 Commit: d7e04904300ddef91cb70d65efbc16493b206815 Parents: fc8eb9b Author: Abraham Elmahrek <abra...@elmahrek.com> Authored: Thu Oct 9 19:06:32 2014 -0700 Committer: Abraham Elmahrek <abra...@elmahrek.com> Committed: Thu Oct 9 19:39:41 2014 -0700 ---------------------------------------------------------------------- .../apache/sqoop/tools/tool/JSONConstants.java | 4 +- .../sqoop/tools/tool/RepositoryDumpTool.java | 15 +- .../sqoop/tools/tool/RepositoryLoadTool.java | 190 +++++++++---------- 3 files changed, 95 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d7e04904/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java b/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java index 288cba3..9bec0d0 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/JSONConstants.java @@ -28,9 +28,9 @@ public final class JSONConstants { public static final String CONNECTOR_NAME = "connector-name"; public static final String ALL = "all"; public static final String NAME = "name"; - public static final String CONNECTION_ID = "connection-id"; + public static final String LINK_ID = "link-id"; public static final String JOB_ID = "job-id"; - public static final String CONNECTIONS = "connections"; + public static final String LINKS = "links"; public static final String JOBS = "jobs"; public static final String SUBMISSIONS = "submissions"; public static final String METADATA = "metadata"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/d7e04904/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java index d41b0d2..f89c546 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java @@ -25,25 +25,21 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.ParseException; import org.apache.log4j.Logger; import org.apache.sqoop.connector.ConnectorManager; -import org.apache.sqoop.json.ConnectionBean; import org.apache.sqoop.json.JobBean; +import org.apache.sqoop.json.LinkBean; import org.apache.sqoop.json.SubmissionBean; -import org.apache.sqoop.model.MConnector; import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.tools.ConfiguredTool; import org.apache.sqoop.common.VersionInfo; -import static org.apache.sqoop.json.util.FormSerialization.ALL; +import static org.apache.sqoop.json.util.ConfigSerialization.ALL; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; -import java.util.HashMap; import java.util.Iterator; -import java.util.List; -import java.util.Map; /** * Write user-created content of Sqoop repository to JSON formatted file @@ -106,8 +102,8 @@ public class RepositoryDumpTool extends ConfiguredTool { JSONObject result = new JSONObject(); LOG.info("Dumping Connections with skipSensitive=" + String.valueOf(skipSensitive)); - ConnectionBean connections = new ConnectionBean(repository.findConnections()); - result.put(JSONConstants.CONNECTIONS, addConnectorName(connections.extract(skipSensitive))); + LinkBean links = new LinkBean(repository.findLinks()); + result.put(JSONConstants.LINKS, addConnectorName(links.extract(skipSensitive))); LOG.info("Dumping Jobs with skipSensitive=" + String.valueOf(skipSensitive)); JobBean jobs = new JobBean(repository.findJobs()); @@ -134,7 +130,6 @@ public class RepositoryDumpTool extends ConfiguredTool { } private JSONObject addConnectorName(JSONObject json) { - Repository repository = RepositoryManager.getInstance().getRepository(); ConnectorManager connectorManager = ConnectorManager.getInstance(); JSONArray results = (JSONArray) json.get(ALL); @@ -144,7 +139,7 @@ public class RepositoryDumpTool extends ConfiguredTool { while (iterator.hasNext()) { JSONObject result = iterator.next(); Long connectorId = (Long) result.get(JSONConstants.CONNECTOR_ID); - result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorMetadata(connectorId).getUniqueName()); + result.put(JSONConstants.CONNECTOR_NAME, connectorManager.getConnectorConfig(connectorId).getUniqueName()); } return json; http://git-wip-us.apache.org/repos/asf/sqoop/blob/d7e04904/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java index c6124da..76ebd3b 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java @@ -26,24 +26,27 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.io.Charsets; import org.apache.log4j.Logger; +import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.VersionInfo; import org.apache.sqoop.connector.ConnectorManager; -import org.apache.sqoop.connector.spi.MetadataUpgrader; +import org.apache.sqoop.connector.spi.RepositoryUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; -import org.apache.sqoop.framework.FrameworkManager; -import org.apache.sqoop.json.ConnectionBean; +import org.apache.sqoop.driver.Driver; import org.apache.sqoop.json.JobBean; +import org.apache.sqoop.json.LinkBean; import org.apache.sqoop.json.SubmissionBean; -import org.apache.sqoop.model.FormUtils; -import org.apache.sqoop.model.MConnection; -import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.ConfigUtils; +import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConnector; -import org.apache.sqoop.model.MForm; -import org.apache.sqoop.model.MFramework; +import org.apache.sqoop.model.MDriver; +import org.apache.sqoop.model.MDriverConfig; +import org.apache.sqoop.model.MFromConfig; import org.apache.sqoop.model.MJob; -import org.apache.sqoop.model.MJobForms; +import org.apache.sqoop.model.MLink; +import org.apache.sqoop.model.MLinkConfig; import org.apache.sqoop.model.MPersistableEntity; import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.model.MToConfig; import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.tools.ConfiguredTool; @@ -58,9 +61,9 @@ import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.sqoop.utils.ClassUtils; +import org.apache.sqoop.validation.ConfigValidationResult; +import org.apache.sqoop.validation.ConfigValidationRunner; import org.apache.sqoop.validation.Status; -import org.apache.sqoop.validation.Validation; -import org.apache.sqoop.validation.Validator; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; @@ -139,28 +142,25 @@ public class RepositoryLoadTool extends ConfiguredTool { ConnectorManager.getInstance().initialize(); ConnectorManager connectorManager = ConnectorManager.getInstance(); - FrameworkManager.getInstance().initialize(); - FrameworkManager frameworkManager = FrameworkManager.getInstance(); - LOG.info("Loading Connections"); - JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.CONNECTIONS); + JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS); if (jsonConns == null) { - LOG.error("Malformed JSON file. Key "+ JSONConstants.CONNECTIONS + " not found."); + LOG.error("Malformed JSON file. Key "+ JSONConstants.LINKS + " not found."); return false; } - ConnectionBean connectionBean = new ConnectionBean(); - connectionBean.restore(updateConnectorIDUsingName(jsonConns)); + LinkBean linkBean = new LinkBean(); + linkBean.restore(updateConnectorIDUsingName(jsonConns)); HashMap<Long,Long> connectionIds = new HashMap<Long, Long>(); - for (MConnection connection : connectionBean.getConnections()) { - long oldId = connection.getPersistenceId(); - long newId = loadConnection(connection); - if (newId == connection.PERSISTANCE_ID_DEFAULT) { - LOG.error("loading connection " + connection.getName() + " with previous ID " + oldId + " failed. Aborting repository load. Check log for details."); + for (MLink link : linkBean.getLinks()) { + long oldId = link.getPersistenceId(); + long newId = loadLink(link); + if (newId == link.PERSISTANCE_ID_DEFAULT) { + LOG.error("loading connection " + link.getName() + " with previous ID " + oldId + " failed. Aborting repository load. Check log for details."); return false; } connectionIds.put(oldId,newId); @@ -176,7 +176,7 @@ public class RepositoryLoadTool extends ConfiguredTool { } JobBean jobBean = new JobBean(); - jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,JSONConstants.CONNECTION_ID)); + jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,JSONConstants.LINK_ID)); HashMap<Long,Long> jobIds = new HashMap<Long, Long>(); for (MJob job: jobBean.getJobs()) { @@ -242,117 +242,103 @@ public class RepositoryLoadTool extends ConfiguredTool { return true; } - private long loadConnection(MConnection connection) { + private long loadLink(MLink link) { //starting by pretending we have a brand new connection - resetPersistenceId(connection); + resetPersistenceId(link); - MetadataUpgrader upgrader = FrameworkManager.getInstance().getMetadataUpgrader(); - MFramework framework = FrameworkManager.getInstance().getFramework(); + RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader(); Repository repository = RepositoryManager.getInstance().getRepository(); - List<MForm> frameworkForms = framework.getConnectionForms().clone(false).getForms(); - MConnectionForms newConnectionFrameworkForms = new MConnectionForms(frameworkForms); - - MConnector mConnector = ConnectorManager.getInstance().getConnectorMetadata(connection.getConnectorId()); - List<MForm> connectorForms = mConnector.getConnectionForms().clone(false).getForms(); - MConnectionForms newConnectionConnectorForms = new MConnectionForms(connectorForms); + MConnector mConnector = ConnectorManager.getInstance().getConnectorConfig(link.getConnectorId()); + List<MConfig> connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs(); + MLinkConfig newLinkConfigs = new MLinkConfig(connectorConfigs); // upgrading the forms to make sure they match the current repository - upgrader.upgrade(connection.getFrameworkPart(), newConnectionFrameworkForms); - upgrader.upgrade(connection.getConnectorPart(), newConnectionConnectorForms); - MConnection newConnection = new MConnection(connection, newConnectionConnectorForms, newConnectionFrameworkForms); + upgrader.upgrade(link.getConnectorLinkConfig(), newLinkConfigs); + MLink newLink = new MLink(link, newLinkConfigs); - // Transform form structures to objects for validations + // Transform config structures to objects for validations SqoopConnector connector = - ConnectorManager.getInstance().getConnector(connection.getConnectorId()); + ConnectorManager.getInstance().getConnector(link.getConnectorId()); Object connectorConfig = ClassUtils.instantiate( - connector.getConnectionConfigurationClass()); - Object frameworkConfig = ClassUtils.instantiate( - FrameworkManager.getInstance().getConnectionConfigurationClass()); + connector.getLinkConfigurationClass()); - FormUtils.fromForms( - connection.getConnectorPart().getForms(), connectorConfig); - FormUtils.fromForms( - connection.getFrameworkPart().getForms(), frameworkConfig); + ConfigUtils.fromConfigs( + link.getConnectorLinkConfig().getConfigs(), connectorConfig); - Validator connectorValidator = connector.getValidator(); - Validator frameworkValidator = FrameworkManager.getInstance().getValidator(); + ConfigValidationRunner validationRunner = new ConfigValidationRunner(); + ConfigValidationResult result = validationRunner.validate(connectorConfig); - Validation connectorValidation = - connectorValidator.validateConnection(connectorConfig); - Validation frameworkValidation = - frameworkValidator.validateConnection(frameworkConfig); - - Status finalStatus = Status.getWorstStatus(connectorValidation.getStatus(), - frameworkValidation.getStatus()); + Status finalStatus = Status.getWorstStatus(result.getStatus()); if (finalStatus.canProceed()) { - repository.createConnection(newConnection); + repository.createLink(newLink); } else { - LOG.error("Failed to load connection:" + connection.getName()); - LOG.error("Status of connector forms:" + connectorValidation.getStatus().toString()); - LOG.error("Status of framework forms:" + frameworkValidation.getStatus().toString()); + LOG.error("Failed to load link:" + link.getName()); + LOG.error("Status of connector configs:" + result.getStatus().toString()); } - return newConnection.getPersistenceId(); + return newLink.getPersistenceId(); } private long loadJob(MJob job) { //starting by pretending we have a brand new job resetPersistenceId(job); - - MetadataUpgrader upgrader = FrameworkManager.getInstance().getMetadataUpgrader(); - MFramework framework = FrameworkManager.getInstance().getFramework(); + RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader(); + MDriver driver = Driver.getInstance().getDriver(); Repository repository = RepositoryManager.getInstance().getRepository(); - MJob.Type jobType = job.getType(); - List<MForm> frameworkForms = framework.getJobForms(job.getType()).clone(false).getForms(); - MJobForms newJobFrameworkForms = new MJobForms(jobType,frameworkForms); - - MConnector mConnector = ConnectorManager.getInstance().getConnectorMetadata(job.getConnectorId()); - List<MForm> connectorForms = mConnector.getJobForms(jobType).clone(false).getForms(); - MJobForms newJobConnectorForms = new MJobForms(jobType,connectorForms); - - // upgrading the forms to make sure they match the current repository - upgrader.upgrade(job.getFrameworkPart(), newJobFrameworkForms); - upgrader.upgrade(job.getConnectorPart(), newJobConnectorForms); - MJob newJob = new MJob(job, newJobConnectorForms, newJobFrameworkForms); - - // Transform form structures to objects for validations - SqoopConnector connector = - ConnectorManager.getInstance().getConnector(job.getConnectorId()); - - Object connectorConfig = ClassUtils.instantiate( - connector.getJobConfigurationClass(jobType)); - Object frameworkConfig = ClassUtils.instantiate( - FrameworkManager.getInstance().getJobConfigurationClass(jobType)); - - FormUtils.fromForms( - job.getConnectorPart().getForms(), connectorConfig); - FormUtils.fromForms( - job.getFrameworkPart().getForms(), frameworkConfig); - - Validator connectorValidator = connector.getValidator(); - Validator frameworkValidator = FrameworkManager.getInstance().getValidator(); - - Validation connectorValidation = - connectorValidator.validateJob(jobType,connectorConfig); - Validation frameworkValidation = - frameworkValidator.validateJob(jobType,frameworkConfig); - - Status finalStatus = Status.getWorstStatus(connectorValidation.getStatus(), - frameworkValidation.getStatus()); + MDriverConfig driverConfigs = driver.getDriverConfig(); + MFromConfig fromConfigs = job.getFromJobConfig(); + MToConfig toConfigs = job.getToJobConfig(); + + // upgrading the configs to make sure they match the current repository + upgrader.upgrade(job.getDriverConfig(), driverConfigs); + upgrader.upgrade(job.getFromJobConfig(), fromConfigs); + upgrader.upgrade(job.getToJobConfig(), toConfigs); + MJob newJob = new MJob(job, fromConfigs, toConfigs, driverConfigs); + + // Transform config structures to objects for validations + SqoopConnector fromConnector = + ConnectorManager.getInstance().getConnector( + job.getConnectorId(Direction.FROM)); + SqoopConnector toConnector = + ConnectorManager.getInstance().getConnector( + job.getConnectorId(Direction.TO)); + + Object fromConnectorConfig = ClassUtils.instantiate( + fromConnector.getJobConfigurationClass(Direction.FROM)); + Object toConnectorConfig = ClassUtils.instantiate( + toConnector.getJobConfigurationClass(Direction.TO)); + Object driverConfig = ClassUtils.instantiate( + Driver.getInstance().getDriverConfigurationGroupClass()); + + ConfigUtils.fromConfigs( + job.getFromJobConfig().getConfigs(), fromConnectorConfig); + ConfigUtils.fromConfigs( + job.getToJobConfig().getConfigs(), toConnectorConfig); + ConfigUtils.fromConfigs( + job.getDriverConfig().getConfigs(), driverConfig); + + ConfigValidationRunner validationRunner = new ConfigValidationRunner(); + ConfigValidationResult fromConnectorConfigResult = validationRunner.validate(fromConnectorConfig); + ConfigValidationResult toConnectorConfigResult = validationRunner.validate(toConnectorConfig); + ConfigValidationResult driverConfigResult = validationRunner.validate(driverConfig); + + Status finalStatus = Status.getWorstStatus(fromConnectorConfigResult.getStatus(), + toConnectorConfigResult.getStatus(), driverConfigResult.getStatus()); if (finalStatus.canProceed()) { repository.createJob(newJob); } else { LOG.error("Failed to load job:" + job.getName()); - LOG.error("Status of connector forms:" + connectorValidation.getStatus().toString()); - LOG.error("Status of framework forms:" + frameworkValidation.getStatus().toString()); + LOG.error("Status of from connector configs:" + fromConnectorConfigResult.getStatus().toString()); + LOG.error("Status of to connector configs:" + toConnectorConfigResult.getStatus().toString()); + LOG.error("Status of driver configs:" + driverConfigResult.getStatus().toString()); } return newJob.getPersistenceId();