http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsMetadataUpgrader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsMetadataUpgrader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsMetadataUpgrader.java deleted file mode 100644 index 3e51e38..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsMetadataUpgrader.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.sqoop.connector.hdfs; - -import org.apache.log4j.Logger; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.spi.MetadataUpgrader; -import org.apache.sqoop.model.MConnectionForms; -import org.apache.sqoop.model.MForm; -import org.apache.sqoop.model.MInput; -import org.apache.sqoop.model.MJobForms; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class HdfsMetadataUpgrader extends MetadataUpgrader { - private static final Logger LOG = - Logger.getLogger(HdfsMetadataUpgrader.class); - - /* - * For now, there is no real upgrade. So copy all data over, - * set the validation messages and error messages to be the same as for the - * inputs in the original one. - */ - - @Override - public void upgrade(MConnectionForms original, - MConnectionForms upgradeTarget) { - doUpgrade(original.getForms(), upgradeTarget.getForms()); - } - - @Override - public void upgrade(MJobForms original, MJobForms upgradeTarget) { - doUpgrade(original.getForms(), upgradeTarget.getForms()); - } - - @SuppressWarnings("unchecked") - private void doUpgrade(List<MForm> original, List<MForm> target) { - // Easier to find the form in the original forms list if we use a map. - // Since the constructor of MJobForms takes a list, - // index is not guaranteed to be the same, so we need to look for - // equivalence - Map<String, MForm> formMap = new HashMap<String, MForm>(); - for (MForm form : original) { - formMap.put(form.getName(), form); - } - for (MForm form : target) { - List<MInput<?>> inputs = form.getInputs(); - MForm originalForm = formMap.get(form.getName()); - if (originalForm == null) { - LOG.warn("Form: '" + form.getName() + "' not present in old " + - "connector. 
So it and its inputs will not be transferred by the upgrader."); - continue; - } - for (MInput input : inputs) { - try { - MInput originalInput = originalForm.getInput(input.getName()); - input.setValue(originalInput.getValue()); - } catch (SqoopException ex) { - LOG.warn("Input: '" + input.getName() + "' not present in old " + - "connector. So it will not be transferred by the upgrader."); - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java index df764d2..6828de8 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java @@ -39,7 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NetworkTopology; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; @@ -50,7 +50,7 @@ import org.apache.sqoop.common.PrefixContext; * This class derives mostly from CombineFileInputFormat of Hadoop, i.e. * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat. 
*/ -public class HdfsPartitioner extends Partitioner<ConnectionConfiguration, FromJobConfiguration> { +public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> { public static final String SPLIT_MINSIZE_PERNODE = "mapreduce.input.fileinputformat.split.minsize.per.node"; @@ -68,12 +68,12 @@ public class HdfsPartitioner extends Partitioner<ConnectionConfiguration, FromJo @Override public List<Partition> getPartitions(PartitionerContext context, - ConnectionConfiguration connectionConfiguration, FromJobConfiguration jobConfiguration) { + LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration) { Configuration conf = ((PrefixContext)context.getContext()).getConfiguration(); try { - long numInputBytes = getInputSize(conf, jobConfiguration.input.inputDirectory); + long numInputBytes = getInputSize(conf, jobConfiguration.fromJobConfig.inputDirectory); maxSplitSize = numInputBytes / context.getMaxPartitions(); if(numInputBytes % context.getMaxPartitions() != 0 ) { @@ -118,7 +118,7 @@ public class HdfsPartitioner extends Partitioner<ConnectionConfiguration, FromJo } // all the files in input set - String indir = jobConfiguration.input.inputDirectory; + String indir = jobConfiguration.fromJobConfig.inputDirectory; FileSystem fs = FileSystem.get(conf); List<Path> paths = new LinkedList<Path>(); @@ -147,7 +147,7 @@ public class HdfsPartitioner extends Partitioner<ConnectionConfiguration, FromJo } } - //TODO: Perhaps get the FS from connection configuration so we can support remote HDFS + //TODO: Perhaps get the FS from link configuration so we can support remote HDFS private long getInputSize(Configuration conf, String indir) throws IOException { FileSystem fs = FileSystem.get(conf); FileStatus[] files = fs.listStatus(new Path(indir)); http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java 
---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java index 4efbd33..dfa3659 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java @@ -23,13 +23,13 @@ import org.apache.sqoop.validation.Validation; import org.apache.sqoop.validation.Validator; /** - * Validate framework configuration objects + * Validate configuration objects */ public class HdfsValidator extends Validator { @Override - public Validation validateConnection(Object connectionConfiguration) { - Validation validation = new Validation(ConnectionConfiguration.class); + public Validation validateLink(Object connectionConfiguration) { + Validation validation = new Validation(LinkConfiguration.class); // No validation on connection object return validation; } @@ -41,42 +41,39 @@ public class HdfsValidator extends Validator { return super.validateJob(jobConfiguration); } - private Validation validateExportJob(Object jobConfiguration) { + @SuppressWarnings("unused") + private Validation validateFromJob(Object jobConfiguration) { Validation validation = new Validation(FromJobConfiguration.class); FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration; - - validateInputForm(validation, configuration.input); - - + validateInputForm(validation, configuration.fromJobConfig); return validation; } - private Validation validateImportJob(Object jobConfiguration) { + @SuppressWarnings("unused") + private Validation validateToJob(Object jobConfiguration) { Validation validation = new Validation(ToJobConfiguration.class); ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration; - - validateOutputForm(validation, configuration.output); 
- + validateOutputForm(validation, configuration.toJobConfig); return validation; } - private void validateInputForm(Validation validation, InputForm input) { + private void validateInputForm(Validation validation, FromJobConfig input) { if(input.inputDirectory == null || input.inputDirectory.isEmpty()) { validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty"); } } - private void validateOutputForm(Validation validation, OutputForm output) { + private void validateOutputForm(Validation validation, ToJobConfig output) { if(output.outputDirectory == null || output.outputDirectory.isEmpty()) { validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty"); } if(output.customCompression != null && output.customCompression.trim().length() > 0 && - output.compression != OutputCompression.CUSTOM) { + output.compression != ToCompression.CUSTOM) { validation.addMessage(Status.UNACCEPTABLE, "output", "compression", "custom compression should be blank as " + output.compression + " is being used."); } - if(output.compression == OutputCompression.CUSTOM && + if(output.compression == ToCompression.CUSTOM && (output.customCompression == null || output.customCompression.trim().length() == 0) ) { @@ -84,6 +81,4 @@ public class HdfsValidator extends Validator { "custom compression is blank."); } } - - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java deleted file mode 100644 index 6dd79d5..0000000 --- 
a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.hdfs.configuration; - -import org.apache.sqoop.model.ConfigurationClass; -import org.apache.sqoop.model.Form; - -@ConfigurationClass -public class ConnectionConfiguration { - @Form - public ConnectionForm connection; - - public ConnectionConfiguration() { - connection = new ConnectionForm(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java deleted file mode 100644 index 7dad2a2..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.hdfs.configuration; - -import org.apache.sqoop.model.FormClass; -import org.apache.sqoop.model.Input; - -@FormClass -public class ConnectionForm { - //Todo: Didn't find anything that belongs here... - // Since empty forms don't work (DERBYREPO_0008:The form contains no input metadata), I'm putting a dummy form here - - @Input(size = 255) public String dummy; -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java new file mode 100644 index 0000000..2c98051 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +/** + * + */ +@FormClass +public class FromJobConfig { + + @Input(size = 255) public String inputDirectory; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java index bccb99d..f861237 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java @@ -22,11 +22,10 @@ import org.apache.sqoop.model.Form; @ConfigurationClass public class FromJobConfiguration { - @Form public InputForm input; - + @Form public FromJobConfig fromJobConfig; public FromJobConfiguration() { - input = new InputForm(); + fromJobConfig = new FromJobConfig(); } } 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java deleted file mode 100644 index 413f04c..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.connector.hdfs.configuration; - -import org.apache.sqoop.model.FormClass; -import org.apache.sqoop.model.Input; - -/** - * - */ -@FormClass -public class InputForm { - - @Input(size = 255) public String inputDirectory; -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java new file mode 100644 index 0000000..b689854 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +@FormClass +public class LinkConfig { + //Todo: Didn't find anything that belongs here... 
+ // Since empty forms don't work (DERBYREPO_0008:The form contains no input metadata), I'm putting a dummy form here + + @Input(size = 255) public String dummy; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java new file mode 100644 index 0000000..4970821 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Form; + +@ConfigurationClass +public class LinkConfiguration { + @Form + public LinkConfig link; + + public LinkConfiguration() { + link = new LinkConfig(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java deleted file mode 100644 index 55db1bc..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.connector.hdfs.configuration; - -/** - * Supported compressions - */ -public enum OutputCompression { - NONE, - DEFAULT, - DEFLATE, - GZIP, - BZIP2, - LZO, - LZ4, - SNAPPY, - CUSTOM, -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java deleted file mode 100644 index d57b4c2..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.connector.hdfs.configuration; - -import org.apache.sqoop.model.FormClass; -import org.apache.sqoop.model.Input; - -/** - * - */ -@FormClass -public class OutputForm { - - @Input public OutputFormat outputFormat; - - @Input public OutputCompression compression; - - @Input(size = 255) public String customCompression; - - @Input(size = 255) public String outputDirectory; -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java deleted file mode 100644 index 676c33c..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.connector.hdfs.configuration; - -/** - * Various supported formats on disk - */ -public enum OutputFormat { - /** - * Comma separated text file - */ - TEXT_FILE, - - /** - * Sequence file - */ - SEQUENCE_FILE, -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToCompression.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToCompression.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToCompression.java new file mode 100644 index 0000000..34e629a --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToCompression.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs.configuration; + +/** + * Supported compressions + */ +public enum ToCompression { + NONE, + DEFAULT, + DEFLATE, + GZIP, + BZIP2, + LZO, + LZ4, + SNAPPY, + CUSTOM, +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java new file mode 100644 index 0000000..27d121f --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs.configuration; + +/** + * Various supported formats on disk + */ +public enum ToFormat { + /** + * Comma separated text file + */ + TEXT_FILE, + + /** + * Sequence file + */ + SEQUENCE_FILE, +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java new file mode 100644 index 0000000..b1308db --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +/** + * + */ +@FormClass +public class ToJobConfig { + + @Input public ToFormat outputFormat; + + @Input public ToCompression compression; + + @Input(size = 255) public String customCompression; + + @Input(size = 255) public String outputDirectory; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java index 65ee8a7..bba249c 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java @@ -23,9 +23,9 @@ import org.apache.sqoop.model.Form; @ConfigurationClass public class ToJobConfiguration { @Form - public OutputForm output; + public ToJobConfig toJobConfig; public ToJobConfiguration() { - output = new OutputForm(); + toJobConfig = new ToJobConfig(); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java index eb80121..75c2e7e 100644 --- 
a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java @@ -24,7 +24,6 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; - import java.io.IOException; public class HdfsSequenceWriter extends GenericHdfsWriter { @@ -32,6 +31,7 @@ public class HdfsSequenceWriter extends GenericHdfsWriter { private SequenceFile.Writer filewriter; private Text text; + @SuppressWarnings("deprecation") public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException { if (codec != null) { filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties new file mode 100644 index 0000000..b603f2f --- /dev/null +++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generic HDFS Connector Resources + +############################ +# Link Config +# +link.label = Link configuration +link.help = You must supply the information requested in order to \ + create a connection object. + +link.dummy.label = Dummy parameter needed to get HDFS connector to register +link.dummy.help = You can write anything here. Doesn't matter. + +# To Job Config +# +toJobConfig.label = ToJob configuration +toJobConfig.help = You must supply the information requested in order to \ + get information where you want to store your data. + +toJobConfig.storageType.label = Storage type +toJobConfig.storageType.help = Target on Hadoop ecosystem where to store data + +toJobConfig.outputFormat.label = Output format +toJobConfig.outputFormat.help = Format in which data should be serialized + +toJobConfig.compression.label = Compression format +toJobConfig.compression.help = Compression that should be used for the data + +toJobConfig.customCompression.label = Custom compression format +toJobConfig.customCompression.help = Full class name of the custom compression + +toJobConfig.outputDirectory.label = Output directory +toJobConfig.outputDirectory.help = Output directory for final data + +toJobConfig.ignored.label = Ignored +toJobConfig.ignored.help = This value is ignored + +# From Job Config +# +fromJobConfig.label = From Job configuration +fromJobConfig.help = Specifies information required to get data from Hadoop ecosystem + +fromJobConfig.inputDirectory.label = Input directory +fromJobConfig.inputDirectory.help = Directory that should be exported 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties deleted file mode 100644 index 3125911..0000000 --- a/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties +++ /dev/null @@ -1,58 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Generic HDFS Connector Resources - -############################ -# Connection Form -# -connection.label = Connection configuration -connection.help = You must supply the information requested in order to \ - create a connection object. - -connection.dummy.label = Dummy parameter needed to get HDFS connector to register -connection.dummy.help = You can write anything here. Doesn't matter. - -# Output From -# -output.label = Output configuration -output.help = You must supply the information requested in order to \ - get information where you want to store your data. 
- -output.storageType.label = Storage type -output.storageType.help = Target on Hadoop ecosystem where to store data - -output.outputFormat.label = Output format -output.outputFormat.help = Format in which data should be serialized - -output.compression.label = Compression format -output.compression.help = Compression that should be used for the data - -output.customCompression.label = Custom compression format -output.customCompression.help = Full class name of the custom compression - -output.outputDirectory.label = Output directory -output.outputDirectory.help = Output directory for final data - -output.ignored.label = Ignored -output.ignored.help = This value is ignored - -# Input Form -# -input.label = Input configuration -input.help = Specifies information required to get data from Hadoop ecosystem - -input.inputDirectory.label = Input directory -input.inputDirectory.help = Directory that should be exported http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java index 5e21543..7942d59 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java @@ -22,9 +22,9 @@ import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.sqoop.common.PrefixContext; -import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import 
org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; -import org.apache.sqoop.connector.hdfs.configuration.OutputFormat; +import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; @@ -40,8 +40,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE; -import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; @RunWith(Parameterized.class) public class TestExtractor extends TestHdfsBase { @@ -49,12 +49,12 @@ public class TestExtractor extends TestHdfsBase { private static final int NUMBER_OF_FILES = 5; private static final int NUMBER_OF_ROWS_PER_FILE = 1000; - private OutputFormat outputFileType; + private ToFormat outputFileType; private Class<? extends CompressionCodec> compressionClass; private final String inputDirectory; private Extractor extractor; - public TestExtractor(OutputFormat outputFileType, + public TestExtractor(ToFormat outputFileType, Class<? 
extends CompressionCodec> compressionClass) throws Exception { this.inputDirectory = INPUT_ROOT + getClass().getSimpleName(); @@ -130,7 +130,7 @@ public class TestExtractor extends TestHdfsBase { throw new AssertionError("Should not be writing object."); } }, null); - ConnectionConfiguration connConf = new ConnectionConfiguration(); + LinkConfiguration connConf = new LinkConfiguration(); FromJobConfiguration jobConf = new FromJobConfiguration(); HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory)); http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java index 79cf1f1..552a751 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java @@ -27,9 +27,9 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.sqoop.common.PrefixContext; -import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.hdfs.configuration.OutputCompression; -import org.apache.sqoop.connector.hdfs.configuration.OutputFormat; +import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.ToCompression; +import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.job.etl.Loader; @@ -48,21 +48,21 @@ import java.util.ArrayList; 
import java.util.Collection; import java.util.List; -import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE; -import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; @RunWith(Parameterized.class) public class TestLoader extends TestHdfsBase { private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/"; private static final int NUMBER_OF_ROWS_PER_FILE = 1000; - private OutputFormat outputFormat; - private OutputCompression compression; + private ToFormat outputFormat; + private ToCompression compression; private final String outputDirectory; private Loader loader; - public TestLoader(OutputFormat outputFormat, - OutputCompression compression) + public TestLoader(ToFormat outputFormat, + ToCompression compression) throws Exception { this.outputDirectory = INPUT_ROOT + getClass().getSimpleName(); this.outputFormat = outputFormat; @@ -73,10 +73,10 @@ public class TestLoader extends TestHdfsBase { @Parameterized.Parameters public static Collection<Object[]> data() { List<Object[]> parameters = new ArrayList<Object[]>(); - for (OutputCompression compression : new OutputCompression[]{ - OutputCompression.DEFAULT, - OutputCompression.BZIP2, - OutputCompression.NONE + for (ToCompression compression : new ToCompression[]{ + ToCompression.DEFAULT, + ToCompression.BZIP2, + ToCompression.NONE }) { for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) { parameters.add(new Object[]{outputFileType, compression}); @@ -121,11 +121,11 @@ public class TestLoader extends TestHdfsBase { return null; } }, null); - ConnectionConfiguration connConf = new ConnectionConfiguration(); + LinkConfiguration connConf = new LinkConfiguration(); ToJobConfiguration jobConf = new ToJobConfiguration(); - 
jobConf.output.outputDirectory = outputDirectory; - jobConf.output.compression = compression; - jobConf.output.outputFormat = outputFormat; + jobConf.toJobConfig.outputDirectory = outputDirectory; + jobConf.toJobConfig.compression = compression; + jobConf.toJobConfig.outputFormat = outputFormat; Path outputPath = new Path(outputDirectory); loader.load(context, connConf, jobConf); http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java index ae93b0a..9d177ec 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java @@ -22,9 +22,9 @@ import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.sqoop.common.PrefixContext; -import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; -import org.apache.sqoop.connector.hdfs.configuration.OutputFormat; +import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; @@ -39,7 +39,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.*; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.*; 
import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) @@ -48,13 +48,13 @@ public class TestPartitioner extends TestHdfsBase { private static final int NUMBER_OF_FILES = 5; private static final int NUMBER_OF_ROWS_PER_FILE = 1000; - private OutputFormat outputFileType; + private ToFormat outputFileType; private Class<? extends CompressionCodec> compressionClass; private Partitioner partitioner; private final String inputDirectory; - public TestPartitioner(OutputFormat outputFileType, Class<? extends CompressionCodec> compressionClass) { + public TestPartitioner(ToFormat outputFileType, Class<? extends CompressionCodec> compressionClass) { this.inputDirectory = INPUT_ROOT + getClass().getSimpleName(); this.outputFileType = outputFileType; this.compressionClass = compressionClass; @@ -97,10 +97,10 @@ public class TestPartitioner extends TestHdfsBase { Configuration conf = new Configuration(); PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context."); PartitionerContext context = new PartitionerContext(prefixContext, 5, null); - ConnectionConfiguration connConf = new ConnectionConfiguration(); + LinkConfiguration connConf = new LinkConfiguration(); FromJobConfiguration jobConf = new FromJobConfiguration(); - jobConf.input.inputDirectory = inputDirectory; + jobConf.fromJobConfig.inputDirectory = inputDirectory; List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf); http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index 66d46a3..74b9518 100644 --- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -26,7 +26,7 @@ import java.io.IOException; /** * Abstract class representing a pluggable intermediate data format the Sqoop - * framework will use to move data to/from the connector. All intermediate + * driver will use to move data to/from the connector. All intermediate * data formats are expected to have an internal/native implementation, * but also should minimally be able to return a text (CSV) version of the * data. The data format should also be able to return the data as an object http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java index 6c10b54..dbfdc03 100644 --- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java +++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java @@ -106,7 +106,7 @@ public final class ConnectorHandler { } MConnectionForms connectionForms = new MConnectionForms( - FormUtils.toForms(connector.getConnectionConfigurationClass())); + FormUtils.toForms(connector.getLinkConfigurationClass())); String connectorVersion = connector.getVersion(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java b/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java index 898ec73..f341108 100644 --- a/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java +++ 
b/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java @@ -73,8 +73,8 @@ public final class ConfigurationConstants { public static final String CONNECTOR_AUTO_UPGRADE = "org.apache.sqoop.connector.autoupgrade"; - public static final String FRAMEWORK_AUTO_UPGRADE = - "org.apache.sqoop.framework.autoupgrade"; + public static final String DRIVER_AUTO_UPGRADE = + "org.apache.sqoop.driver.autoupgrade"; /** * Enable Sqoop App to kill Tomcat in case that it will fail to load. http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/core/SqoopServer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/SqoopServer.java b/core/src/main/java/org/apache/sqoop/core/SqoopServer.java index d4c3b39..ac836c7 100644 --- a/core/src/main/java/org/apache/sqoop/core/SqoopServer.java +++ b/core/src/main/java/org/apache/sqoop/core/SqoopServer.java @@ -20,8 +20,8 @@ package org.apache.sqoop.core; import org.apache.log4j.Logger; import org.apache.sqoop.audit.AuditLoggerManager; import org.apache.sqoop.connector.ConnectorManager; -import org.apache.sqoop.framework.FrameworkManager; -import org.apache.sqoop.framework.JobManager; +import org.apache.sqoop.driver.Driver; +import org.apache.sqoop.driver.JobManager; import org.apache.sqoop.repository.RepositoryManager; /** @@ -34,7 +34,7 @@ public class SqoopServer { public static void destroy() { LOG.info("Shutting down Sqoop server"); JobManager.getInstance().destroy(); - FrameworkManager.getInstance().destroy(); + Driver.getInstance().destroy(); ConnectorManager.getInstance().destroy(); RepositoryManager.getInstance().destroy(); AuditLoggerManager.getInstance().destroy(); @@ -49,7 +49,7 @@ public class SqoopServer { AuditLoggerManager.getInstance().initialize(); RepositoryManager.getInstance().initialize(); ConnectorManager.getInstance().initialize(); - FrameworkManager.getInstance().initialize(); + 
Driver.getInstance().initialize(); JobManager.getInstance().initialize(); LOG.info("Sqoop server has successfully boot up"); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/Driver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java new file mode 100644 index 0000000..5297bde --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.driver; + +import java.util.Locale; +import java.util.ResourceBundle; + +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.spi.RepositoryUpgrader; +import org.apache.sqoop.core.ConfigurationConstants; +import org.apache.sqoop.core.Reconfigurable; +import org.apache.sqoop.core.SqoopConfiguration; +import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; +import org.apache.sqoop.driver.configuration.JobConfiguration; +import org.apache.sqoop.driver.configuration.LinkConfiguration; +import org.apache.sqoop.model.FormUtils; +import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.MDriverConfig; +import org.apache.sqoop.model.MJobForms; +import org.apache.sqoop.repository.RepositoryManager; +import org.apache.sqoop.validation.Validator; + +/** + * Sqoop driver that manages the job lifecycle + * + * All Sqoop internals are handled in this class: + * * Submission engine + * * Execution engine + * * Driver config + * + * The current implementation of the submission engine uses the repository + * to keep track of running submissions. Thus, the server might be restarted at + * any time without any effect on running jobs. This approach, however, might not + * be the fastest, and we might want to introduce internal structures for + * running jobs if this approach proves too slow. + */ +public class Driver implements Reconfigurable { + + /** + * Logger object. + */ + private static final Logger LOG = Logger.getLogger(Driver.class); + + /** + * Singleton instance of this class. + */ + private static Driver instance; + + /** + * Create the default instance at class-initialization time. + * + * Every Sqoop server application needs one, so this should not be a performance issue. + */ + static { + instance = new Driver(); + } + + /** + * Return current instance.
+ * + * @return Current instance + */ + public static Driver getInstance() { + return instance; + } + + /** + * Allows replacing the instance when needed. + * + * This method should not normally be used, as the default instance should be sufficient. One target + * use case for this method is unit tests. + * + * @param newInstance New instance + */ + public static void setInstance(Driver newInstance) { + instance = newInstance; + } + + /** + * Driver config structure + */ + private MDriverConfig mDriverConfig; + + /** + * Validator instance + */ + private final Validator validator; + + /** + * Driver config upgrader instance + */ + private final RepositoryUpgrader driverConfigUpgrader; + + /** + * Default driver config auto upgrade option value + */ + private static final boolean DEFAULT_AUTO_UPGRADE = false; + + public static final String CURRENT_DRIVER_VERSION = "1"; + + public Class getJobConfigurationClass() { + return JobConfiguration.class; + } + + public Class getLinkConfigurationClass() { + return LinkConfiguration.class; + } + + public Driver() { + MConnectionForms connectionForms = new MConnectionForms( + FormUtils.toForms(getLinkConfigurationClass()) + ); + mDriverConfig = new MDriverConfig(connectionForms, new MJobForms(FormUtils.toForms(getJobConfigurationClass())), + CURRENT_DRIVER_VERSION); + + // Build validator + validator = new DriverValidator(); + // Build upgrader + driverConfigUpgrader = new DriverConfigUpgrader(); + } + + public synchronized void initialize() { + initialize(SqoopConfiguration.getInstance().getContext().getBoolean(ConfigurationConstants.DRIVER_AUTO_UPGRADE, DEFAULT_AUTO_UPGRADE)); + } + + public synchronized void initialize(boolean autoUpgrade) { + LOG.trace("Begin Driver Config initialization"); + + // Register driver config in repository + mDriverConfig = RepositoryManager.getInstance().getRepository().registerDriverConfig(mDriverConfig, autoUpgrade); +
SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this)); + + LOG.info("Driver Config initialized: OK"); + } + + public synchronized void destroy() { + LOG.trace("Begin Driver Config destroy"); + } + + public Validator getValidator() { + return validator; + } + + public RepositoryUpgrader getDriverConfigRepositoryUpgrader() { + return driverConfigUpgrader; + } + + public MDriverConfig getDriverConfig() { + return mDriverConfig; + } + + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle(DriverConstants.DRIVER_CONFIG_BUNDLE, locale); + } + + @Override + public void configurationChanged() { + LOG.info("Begin Driver reconfiguring"); + // If there are configuration options for Driver, + // implement the reconfiguration procedure right here. + LOG.info("Driver reconfigured"); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java b/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java new file mode 100644 index 0000000..8d6eb78 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.driver; + +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.spi.RepositoryUpgrader; +import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.MForm; +import org.apache.sqoop.model.MInput; +import org.apache.sqoop.model.MJobForms; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DriverConfigUpgrader extends RepositoryUpgrader{ + + private static final Logger LOG = Logger.getLogger(DriverConfigUpgrader.class); + + @Override + public void upgrade(MConnectionForms original, + MConnectionForms upgradeTarget) { + doUpgrade(original.getForms(), upgradeTarget.getForms()); + } + + @Override + public void upgrade(MJobForms original, MJobForms upgradeTarget) { + doUpgrade(original.getForms(), upgradeTarget.getForms()); + + } + + @SuppressWarnings("unchecked") + private void doUpgrade(List<MForm> original, List<MForm> target) { + // Easier to find the form in the original forms list if we use a map. + // Since the constructor of MJobForms takes a list, + // index is not guaranteed to be the same, so we need to look for + // equivalence + Map<String, MForm> formMap = new HashMap<String, MForm>(); + for (MForm form : original) { + formMap.put(form.getName(), form); + } + for (MForm form : target) { + List<MInput<?>> inputs = form.getInputs(); + MForm originalForm = formMap.get(form.getName()); + if(originalForm == null) { + LOG.warn("Form: " + form.getName() + " not present in old " + + "driver config. 
So it will not be transferred by the upgrader."); + continue; + } + + for (MInput input : inputs) { + try { + MInput originalInput = originalForm.getInput(input.getName()); + input.setValue(originalInput.getValue()); + } catch (SqoopException ex) { + LOG.warn("Input: " + input.getName() + " not present in old " + + "driver config. So it will not be transferred by the upgrader."); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/DriverConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverConstants.java b/core/src/main/java/org/apache/sqoop/driver/DriverConstants.java new file mode 100644 index 0000000..795944a --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/DriverConstants.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.driver; + +import org.apache.sqoop.core.ConfigurationConstants; + +/** + * Constants that are used in driver module. 
+ */
+public final class DriverConstants {
+
+  // Configuration keys, all built on ConfigurationConstants.PREFIX_GLOBAL_CONFIG.
+  // PREFIX_* values end with '.' and serve as key prefixes; SYSCFG_* values are complete keys.
+  public static final String PREFIX_SUBMISSION_CONFIG =
+    ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission.";
+
+  public static final String PREFIX_EXECUTION_CONFIG =
+    ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution.";
+
+  public static final String SYSCFG_SUBMISSION_ENGINE =
+    PREFIX_SUBMISSION_CONFIG + "engine";
+
+  public static final String PREFIX_SUBMISSION_ENGINE_CONFIG =
+    SYSCFG_SUBMISSION_ENGINE + ".";
+
+  public static final String PREFIX_SUBMISSION_PURGE_CONFIG =
+    PREFIX_SUBMISSION_CONFIG + "purge.";
+
+  public static final String SYSCFG_SUBMISSION_PURGE_THRESHOLD =
+    PREFIX_SUBMISSION_PURGE_CONFIG + "threshold";
+
+  public static final String SYSCFG_SUBMISSION_PURGE_SLEEP =
+    PREFIX_SUBMISSION_PURGE_CONFIG + "sleep";
+
+  public static final String PREFIX_SUBMISSION_UPDATE_CONFIG =
+    PREFIX_SUBMISSION_CONFIG + "update.";
+
+  public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP =
+    PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep";
+
+  public static final String SYSCFG_EXECUTION_ENGINE =
+    PREFIX_EXECUTION_CONFIG + "engine";
+
+  public static final String PREFIX_EXECUTION_ENGINE_CONFIG =
+    SYSCFG_EXECUTION_ENGINE + ".";
+
+  // Name of the driver config bundle.
+  public static final String DRIVER_CONFIG_BUNDLE = "driver-config";
+
+  private DriverConstants() {
+    // Instantiation of this class is prohibited
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/DriverError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverError.java b/core/src/main/java/org/apache/sqoop/driver/DriverError.java
new file mode 100644
index 0000000..56ef9bb
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/DriverError.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver;
+
+import org.apache.sqoop.common.ErrorCode;
+
+/**
+ * Error codes for the Sqoop driver; getCode() returns the enum constant name (e.g. DRIVER_0004).
+ */
+public enum DriverError implements ErrorCode {
+
+  DRIVER_0000("Metadata are not registered in repository"),
+
+  DRIVER_0001("Invalid submission engine"),
+
+  DRIVER_0002("Given job is already running"),
+
+  DRIVER_0003("Given job is not running"),
+
+  DRIVER_0004("Unknown job id"),
+
+  DRIVER_0005("Unsupported job type"),
+
+  DRIVER_0006("Can't bootstrap job"),
+
+  DRIVER_0007("Invalid execution engine"),
+
+  DRIVER_0008("Invalid combination of submission and execution engines"),
+
+  DRIVER_0009("Job has been disabled. Cannot submit this job."),
+
+  DRIVER_0010("Connection for this job has been disabled. Cannot submit this job."),
+
+  DRIVER_0011("Connector does not support direction. Cannot submit this job."),
+
+  ;
+
+  private final String message;
+
+  private DriverError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/DriverValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverValidator.java b/core/src/main/java/org/apache/sqoop/driver/DriverValidator.java
new file mode 100644
index 0000000..9cc51dd
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/DriverValidator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.sqoop.driver; + +import org.apache.sqoop.driver.configuration.LinkConfiguration; +import org.apache.sqoop.driver.configuration.JobConfiguration; +import org.apache.sqoop.driver.configuration.ThrottlingForm; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.Validation; +import org.apache.sqoop.validation.Validator; + +public class DriverValidator extends Validator { + @Override + public Validation validateLink(Object linkConfiguration) { + Validation validation = new Validation(LinkConfiguration.class); + // No validation on link object + return validation; + } + + @Override + public Validation validateJob(Object jobConfiguration) { + Validation validation = new Validation(JobConfiguration.class); + JobConfiguration conf = (JobConfiguration)jobConfiguration; + validateThrottlingForm(validation,conf.throttling); + + return validation; + }; + + private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) { + if(throttling.extractors != null && throttling.extractors < 1) { + validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor"); + } + + if(throttling.loaders != null && throttling.loaders < 1) { + validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader"); + } + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/ExecutionEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/driver/ExecutionEngine.java new file mode 100644 index 0000000..ea38ed4 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/ExecutionEngine.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.driver; + +import org.apache.sqoop.common.ImmutableContext; + +/** + * Execution engine drives execution of sqoop job. It's responsible + * for executing all defined steps in the import/export workflow. + * A successful job execution will be recorded in the job submission entity + */ +public abstract class ExecutionEngine { + + /** + * Initialize execution engine + * + * @param context Configuration context + * @parma prefix Execution engine prefix + */ + public void initialize(ImmutableContext context, String prefix) { + } + + /** + * Destroy execution engine when stopping server + */ + public void destroy() { + } + + /** + * Return new JobRequest class or any subclass if it's needed by + * execution and submission engine combination. + * + * @return new JobRequestobject + */ + public JobRequest createJobRequest() { + return new JobRequest(); + } + + /** + * Prepare given job request. + * + * @param request JobRequest + */ + public abstract void prepareJob(JobRequest request); +}