SQOOP-1375: Sqoop2: From/To: Create HDFS connector
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5c29a2a2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5c29a2a2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5c29a2a2

Branch: refs/heads/SQOOP-1367
Commit: 5c29a2a291b54d1c15f7e2482893bc074a011a76
Parents: 7127948
Author: Gwen Shapira <csh...@gmail.com>
Authored: Wed Aug 20 13:23:41 2014 -0700
Committer: Abraham Elmahrek <abra...@elmahrek.com>
Committed: Wed Aug 20 13:23:41 2014 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/client/SqoopClient.java    |   2 +-
 common/pom.xml                                  |  66 +++
 .../org/apache/sqoop/common/PrefixContext.java  |  70 +++
 connector/connector-hdfs/pom.xml                |  83 +++
 .../sqoop/connector/hdfs/HdfsConnector.java     | 132 +++++
 .../connector/hdfs/HdfsConnectorError.java      |  52 ++
 .../sqoop/connector/hdfs/HdfsConstants.java     |  31 ++
 .../sqoop/connector/hdfs/HdfsDestroyer.java     |  36 ++
 .../sqoop/connector/hdfs/HdfsExtractor.java     | 199 +++++++
 .../sqoop/connector/hdfs/HdfsInitializer.java   |  45 ++
 .../apache/sqoop/connector/hdfs/HdfsLoader.java | 140 +++++
 .../sqoop/connector/hdfs/HdfsPartition.java     | 161 ++++++
 .../sqoop/connector/hdfs/HdfsPartitioner.java   | 555 +++++++++++++++++++
 .../sqoop/connector/hdfs/HdfsValidator.java     |  89 +++
 .../configuration/ConnectionConfiguration.java  |  31 ++
 .../hdfs/configuration/ConnectionForm.java      |  29 +
 .../configuration/FromJobConfiguration.java     |  32 ++
 .../connector/hdfs/configuration/InputForm.java |  30 +
 .../hdfs/configuration/OutputCompression.java   |  33 ++
 .../hdfs/configuration/OutputForm.java          |  36 ++
 .../hdfs/configuration/OutputFormat.java        |  33 ++
 .../hdfs/configuration/StorageType.java         |  28 +
 .../hdfs/configuration/ToJobConfiguration.java  |  31 ++
 .../hdfs/hdfsWriter/GenericHdfsWriter.java      |  34 ++
 .../hdfs/hdfsWriter/HdfsSequenceWriter.java     |  57 ++
 .../hdfs/hdfsWriter/HdfsTextWriter.java         |  61 ++
 .../hdfs-connector-resources.properties         |  58 ++
 .../main/resources/sqoopconnector.properties    |  18 +
 .../idf/CSVIntermediateDataFormat.java          |   7 +-
 .../idf/IntermediateDataFormatError.java        |   4 +-
 connector/pom.xml                               |   9 +-
 .../sqoop/framework/FrameworkValidator.java     |  63 +--
 .../org/apache/sqoop/framework/JobManager.java  |  29 +-
 .../sqoop/framework/SubmissionRequest.java      |  14 +-
 .../configuration/ExportJobConfiguration.java   |  37 --
 .../configuration/ImportJobConfiguration.java   |  37 --
 .../framework/configuration/InputForm.java      |  30 -
 .../configuration/JobConfiguration.java         |   4 +-
 .../configuration/OutputCompression.java        |  33 --
 .../framework/configuration/OutputForm.java     |  38 --
 .../framework/configuration/OutputFormat.java   |  33 --
 .../framework/configuration/StorageType.java    |  28 -
 .../sqoop/framework/TestFrameworkValidator.java |  10 +-
 .../sqoop/repository/TestJdbcRepository.java    |   2 +-
 .../mapreduce/MapreduceExecutionEngine.java     |  42 +-
 .../org/apache/sqoop/job/PrefixContext.java     |  70 ---
 .../sqoop/job/etl/HdfsExportExtractor.java      | 194 -------
 .../sqoop/job/etl/HdfsExportPartition.java      | 160 ------
 .../sqoop/job/etl/HdfsExportPartitioner.java    | 552 ------------------
 .../sqoop/job/etl/HdfsSequenceImportLoader.java |  94 ----
 .../sqoop/job/etl/HdfsTextImportLoader.java     | 101 ----
 .../sqoop/job/mr/SqoopDestroyerExecutor.java    |   2 +-
 .../apache/sqoop/job/mr/SqoopInputFormat.java   |   2 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  11 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  10 +-
 .../mapreduce/MapreduceExecutionEngineTest.java |  13 +-
 .../org/apache/sqoop/job/TestHdfsExtract.java   |  31 +-
 .../java/org/apache/sqoop/job/TestHdfsLoad.java |  33 +-
 pom.xml                                         |  11 +
 .../derby/DerbyRepositoryHandler.java           |  50 +-
 server/pom.xml                                  |   5 +
 .../apache/sqoop/handler/JobRequestHandler.java |   1 +
 test/pom.xml                                    |   5 +
 .../sqoop/test/testcases/ConnectorTestCase.java |   4 +-
 .../connector/jdbc/generic/TableImportTest.java |   8 -
 .../jdbc/generic/imports/PartitionerTest.java   |   8 -
 .../SubmissionWithDisabledModelObjectsTest.java |  11 -
 67 files changed, 2309 insertions(+), 1659 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
index 1d93ae3..2b3171c 100644
--- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
+++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
@@ -369,7 +369,7 @@ public class SqoopClient {
       fromConnection.getPersistenceId(),
       toConnection.getPersistenceId(),
       getConnector(fromConnection.getConnectorId()).getJobForms(Direction.FROM),
-      getConnector(fromConnection.getConnectorId()).getJobForms(Direction.TO),
+      getConnector(toConnection.getConnectorId()).getJobForms(Direction.TO),
       getFramework().getJobForms()
     );
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 9bfa07d..151a649 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -106,5 +106,71 @@ limitations under the License.
</plugin> </plugins> </build> + <!-- Profiles for various supported Hadoop distributions --> + <profiles> + + <!-- Hadoop 1.x --> + <profile> + <id>hadoop100</id> + + <activation> + <property> + <name>hadoop.profile</name> + <value>100</value> + </property> + </activation> + + <dependencies> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + </profile> + + <!-- Hadoop 2.x (active by default) --> + <profile> + <id>hadoop200</id> + + <activation> + <activeByDefault>true</activeByDefault> + <property> + <name>hadoop.profile</name> + <value>200</value> + </property> + </activation> + + <properties> + <hadoop.profile>200</hadoop.profile> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>provided</scope> + </dependency> + + </dependencies> + </profile> + </profiles> </project> http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/common/src/main/java/org/apache/sqoop/common/PrefixContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/common/PrefixContext.java b/common/src/main/java/org/apache/sqoop/common/PrefixContext.java new file mode 100644 index 0000000..6434e6d --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/common/PrefixContext.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.common.ImmutableContext; + +/** + * Implementation of immutable context that is based on Hadoop configuration + * object. Each context property is prefixed with special prefix and loaded + * directly. 
+ */ +public class PrefixContext implements ImmutableContext { + + Configuration configuration; + String prefix; + + public PrefixContext(Configuration configuration, String prefix) { + this.configuration = configuration; + this.prefix = prefix; + } + + @Override + public String getString(String key) { + return configuration.get(prefix + key); + } + + @Override + public String getString(String key, String defaultValue) { + return configuration.get(prefix + key, defaultValue); + } + + @Override + public long getLong(String key, long defaultValue) { + return configuration.getLong(prefix + key, defaultValue); + } + + @Override + public int getInt(String key, int defaultValue) { + return configuration.getInt(prefix + key, defaultValue); + } + + @Override + public boolean getBoolean(String key, boolean defaultValue) { + return configuration.getBoolean(prefix + key, defaultValue); + } + + /* + * TODO: Use getter methods for retrieval instead of + * exposing configuration directly. + */ + public Configuration getConfiguration() { + return configuration; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml new file mode 100644 index 0000000..8df9f11 --- /dev/null +++ b/connector/connector-hdfs/pom.xml @@ -0,0 +1,83 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-hdfs</artifactId> + <name>Sqoop HDFS Connector</name> + + <!-- TODO: Hardcoding Hadoop200 for now --> + + <dependencies> + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop-spi</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector-sdk</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>provided</scope> + </dependency> + + </dependencies> + <build> + <finalName>sqoop</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java new file mode 100644 index 0000000..557091e --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.spi.MetadataUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; +import org.apache.sqoop.validation.Validator; + +import java.util.Locale; +import java.util.ResourceBundle; + +public class HdfsConnector extends SqoopConnector { + + private static final From FROM = new From( + HdfsInitializer.class, + HdfsPartitioner.class, + HdfsExtractor.class, + HdfsDestroyer.class); + + private static final To TO = new To( + HdfsInitializer.class, + HdfsLoader.class, + HdfsDestroyer.class); + + private static final HdfsValidator hdfsValidator = new HdfsValidator(); + + /** + * Retrieve connector version. + * + * @return Version encoded as a string + */ + @Override + public String getVersion() { + return VersionInfo.getVersion(); + } + + /** + * @param locale + * @return the resource bundle associated with the given locale. + */ + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle( + HdfsConstants.RESOURCE_BUNDLE_NAME, locale); + } + + /** + * @return Get connection configuration class + */ + @Override + public Class getConnectionConfigurationClass() { + return ConnectionConfiguration.class; + } + + /** + * @param jobType + * @return Get job configuration class for given type or null if not supported + */ + @Override + public Class getJobConfigurationClass(Direction jobType) { + switch (jobType) { + case FROM: + return FromJobConfiguration.class; + case TO: + return ToJobConfiguration.class; + default: + return null; + } + } + + /** + * @return an <tt>From</tt> that provides classes for performing import. + */ + @Override + public From getFrom() { + return FROM; + } + + /** + * @return an <tt>To</tt> that provides classes for performing export. + */ + @Override + public To getTo() { + return TO; + } + + /** + * Returns validation object that Sqoop framework can use to validate user + * supplied forms before accepting them. This object will be used both for + * connection and job forms. + * + * @return Validator object + */ + @Override + public Validator getValidator() { + return hdfsValidator; + } + + /** + * Returns an {@linkplain org.apache.sqoop.connector.spi.MetadataUpgrader} object that can upgrade the + * connection and job metadata. + * + * @return MetadataUpgrader object + */ + @Override + public MetadataUpgrader getMetadataUpgrader() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java new file mode 100644 index 0000000..8a095d2 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.common.ErrorCode; + +public enum HdfsConnectorError implements ErrorCode{ + /** Error occurs during partitioner run */ + GENERIC_HDFS_CONNECTOR_0000("Error occurs during partitioner run"), + /** Error occurs during extractor run */ + GENERIC_HDFS_CONNECTOR_0001("Error occurs during extractor run"), + /** Unsupported output format type found **/ + GENERIC_HDFS_CONNECTOR_0002("Unknown output format type"), + /** The system was unable to load the specified class. */ + GENERIC_HDFS_CONNECTOR_0003("Unable to load the specified class"), + /** The system was unable to instantiate the specified class. */ + GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"), + /** Error occurs during loader run */ + GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run") + + ; + + private final String message; + + private HdfsConnectorError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java new file mode 100644 index 0000000..a27aff1 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.job.Constants; + +public final class HdfsConstants extends Constants { + + // Resource bundle name + public static final String RESOURCE_BUNDLE_NAME = + "hdfs-connector-resources"; + + public static final char DEFAULT_RECORD_DELIMITER = '\n'; + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java new file mode 100644 index 0000000..74b1cb8 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class HdfsDestroyer extends Destroyer { + /** + * Callback to clean up after job execution. + * + * @param context Destroyer context + * @param o Connection configuration object + * @param o2 Job configuration object + */ + @Override + public void destroy(DestroyerContext context, Object o, Object o2) { + //TODO: Add a "success" flag? + + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java new file mode 100644 index 0000000..fc12381 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.util.LineReader; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; +import org.apache.sqoop.etl.io.DataWriter; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.ExtractorContext; +import org.apache.log4j.Logger; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.common.PrefixContext; + +import java.io.IOException; + +/** + * Extract from HDFS. + * Default field delimiter of a record is comma. + */ + + +public class HdfsExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, HdfsPartition> { + + public static final Logger LOG = Logger.getLogger(HdfsExtractor.class); + + private Configuration conf; + private DataWriter dataWriter; + private long rowRead = 0; + + @Override + public void extract(ExtractorContext context, + ConnectionConfiguration connectionConfiguration, + FromJobConfiguration jobConfiguration, HdfsPartition partition) { + + conf = ((PrefixContext) context.getContext()).getConfiguration(); + dataWriter = context.getDataWriter(); + + try { + HdfsPartition p = partition; + LOG.info("Working on partition: " + p); + int numFiles = p.getNumberOfFiles(); + for (int i = 0; i < numFiles; i++) { + extractFile(p.getFile(i), p.getOffset(i), p.getLength(i)); + } + } catch (IOException e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e); + } + } + + private void extractFile(Path file, long start, long length) + throws IOException { + long end = start + length; + LOG.info("Extracting file " + file); + LOG.info("\t from offset " + start); + LOG.info("\t to offset " + end); + LOG.info("\t of length " + length); + if(isSequenceFile(file)) { + extractSequenceFile(file, start, length); + } else { + extractTextFile(file, start, length); + } + } + + /** + * Extracts Sequence file + * @param file + * @param start + * @param length + * @throws IOException + */ + private void extractSequenceFile(Path file, long start, long length) + throws IOException { + LOG.info("Extracting sequence file"); + long end = start + length; + SequenceFile.Reader filereader = new SequenceFile.Reader( + file.getFileSystem(conf), file, conf); + + if (start > filereader.getPosition()) { + filereader.sync(start); // sync to start + } + + Text line = new Text(); + boolean hasNext = filereader.next(line); + while (hasNext) { + rowRead++; + dataWriter.writeStringRecord(line.toString()); + line = new Text(); + hasNext = filereader.next(line); + if (filereader.getPosition() >= end && filereader.syncSeen()) { + break; + } + } + filereader.close(); + } + + /** + * Extracts Text file + * @param file + * @param start + * @param length + * @throws IOException + */ + private void extractTextFile(Path file, long start, long length) + throws IOException { + LOG.info("Extracting text file"); + long end = start + length; + FileSystem fs = 
file.getFileSystem(conf); + FSDataInputStream filestream = fs.open(file); + CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file); + LineReader filereader; + Seekable fileseeker = filestream; + + // Hadoop 1.0 does not have support for custom record delimiter and thus + // we + // are supporting only default one. + // We might add another "else if" case for SplittableCompressionCodec once + // we drop support for Hadoop 1.0. + if (codec == null) { + filestream.seek(start); + filereader = new LineReader(filestream); + } else { + filereader = new LineReader(codec.createInputStream(filestream, + codec.createDecompressor()), conf); + fileseeker = filestream; + } + if (start != 0) { + // always throw away first record because + // one extra line is read in previous split + start += filereader.readLine(new Text(), 0); + } + int size; + LOG.info("Start position: " + String.valueOf(start)); + long next = start; + while (next <= end) { + Text line = new Text(); + size = filereader.readLine(line, Integer.MAX_VALUE); + if (size == 0) { + break; + } + if (codec == null) { + next += size; + } else { + next = fileseeker.getPos(); + } + rowRead++; + dataWriter.writeStringRecord(line.toString()); + } + LOG.info("Extracting ended on position: " + fileseeker.getPos()); + filestream.close(); + } + + @Override + public long getRowsRead() { + return rowRead; + } + + /** + * Returns true if given file is sequence + * @param file + * @return boolean + */ + private boolean isSequenceFile(Path file) { + SequenceFile.Reader filereader = null; + try { + filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf); + filereader.close(); + } catch (IOException e) { + return false; + } + return true; + } + + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java new file mode 100644 index 0000000..d2d12a8 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.schema.Schema; + + +public class HdfsInitializer extends Initializer { + /** + * Initialize new submission based on given configuration properties. 
Any + * needed temporary values might be saved to context object and they will be + * promoted to all other part of the workflow automatically. + * + * @param context Initializer context object + * @param connection Connector's connection configuration object + * @param job Connector's job configuration object + */ + @Override + public void initialize(InitializerContext context, Object connection, Object job) { + + } + + + @Override + public Schema getSchema(InitializerContext context, Object connection, Object job) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java new file mode 100644 index 0000000..5a924f9 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.OutputFormat; +import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; +import org.apache.sqoop.utils.ClassUtils; + +import java.io.IOException; +import java.util.UUID; + +public class HdfsLoader extends Loader<ConnectionConfiguration, ToJobConfiguration> { + /** + * Load data to target. 
+ * + * @param context Loader context object + * @param connection Connection configuration + * @param job Job configuration + * @throws Exception + */ + @Override + public void load(LoaderContext context, ConnectionConfiguration connection, ToJobConfiguration job) throws Exception { + + DataReader reader = context.getDataReader(); + + Configuration conf = new Configuration(); + + String directoryName = job.output.outputDirectory; + String codecname = getCompressionCodecName(job); + + CompressionCodec codec = null; + if (codecname != null) { + Class<?> clz = ClassUtils.loadClass(codecname); + if (clz == null) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname); + } + + try { + codec = (CompressionCodec) clz.newInstance(); + if (codec instanceof Configurable) { + ((Configurable) codec).setConf(conf); + } + } catch (Exception e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e); + } + } + + String filename = directoryName + "/" + UUID.randomUUID() + getExtension(job,codec); + + try { + Path filepath = new Path(filename); + + GenericHdfsWriter filewriter = getWriter(job); + + filewriter.initialize(filepath,conf,codec); + + String csv; + + while ((csv = reader.readTextRecord()) != null) { + filewriter.write(csv); + } + filewriter.destroy(); + + } catch (IOException e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e); + } + + } + + private GenericHdfsWriter getWriter(ToJobConfiguration job) { + if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE) + return new HdfsSequenceWriter(); + else + return new HdfsTextWriter(); + } + + + private String getCompressionCodecName(ToJobConfiguration jobConf) { + if(jobConf.output.compression == null) + return null; + switch(jobConf.output.compression) { + case NONE: + return null; + case DEFAULT: + return "org.apache.hadoop.io.compress.DefaultCodec"; + case DEFLATE: + return "org.apache.hadoop.io.compress.DeflateCodec"; + case GZIP: + return "org.apache.hadoop.io.compress.GzipCodec"; + case BZIP2: + return "org.apache.hadoop.io.compress.BZip2Codec"; + case LZO: + return "com.hadoop.compression.lzo.LzoCodec"; + case LZ4: + return "org.apache.hadoop.io.compress.Lz4Codec"; + case SNAPPY: + return "org.apache.hadoop.io.compress.SnappyCodec"; + case CUSTOM: + return jobConf.output.customCompression.trim(); + } + return null; + } + + //TODO: We should probably support configurable extensions at some point + private static String getExtension(ToJobConfiguration job, CompressionCodec codec) { + if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE) + return ".seq"; + if (codec == null) + return ".txt"; + return codec.getDefaultExtension(); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java new file mode 100644 index 0000000..b801356 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.hdfs; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.sqoop.job.etl.Partition; + +/** + * This class derives mostly from CombineFileSplit of Hadoop, i.e. + * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit. + */ +public class HdfsPartition extends Partition { + + private long lenFiles; + private int numFiles; + private Path[] files; + private long[] offsets; + private long[] lengths; + private String[] locations; + + public HdfsPartition() {} + + public HdfsPartition(Path[] files, long[] offsets, + long[] lengths, String[] locations) { + for(long length : lengths) { + this.lenFiles += length; + } + this.numFiles = files.length; + this.files = files; + this.offsets = offsets; + this.lengths = lengths; + this.locations = locations; + } + + public long getLengthOfFiles() { + return lenFiles; + } + + public int getNumberOfFiles() { + return numFiles; + } + + public Path getFile(int i) { + return files[i]; + } + + public long getOffset(int i) { + return offsets[i]; + } + + public long getLength(int i) { + return lengths[i]; + } + + public String[] getLocations() { + return locations; + } + + @Override + public void readFields(DataInput in) throws IOException { + numFiles = in.readInt(); + + files = new Path[numFiles]; + for(int i=0; i<numFiles; i++) { + files[i] = new Path(in.readUTF()); + } + + offsets = new long[numFiles]; + for(int i=0; i<numFiles; i++) { + offsets[i] = in.readLong(); + } + + lengths = new long[numFiles]; + for(int i=0; i<numFiles; i++) { + lengths[i] = in.readLong(); + } + + for(long length : lengths) { + lenFiles += length; + } + + int numLocations = in.readInt(); + if (numLocations == 0) { + locations = null; + } else { + locations = new String[numLocations]; + for(int i=0; i<numLocations; i++) { + locations[i] = in.readUTF(); + } + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(numFiles); + + for(Path file : files) { + out.writeUTF(file.toString()); + } + + for(long offset : offsets) { + out.writeLong(offset); + } + + for(long length : lengths) { + out.writeLong(length); + } + + if (locations == null || locations.length == 0) { + out.writeInt(0); + } else { + out.writeInt(locations.length); + for(String location : locations) { + out.writeUTF(location); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + boolean first = true; + for(int i = 0; i < files.length; i++) { + if(first) { + first = false; + } else { + sb.append(", "); + } + + sb.append(files[i]); + sb.append(" (offset=").append(offsets[i]); + sb.append(", end=").append(offsets[i] + lengths[i]); + sb.append(", length=").append(lengths[i]); + sb.append(")"); + } + sb.append("}"); + return sb.toString(); + } + +} 
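
HdfsPartition above is the Writable-style split that carries the chosen file paths, per-file offsets and lengths, and preferred host locations from the partitioner to each extractor. The short sketch below (not part of the patch) shows the write()/readFields() round trip; the driver class name, file paths, and host names are invented for illustration, and it assumes the connector and Hadoop jars are on the classpath. The MapReduce engine performs the equivalent step through its own split wrapper when it hands partitions to map tasks.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.HdfsPartition;

public class HdfsPartitionRoundTrip {
  public static void main(String[] args) throws Exception {
    // Two files assigned to one partition, each read from offset 0.
    HdfsPartition original = new HdfsPartition(
        new Path[] { new Path("/data/part-00000"), new Path("/data/part-00001") },
        new long[]  { 0L, 0L },
        new long[]  { 1024L, 2048L },
        new String[] { "node1", "node2" });

    // Serialize the partition as it would be written out for a map task.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // Rebuild it on the "extractor side" from the same bytes.
    HdfsPartition copy = new HdfsPartition();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    // Prints the same file/offset/length list as the original partition.
    System.out.println(copy);
    System.out.println(copy.getNumberOfFiles() + " files, "
        + copy.getLengthOfFiles() + " bytes in total");
  }
}
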
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java new file mode 100644 index 0000000..df764d2 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java @@ -0,0 +1,555 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.hdfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.HashMap; +import java.util.Set; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.etl.PartitionerContext; +import org.apache.sqoop.common.PrefixContext; + +/** + * This class derives mostly from CombineFileInputFormat of Hadoop, i.e. + * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat. 
+ */ +public class HdfsPartitioner extends Partitioner<ConnectionConfiguration, FromJobConfiguration> { + + public static final String SPLIT_MINSIZE_PERNODE = + "mapreduce.input.fileinputformat.split.minsize.per.node"; + public static final String SPLIT_MINSIZE_PERRACK = + "mapreduce.input.fileinputformat.split.minsize.per.rack"; + + // ability to limit the size of a single split + private long maxSplitSize = 0; + private long minSplitSizeNode = 0; + private long minSplitSizeRack = 0; + + // mapping from a rack name to the set of Nodes in the rack + private HashMap<String, Set<String>> rackToNodes = + new HashMap<String, Set<String>>(); + + @Override + public List<Partition> getPartitions(PartitionerContext context, + ConnectionConfiguration connectionConfiguration, FromJobConfiguration jobConfiguration) { + + Configuration conf = ((PrefixContext)context.getContext()).getConfiguration(); + + try { + long numInputBytes = getInputSize(conf, jobConfiguration.input.inputDirectory); + maxSplitSize = numInputBytes / context.getMaxPartitions(); + + if(numInputBytes % context.getMaxPartitions() != 0 ) { + maxSplitSize += 1; + } + + long minSizeNode = 0; + long minSizeRack = 0; + long maxSize = 0; + + // the values specified by setxxxSplitSize() takes precedence over the + // values that might have been specified in the config + if (minSplitSizeNode != 0) { + minSizeNode = minSplitSizeNode; + } else { + minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0); + } + if (minSplitSizeRack != 0) { + minSizeRack = minSplitSizeRack; + } else { + minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0); + } + if (maxSplitSize != 0) { + maxSize = maxSplitSize; + } else { + maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0); + } + if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) { + throw new IOException("Minimum split size pernode " + minSizeNode + + " cannot be larger than maximum split size " + + maxSize); + } + if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) { + throw new IOException("Minimum split size per rack" + minSizeRack + + " cannot be larger than maximum split size " + + maxSize); + } + if (minSizeRack != 0 && minSizeNode > minSizeRack) { + throw new IOException("Minimum split size per node" + minSizeNode + + " cannot be smaller than minimum split " + + "size per rack " + minSizeRack); + } + + // all the files in input set + String indir = jobConfiguration.input.inputDirectory; + FileSystem fs = FileSystem.get(conf); + + List<Path> paths = new LinkedList<Path>(); + for(FileStatus status : fs.listStatus(new Path(indir))) { + if(!status.isDir()) { + paths.add(status.getPath()); + } + } + + List<Partition> partitions = new ArrayList<Partition>(); + if (paths.size() == 0) { + return partitions; + } + + // create splits for all files that are not in any pool. 
+ getMoreSplits(conf, paths, + maxSize, minSizeNode, minSizeRack, partitions); + + // free up rackToNodes map + rackToNodes.clear(); + + return partitions; + + } catch (IOException e) { + throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0000, e); + } + } + + //TODO: Perhaps get the FS from connection configuration so we can support remote HDFS + private long getInputSize(Configuration conf, String indir) throws IOException { + FileSystem fs = FileSystem.get(conf); + FileStatus[] files = fs.listStatus(new Path(indir)); + long count = 0; + for (FileStatus file : files) { + count += file.getLen(); + } + return count; + } + + /** + * Return all the splits in the specified set of paths + */ + private void getMoreSplits(Configuration conf, List<Path> paths, + long maxSize, long minSizeNode, long minSizeRack, + List<Partition> partitions) throws IOException { + + // all blocks for all the files in input set + OneFileInfo[] files; + + // mapping from a rack name to the list of blocks it has + HashMap<String, List<OneBlockInfo>> rackToBlocks = + new HashMap<String, List<OneBlockInfo>>(); + + // mapping from a block to the nodes on which it has replicas + HashMap<OneBlockInfo, String[]> blockToNodes = + new HashMap<OneBlockInfo, String[]>(); + + // mapping from a node to the list of blocks that it contains + HashMap<String, List<OneBlockInfo>> nodeToBlocks = + new HashMap<String, List<OneBlockInfo>>(); + + files = new OneFileInfo[paths.size()]; + if (paths.size() == 0) { + return; + } + + // populate all the blocks for all files + for (int i = 0; i < paths.size(); i++) { + files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)), + rackToBlocks, blockToNodes, nodeToBlocks, + rackToNodes, maxSize); + } + + ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>(); + Set<String> nodes = new HashSet<String>(); + long curSplitSize = 0; + + // process all nodes and create splits that are local + // to a node. + for (Iterator<Map.Entry<String, + List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); + iter.hasNext();) { + + Map.Entry<String, List<OneBlockInfo>> one = iter.next(); + nodes.add(one.getKey()); + List<OneBlockInfo> blocksInNode = one.getValue(); + + // for each block, copy it into validBlocks. Delete it from + // blockToNodes so that the same block does not appear in + // two different splits. + for (OneBlockInfo oneblock : blocksInNode) { + if (blockToNodes.containsKey(oneblock)) { + validBlocks.add(oneblock); + blockToNodes.remove(oneblock); + curSplitSize += oneblock.length; + + // if the accumulated split size exceeds the maximum, then + // create this split. + if (maxSize != 0 && curSplitSize >= maxSize) { + // create an input split and add it to the splits array + addCreatedSplit(partitions, nodes, validBlocks); + curSplitSize = 0; + validBlocks.clear(); + } + } + } + // if there were any blocks left over and their combined size is + // larger than minSplitNode, then combine them into one split. + // Otherwise add them back to the unprocessed pool. It is likely + // that they will be combined with other blocks from the + // same rack later on. 
+ if (minSizeNode != 0 && curSplitSize >= minSizeNode) { + // create an input split and add it to the splits array + addCreatedSplit(partitions, nodes, validBlocks); + } else { + for (OneBlockInfo oneblock : validBlocks) { + blockToNodes.put(oneblock, oneblock.hosts); + } + } + validBlocks.clear(); + nodes.clear(); + curSplitSize = 0; + } + + // if blocks in a rack are below the specified minimum size, then keep them + // in 'overflow'. After the processing of all racks is complete, these + // overflow blocks will be combined into splits. + ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>(); + Set<String> racks = new HashSet<String>(); + + // Process all racks over and over again until there is no more work to do. + while (blockToNodes.size() > 0) { + + // Create one split for this rack before moving over to the next rack. + // Come back to this rack after creating a single split for each of the + // remaining racks. + // Process one rack location at a time, Combine all possible blocks that + // reside on this rack as one split. (constrained by minimum and maximum + // split size). + + // iterate over all racks + for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = + rackToBlocks.entrySet().iterator(); iter.hasNext();) { + + Map.Entry<String, List<OneBlockInfo>> one = iter.next(); + racks.add(one.getKey()); + List<OneBlockInfo> blocks = one.getValue(); + + // for each block, copy it into validBlocks. Delete it from + // blockToNodes so that the same block does not appear in + // two different splits. + boolean createdSplit = false; + for (OneBlockInfo oneblock : blocks) { + if (blockToNodes.containsKey(oneblock)) { + validBlocks.add(oneblock); + blockToNodes.remove(oneblock); + curSplitSize += oneblock.length; + + // if the accumulated split size exceeds the maximum, then + // create this split. + if (maxSize != 0 && curSplitSize >= maxSize) { + // create an input split and add it to the splits array + addCreatedSplit(partitions, getHosts(racks), validBlocks); + createdSplit = true; + break; + } + } + } + + // if we created a split, then just go to the next rack + if (createdSplit) { + curSplitSize = 0; + validBlocks.clear(); + racks.clear(); + continue; + } + + if (!validBlocks.isEmpty()) { + if (minSizeRack != 0 && curSplitSize >= minSizeRack) { + // if there is a minimum size specified, then create a single split + // otherwise, store these blocks into overflow data structure + addCreatedSplit(partitions, getHosts(racks), validBlocks); + } else { + // There were a few blocks in this rack that + // remained to be processed. Keep them in 'overflow' block list. + // These will be combined later. + overflowBlocks.addAll(validBlocks); + } + } + curSplitSize = 0; + validBlocks.clear(); + racks.clear(); + } + } + + assert blockToNodes.isEmpty(); + assert curSplitSize == 0; + assert validBlocks.isEmpty(); + assert racks.isEmpty(); + + // Process all overflow blocks + for (OneBlockInfo oneblock : overflowBlocks) { + validBlocks.add(oneblock); + curSplitSize += oneblock.length; + + // This might cause an exiting rack location to be re-added, + // but it should be ok. + for (int i = 0; i < oneblock.racks.length; i++) { + racks.add(oneblock.racks[i]); + } + + // if the accumulated split size exceeds the maximum, then + // create this split. 
+ if (maxSize != 0 && curSplitSize >= maxSize) { + // create an input split and add it to the splits array + addCreatedSplit(partitions, getHosts(racks), validBlocks); + curSplitSize = 0; + validBlocks.clear(); + racks.clear(); + } + } + + // Process any remaining blocks, if any. + if (!validBlocks.isEmpty()) { + addCreatedSplit(partitions, getHosts(racks), validBlocks); + } + } + + private boolean isSplitable(Configuration conf, Path file) { + final CompressionCodec codec = + new CompressionCodecFactory(conf).getCodec(file); + + // This method might be improved for SplittableCompression codec when we + // drop support for Hadoop 1.0 + return null == codec; + + } + + /** + * Create a single split from the list of blocks specified in validBlocks + * Add this new split into list. + */ + private void addCreatedSplit(List<Partition> partitions, + Collection<String> locations, + ArrayList<OneBlockInfo> validBlocks) { + // create an input split + Path[] files = new Path[validBlocks.size()]; + long[] offsets = new long[validBlocks.size()]; + long[] lengths = new long[validBlocks.size()]; + for (int i = 0; i < validBlocks.size(); i++) { + files[i] = validBlocks.get(i).onepath; + offsets[i] = validBlocks.get(i).offset; + lengths[i] = validBlocks.get(i).length; + } + + // add this split to the list that is returned + HdfsPartition partition = new HdfsPartition( + files, offsets, lengths, locations.toArray(new String[0])); + partitions.add(partition); + } + + private Set<String> getHosts(Set<String> racks) { + Set<String> hosts = new HashSet<String>(); + for (String rack : racks) { + if (rackToNodes.containsKey(rack)) { + hosts.addAll(rackToNodes.get(rack)); + } + } + return hosts; + } + + private static void addHostToRack(HashMap<String, Set<String>> rackToNodes, + String rack, String host) { + Set<String> hosts = rackToNodes.get(rack); + if (hosts == null) { + hosts = new HashSet<String>(); + rackToNodes.put(rack, hosts); + } + hosts.add(host); + } + + /** + * information about one file from the File System + */ + private static class OneFileInfo { + private long fileSize; // size of the file + private OneBlockInfo[] blocks; // all blocks in this file + + OneFileInfo(Path path, Configuration conf, + boolean isSplitable, + HashMap<String, List<OneBlockInfo>> rackToBlocks, + HashMap<OneBlockInfo, String[]> blockToNodes, + HashMap<String, List<OneBlockInfo>> nodeToBlocks, + HashMap<String, Set<String>> rackToNodes, + long maxSize) + throws IOException { + this.fileSize = 0; + + // get block locations from file system + FileSystem fs = path.getFileSystem(conf); + FileStatus stat = fs.getFileStatus(path); + BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, + stat.getLen()); + // create a list of all block and their locations + if (locations == null) { + blocks = new OneBlockInfo[0]; + } else { + if (!isSplitable) { + // if the file is not splitable, just create the one block with + // full file length + blocks = new OneBlockInfo[1]; + fileSize = stat.getLen(); + blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0] + .getHosts(), locations[0].getTopologyPaths()); + } else { + ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>( + locations.length); + for (int i = 0; i < locations.length; i++) { + fileSize += locations[i].getLength(); + + // each split can be a maximum of maxSize + long left = locations[i].getLength(); + long myOffset = locations[i].getOffset(); + long myLength = 0; + do { + if (maxSize == 0) { + myLength = left; + } else { + if (left > maxSize && left < 2 
* maxSize) { + // if remainder is between max and 2*max - then + // instead of creating splits of size max, left-max we + // create splits of size left/2 and left/2. This is + // a heuristic to avoid creating really really small + // splits. + myLength = left / 2; + } else { + myLength = Math.min(maxSize, left); + } + } + OneBlockInfo oneblock = new OneBlockInfo(path, myOffset, + myLength, locations[i].getHosts(), locations[i] + .getTopologyPaths()); + left -= myLength; + myOffset += myLength; + + blocksList.add(oneblock); + } while (left > 0); + } + blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]); + } + + for (OneBlockInfo oneblock : blocks) { + // add this block to the block --> node locations map + blockToNodes.put(oneblock, oneblock.hosts); + + // For blocks that do not have host/rack information, + // assign to default rack. + String[] racks = null; + if (oneblock.hosts.length == 0) { + racks = new String[]{NetworkTopology.DEFAULT_RACK}; + } else { + racks = oneblock.racks; + } + + // add this block to the rack --> block map + for (int j = 0; j < racks.length; j++) { + String rack = racks[j]; + List<OneBlockInfo> blklist = rackToBlocks.get(rack); + if (blklist == null) { + blklist = new ArrayList<OneBlockInfo>(); + rackToBlocks.put(rack, blklist); + } + blklist.add(oneblock); + if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) { + // Add this host to rackToNodes map + addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]); + } + } + + // add this block to the node --> block map + for (int j = 0; j < oneblock.hosts.length; j++) { + String node = oneblock.hosts[j]; + List<OneBlockInfo> blklist = nodeToBlocks.get(node); + if (blklist == null) { + blklist = new ArrayList<OneBlockInfo>(); + nodeToBlocks.put(node, blklist); + } + blklist.add(oneblock); + } + } + } + } + + } + + /** + * information about one block from the File System + */ + private static class OneBlockInfo { + Path onepath; // name of this file + long offset; // offset in file + long length; // length of this block + String[] hosts; // nodes on which this block resides + String[] racks; // network topology of hosts + + OneBlockInfo(Path path, long offset, long len, + String[] hosts, String[] topologyPaths) { + this.onepath = path; + this.offset = offset; + this.hosts = hosts; + this.length = len; + assert (hosts.length == topologyPaths.length || + topologyPaths.length == 0); + + // if the file system does not have any rack information, then + // use dummy rack location. + if (topologyPaths.length == 0) { + topologyPaths = new String[hosts.length]; + for (int i = 0; i < topologyPaths.length; i++) { + topologyPaths[i] = (new NodeBase(hosts[i], + NetworkTopology.DEFAULT_RACK)).toString(); + } + } + + // The topology paths have the host name included as the last + // component. Strip it. 
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+}
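The do/while loop earlier in this file caps each block at maxSize, with one wrinkle: when the remaining length falls strictly between maxSize and 2*maxSize, it is halved instead, so the partitioner never emits a tiny tail chunk. A standalone sketch of just that heuristic (hypothetical demo class, not part of this patch) makes the behaviour easy to check: for a 250-unit block and a cap of 100 it yields chunks of 100, 75 and 75 rather than 100, 100 and 50.

    // Hypothetical, self-contained demo of the chunking heuristic above.
    import java.util.ArrayList;
    import java.util.List;

    public class SplitHeuristicDemo {
      static List<Long> chunk(long blockLength, long maxSize) {
        List<Long> lengths = new ArrayList<Long>();
        long left = blockLength;
        do {
          long myLength;
          if (maxSize == 0) {
            myLength = left;                         // no cap: single chunk
          } else if (left > maxSize && left < 2 * maxSize) {
            myLength = left / 2;                     // halve to avoid a tiny tail
          } else {
            myLength = Math.min(maxSize, left);
          }
          lengths.add(myLength);
          left -= myLength;
        } while (left > 0);
        return lengths;
      }

      public static void main(String[] args) {
        System.out.println(chunk(250L, 100L));       // prints [100, 75, 75]
      }
    }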
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
new file mode 100644
index 0000000..4efbd33
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.hdfs.configuration.*;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
+import org.apache.sqoop.validation.Validator;
+
+/**
+ * Validate framework configuration objects
+ */
+public class HdfsValidator extends Validator {
+
+  @Override
+  public Validation validateConnection(Object connectionConfiguration) {
+    Validation validation = new Validation(ConnectionConfiguration.class);
+    // No validation on connection object
+    return validation;
+  }
+
+
+  @Override
+  public Validation validateJob(Object jobConfiguration) {
+    //TODO: I'm pretty sure this needs to call either validateExportJob or validateImportJob, depending on context
+    return super.validateJob(jobConfiguration);
+  }
+
+  private Validation validateExportJob(Object jobConfiguration) {
+    Validation validation = new Validation(FromJobConfiguration.class);
+    FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
+
+    validateInputForm(validation, configuration.input);
+
+
+    return validation;
+  }
+
+  private Validation validateImportJob(Object jobConfiguration) {
+    Validation validation = new Validation(ToJobConfiguration.class);
+    ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
+
+    validateOutputForm(validation, configuration.output);
+
+    return validation;
+  }
+
+  private void validateInputForm(Validation validation, InputForm input) {
+    if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
+      validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
+    }
+  }
+
+  private void validateOutputForm(Validation validation, OutputForm output) {
+    if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
+      validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
+    }
+    if(output.customCompression != null &&
+        output.customCompression.trim().length() > 0 &&
+        output.compression != OutputCompression.CUSTOM) {
+      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+          "custom compression should be blank as " + output.compression + " is being used.");
+    }
+    if(output.compression == OutputCompression.CUSTOM &&
+        (output.customCompression == null ||
+            output.customCompression.trim().length() == 0)
+        ) {
+      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+          "custom compression is blank.");
+    }
+  }
+
+
+}
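As committed, validateJob() above simply defers to the base Validator and leaves a TODO, so the private validateExportJob()/validateImportJob() helpers are never reached. One way the dispatch might eventually look, assuming the framework hands the connector its own typed configuration objects, is sketched below (a hypothetical method body inside HdfsValidator, not what this patch does):

    // Hypothetical replacement for the TODO in validateJob(); illustrative only.
    @Override
    public Validation validateJob(Object jobConfiguration) {
      if (jobConfiguration instanceof FromJobConfiguration) {
        return validateExportJob(jobConfiguration);   // reading from HDFS
      }
      if (jobConfiguration instanceof ToJobConfiguration) {
        return validateImportJob(jobConfiguration);   // writing to HDFS
      }
      return super.validateJob(jobConfiguration);
    }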
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
new file mode 100644
index 0000000..6dd79d5
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class ConnectionConfiguration {
+  @Form
+  public ConnectionForm connection;
+
+  public ConnectionConfiguration() {
+    connection = new ConnectionForm();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
new file mode 100644
index 0000000..7dad2a2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+@FormClass
+public class ConnectionForm {
+  //Todo: Didn't find anything that belongs here...
+  // Since empty forms don't work (DERBYREPO_0008:The form contains no input metadata), I'm putting a dummy form here
+
+  @Input(size = 255) public String dummy;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
new file mode 100644
index 0000000..bccb99d
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class FromJobConfiguration {
+  @Form public InputForm input;
+
+
+  public FromJobConfiguration() {
+    input = new InputForm();
+
+  }
+}
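FromJobConfiguration follows the same pattern as ConnectionConfiguration above: a @ConfigurationClass aggregates @FormClass forms, and the public @Input fields hold the values a user supplies. Because the constructors pre-create the nested forms and the fields are public, a configuration object can be populated directly, which is handy in tests. A small usage sketch (hypothetical values, not part of the patch):

    // Hypothetical, test-style population of the configuration objects above.
    FromJobConfiguration fromConfig = new FromJobConfiguration();
    fromConfig.input.inputDirectory = "/tmp/sqoop-input";        // made-up path

    ConnectionConfiguration connConfig = new ConnectionConfiguration();
    connConfig.connection.dummy = "placeholder";                 // only field the form has

    // The connector's validator can then inspect the populated objects:
    Validation connResult = new HdfsValidator().validateConnection(connConfig);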
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
new file mode 100644
index 0000000..413f04c
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ *
+ */
+@FormClass
+public class InputForm {
+
+  @Input(size = 255) public String inputDirectory;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
new file mode 100644
index 0000000..55db1bc
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Supported compressions
+ */
+public enum OutputCompression {
+  NONE,
+  DEFAULT,
+  DEFLATE,
+  GZIP,
+  BZIP2,
+  LZO,
+  LZ4,
+  SNAPPY,
+  CUSTOM,
+}
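OutputCompression only names the codec choices; resolving a value to a concrete Hadoop CompressionCodec presumably happens in the loader and writer classes, which are not part of this hunk. A hedged sketch of such a mapping, using the standard Hadoop codec class names (LZO ships as a separate library, and CUSTOM would fall through to whatever class name the user typed into customCompression):

    // Hypothetical helper; the real lookup lives elsewhere in the connector.
    static String codecClassName(OutputCompression compression, String customCompression) {
      switch (compression) {
        case NONE:    return null;                                            // uncompressed
        case DEFAULT: return "org.apache.hadoop.io.compress.DefaultCodec";
        case DEFLATE: return "org.apache.hadoop.io.compress.DeflateCodec";
        case GZIP:    return "org.apache.hadoop.io.compress.GzipCodec";
        case BZIP2:   return "org.apache.hadoop.io.compress.BZip2Codec";
        case LZ4:     return "org.apache.hadoop.io.compress.Lz4Codec";
        case SNAPPY:  return "org.apache.hadoop.io.compress.SnappyCodec";
        case LZO:     return "com.hadoop.compression.lzo.LzoCodec";           // external library
        case CUSTOM:  return customCompression;                               // user-supplied class
        default:      return null;
      }
    }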
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
new file mode 100644
index 0000000..d57b4c2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ *
+ */
+@FormClass
+public class OutputForm {
+
+  @Input public OutputFormat outputFormat;
+
+  @Input public OutputCompression compression;
+
+  @Input(size = 255) public String customCompression;
+
+  @Input(size = 255) public String outputDirectory;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
new file mode 100644
index 0000000..676c33c
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Various supported formats on disk
+ */
+public enum OutputFormat {
+  /**
+   * Comma separated text file
+   */
+  TEXT_FILE,
+
+  /**
+   * Sequence file
+   */
+  SEQUENCE_FILE,
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
new file mode 100644
index 0000000..d4aaa0a
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Various storage types that Sqoop is supporting
+ */
+public enum StorageType {
+  /**
+   * Direct HDFS import
+   */
+  HDFS,
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..65ee8a7
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class ToJobConfiguration {
+  @Form
+  public OutputForm output;
+
+  public ToJobConfiguration() {
+    output = new OutputForm();
+  }
+}
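ToJobConfiguration mirrors FromJobConfiguration on the write side, wrapping OutputForm. Per the HdfsValidator rules earlier in this patch, customCompression must be blank unless compression is CUSTOM, and the output directory must be set. A configuration that would pass those checks might look like this (illustrative values only):

    // Hypothetical to-side configuration consistent with validateOutputForm().
    ToJobConfiguration toConfig = new ToJobConfiguration();
    toConfig.output.outputDirectory = "/tmp/sqoop-output";       // made-up path
    toConfig.output.outputFormat = OutputFormat.SEQUENCE_FILE;
    toConfig.output.compression = OutputCompression.CUSTOM;
    toConfig.output.customCompression = "org.apache.hadoop.io.compress.GzipCodec";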
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
new file mode 100644
index 0000000..2ccccc4
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import java.io.IOException;
+
+public abstract class GenericHdfsWriter {
+
+  public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException;
+
+  public abstract void write(String csv) throws IOException;
+
+  public abstract void destroy() throws IOException;
+
+}
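GenericHdfsWriter pins down the small contract a loader needs: open a file (optionally through a compression codec), append CSV records, and close. The commit's concrete text and sequence-file writers live in separate files not shown in this hunk; purely to illustrate the contract, a minimal text implementation could look roughly like this (hypothetical class name and details, not the committed implementation):

    // Hypothetical minimal implementation of the abstract contract above.
    package org.apache.sqoop.connector.hdfs.hdfsWriter;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;

    public class SimpleTextWriter extends GenericHdfsWriter {

      private BufferedWriter writer;

      @Override
      public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
        FileSystem fs = filepath.getFileSystem(conf);
        FSDataOutputStream out = fs.create(filepath, false);
        // Wrap the raw stream with the codec only when compression was requested.
        writer = new BufferedWriter(new OutputStreamWriter(
            codec == null ? out : codec.createOutputStream(out), "UTF-8"));
      }

      @Override
      public void write(String csv) throws IOException {
        writer.write(csv);
        writer.newLine();
      }

      @Override
      public void destroy() throws IOException {
        writer.close();
      }
    }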