http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java index eae58ad..05fbab2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java @@ -66,9 +66,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; -import org.apache.phoenix.mapreduce.CsvBulkLoadTool.TargetTableRef; -import org.apache.phoenix.mapreduce.CsvBulkLoadTool.TargetTableRefFunctions; -import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair; +import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; +import org.apache.phoenix.mapreduce.bulkload.TargetTableRef; +import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +81,7 @@ import com.google.common.collect.Sets; * It has been adapted from {@link HFileOutputFormat2} but differs in that it creates * HFiles for multiple tables. */ -public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, Cell> { +public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Cell> { private static final Logger LOG = LoggerFactory.getLogger(MultiHfileOutputFormat.class); @@ -101,7 +101,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, private static final String AT_DELIMITER = "@"; @Override - public RecordWriter<CsvTableRowkeyPair, Cell> getRecordWriter(TaskAttemptContext context) + public RecordWriter<TableRowkeyPair, Cell> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { return createRecordWriter(context); } @@ -112,7 +112,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, * @return * @throws IOException */ - static <V extends Cell> RecordWriter<CsvTableRowkeyPair, V> createRecordWriter(final TaskAttemptContext context) + static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(final TaskAttemptContext context) throws IOException { // Get the path of the temporary output file final Path outputPath = FileOutputFormat.getOutputPath(context); @@ -130,7 +130,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, final boolean compactionExclude = conf.getBoolean( "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); - return new RecordWriter<CsvTableRowkeyPair, V>() { + return new RecordWriter<TableRowkeyPair, V>() { // Map of families to writers and how much has been output on the writer. private final Map<byte [], WriterLength> writers = new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR); @@ -139,7 +139,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, private boolean rollRequested = false; @Override - public void write(CsvTableRowkeyPair row, V cell) + public void write(TableRowkeyPair row, V cell) throws IOException { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); // null input == user explicitly wants to flush @@ -450,7 +450,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against * <code>splitPoints</code>. Cleans up the partitions file after the job exits. */ - static void configurePartitioner(Job job, Set<CsvTableRowkeyPair> tablesStartKeys) + static void configurePartitioner(Job job, Set<TableRowkeyPair> tablesStartKeys) throws IOException { Configuration conf = job.getConfiguration(); @@ -467,7 +467,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, } private static void writePartitions(Configuration conf, Path partitionsPath, - Set<CsvTableRowkeyPair> tablesStartKeys) throws IOException { + Set<TableRowkeyPair> tablesStartKeys) throws IOException { LOG.info("Writing partition information to " + partitionsPath); if (tablesStartKeys.isEmpty()) { @@ -478,9 +478,9 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, // have keys < the first region (which has an empty start key) // so we need to remove it. Otherwise we would end up with an // empty reducer with index 0 - TreeSet<CsvTableRowkeyPair> sorted = new TreeSet<CsvTableRowkeyPair>(tablesStartKeys); + TreeSet<TableRowkeyPair> sorted = new TreeSet<TableRowkeyPair>(tablesStartKeys); - CsvTableRowkeyPair first = sorted.first(); + TableRowkeyPair first = sorted.first(); if (!first.getRowkey().equals(HConstants.EMPTY_BYTE_ARRAY)) { throw new IllegalArgumentException( "First region of table should have empty start key. Instead has: " @@ -491,11 +491,11 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, // Write the actual file FileSystem fs = partitionsPath.getFileSystem(conf); SequenceFile.Writer writer = SequenceFile.createWriter( - fs, conf, partitionsPath, CsvTableRowkeyPair.class, + fs, conf, partitionsPath, TableRowkeyPair.class, NullWritable.class); try { - for (CsvTableRowkeyPair startKey : sorted) { + for (TableRowkeyPair startKey : sorted) { writer.append(startKey, NullWritable.get()); } } finally { @@ -658,11 +658,11 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, KeyValueSerialization.class.getName()); // tableStartKeys for all tables. - Set<CsvTableRowkeyPair> tablesStartKeys = Sets.newTreeSet(); + Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet(); for(TargetTableRef table : tablesToBeLoaded) { final String tableName = table.getPhysicalName(); try(HTable htable = new HTable(conf,tableName);){ - Set<CsvTableRowkeyPair> startKeys = getRegionStartKeys(tableName , htable.getRegionLocator()); + Set<TableRowkeyPair> startKeys = getRegionStartKeys(tableName , htable.getRegionLocator()); tablesStartKeys.addAll(startKeys); String compressionConfig = configureCompression(htable.getTableDescriptor()); String bloomTypeConfig = configureBloomType(htable.getTableDescriptor()); @@ -704,12 +704,12 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, * Return the start keys of all of the regions in this table, * as a set of TableRowkeyPair. */ - private static Set<CsvTableRowkeyPair> getRegionStartKeys(String tableName , RegionLocator table) throws IOException { + private static Set<TableRowkeyPair> getRegionStartKeys(String tableName , RegionLocator table) throws IOException { byte[][] byteKeys = table.getStartKeys(); - Set<CsvTableRowkeyPair> ret = new TreeSet<CsvTableRowkeyPair>(); + Set<TableRowkeyPair> ret = new TreeSet<TableRowkeyPair>(); for (byte[] byteKey : byteKeys) { // phoenix-2216: start : passing the table name and startkey - ret.add(new CsvTableRowkeyPair(tableName, new ImmutableBytesWritable(byteKey))); + ret.add(new TableRowkeyPair(tableName, new ImmutableBytesWritable(byteKey))); } return ret; }
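The composite shuffle key is what lets one job feed several tables: TableRowkeyPair orders first by table name and then by rowkey, so each table's rows occupy a contiguous key range and the TotalOrderPartitioner boundaries produced by writePartitions() never interleave tables. A minimal sketch of that ordering follows; it is a hypothetical standalone check written against the classes added in this commit, not code from the patch itself.

    import java.util.TreeSet;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;

    public class TableRowkeyPairOrderingCheck {
        public static void main(String[] args) {
            // TreeSet sorts with TableRowkeyPair.compareTo: table name first, then rowkey,
            // mirroring how the shuffle orders map output before it reaches the reducers.
            TreeSet<TableRowkeyPair> keys = new TreeSet<TableRowkeyPair>();
            keys.add(new TableRowkeyPair("TABLE_B", new ImmutableBytesWritable(Bytes.toBytes("a"))));
            keys.add(new TableRowkeyPair("TABLE_A", new ImmutableBytesWritable(Bytes.toBytes("z"))));
            keys.add(new TableRowkeyPair("TABLE_A", new ImmutableBytesWritable(Bytes.toBytes("m"))));
            for (TableRowkeyPair pair : keys) {
                System.out.println(pair.getTableName() + " / " + Bytes.toString(pair.getRowkey().get()));
            }
            // Prints TABLE_A / m, TABLE_A / z, TABLE_B / a: each table's rows stay contiguous,
            // so reducer key ranges never mix tables.
        }
    }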
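The import changes above also switch to the new bulkload package's TargetTableRef and TargetTableRefFunctions (both shown in full later in this commit); the job ships its table list through the Configuration as JSON. A hedged sketch of that round trip, using only the functions the patch adds ("MY_TABLE" is an illustrative name):

    import java.util.List;

    import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
    import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;

    import com.google.common.collect.Lists;

    public class TargetTableRefRoundTrip {
        public static void main(String[] args) {
            // Serialize one table reference so it can ride along in the job configuration...
            TargetTableRef table = new TargetTableRef("MY_TABLE"); // logical == physical here
            String json = TargetTableRefFunctions.TO_JSON.apply(table);
            // ...and rebuild it on the task side.
            TargetTableRef restored = TargetTableRefFunctions.FROM_JSON.apply(json);
            System.out.println(restored.getPhysicalName());

            // The physical names of all target tables travel the same way.
            List<TargetTableRef> tables = Lists.newArrayList(table);
            String namesJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tables);
            List<String> names = TargetTableRefFunctions.NAMES_FROM_JSON.apply(namesJson);
            System.out.println(names);
        }
    }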
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/CsvTableRowkeyPair.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/CsvTableRowkeyPair.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/CsvTableRowkeyPair.java deleted file mode 100644 index 3ae74b6..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/CsvTableRowkeyPair.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.mapreduce.bulkload; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.io.WritableUtils; - -import com.google.common.base.Preconditions; - - -/** - * A WritableComparable to hold the table name and the rowkey. - * - */ -public class CsvTableRowkeyPair implements WritableComparable<CsvTableRowkeyPair> { - - /* The qualified table name */ - private String tableName; - - /* The rowkey for the record */ - private ImmutableBytesWritable rowkey; - - /** - * Default constructor - */ - public CsvTableRowkeyPair() { - super(); - } - - /** - * @param tableName - * @param rowkey - */ - public CsvTableRowkeyPair(String tableName, ImmutableBytesWritable rowkey) { - super(); - Preconditions.checkNotNull(tableName); - Preconditions.checkNotNull(rowkey); - this.tableName = tableName; - this.rowkey = rowkey; - } - - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - public ImmutableBytesWritable getRowkey() { - return rowkey; - } - - public void setRowkey(ImmutableBytesWritable rowkey) { - this.rowkey = rowkey; - } - - @Override - public void readFields(DataInput input) throws IOException { - tableName = WritableUtils.readString(input); - rowkey = new ImmutableBytesWritable(); - rowkey.readFields(input); - } - - @Override - public void write(DataOutput output) throws IOException { - WritableUtils.writeString(output,tableName); - rowkey.write(output); - } - - @Override - public int compareTo(CsvTableRowkeyPair other) { - String otherTableName = other.getTableName(); - if(this.tableName.equals(otherTableName)) { - return this.rowkey.compareTo(other.getRowkey()); - } else { - return this.tableName.compareTo(otherTableName); - } - } - - /** Comparator optimized for <code>CsvTableRowkeyPair</code>. 
*/ - public static class Comparator extends WritableComparator { - private BytesWritable.Comparator comparator = new BytesWritable.Comparator(); - - public Comparator() { - super(CsvTableRowkeyPair.class); - } - - @Override - public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - try { - int vintL1 = WritableUtils.decodeVIntSize(b1[s1]); - int vintL2 = WritableUtils.decodeVIntSize(b2[s2]); - int strL1 = readVInt(b1, s1); - int strL2 = readVInt(b2, s2); - int cmp = compareBytes(b1, s1 + vintL1, strL1, b2, s2 + vintL2, strL2); - if (cmp != 0) { - return cmp; - } - int vintL3 = WritableUtils.decodeVIntSize(b1[s1 + vintL1 + strL1]); - int vintL4 = WritableUtils.decodeVIntSize(b2[s2 + vintL2 + strL2]); - int strL3 = readVInt(b1, s1 + vintL1 + strL1); - int strL4 = readVInt(b2, s2 + vintL2 + strL2); - return comparator.compare(b1, s1 + vintL1 + strL1 + vintL3, strL3, b2, s2 - + vintL2 + strL2 + vintL4, strL4); - - } catch(Exception ex) { - throw new IllegalArgumentException(ex); - } - } - } - - static { - WritableComparator.define(CsvTableRowkeyPair.class, new Comparator()); - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TableRowkeyPair.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TableRowkeyPair.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TableRowkeyPair.java new file mode 100644 index 0000000..412226f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TableRowkeyPair.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.bulkload; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; + +import com.google.common.base.Preconditions; + + +/** + * A WritableComparable to hold the table name and the rowkey. 
+ */ +public class TableRowkeyPair implements WritableComparable<TableRowkeyPair> { + + /* The qualified table name */ + private String tableName; + + /* The rowkey for the record */ + private ImmutableBytesWritable rowkey; + + /** + * Default constructor + */ + public TableRowkeyPair() { + super(); + } + + public TableRowkeyPair(String tableName, ImmutableBytesWritable rowkey) { + super(); + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(rowkey); + this.tableName = tableName; + this.rowkey = rowkey; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public ImmutableBytesWritable getRowkey() { + return rowkey; + } + + public void setRowkey(ImmutableBytesWritable rowkey) { + this.rowkey = rowkey; + } + + @Override + public void readFields(DataInput input) throws IOException { + tableName = WritableUtils.readString(input); + rowkey = new ImmutableBytesWritable(); + rowkey.readFields(input); + } + + @Override + public void write(DataOutput output) throws IOException { + WritableUtils.writeString(output,tableName); + rowkey.write(output); + } + + @Override + public int compareTo(TableRowkeyPair other) { + String otherTableName = other.getTableName(); + if(this.tableName.equals(otherTableName)) { + return this.rowkey.compareTo(other.getRowkey()); + } else { + return this.tableName.compareTo(otherTableName); + } + } + + /** Comparator optimized for <code>TableRowkeyPair</code>. */ + public static class Comparator extends WritableComparator { + private BytesWritable.Comparator comparator = new BytesWritable.Comparator(); + + public Comparator() { + super(TableRowkeyPair.class); + } + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + try { + int vintL1 = WritableUtils.decodeVIntSize(b1[s1]); + int vintL2 = WritableUtils.decodeVIntSize(b2[s2]); + int strL1 = readVInt(b1, s1); + int strL2 = readVInt(b2, s2); + int cmp = compareBytes(b1, s1 + vintL1, strL1, b2, s2 + vintL2, strL2); + if (cmp != 0) { + return cmp; + } + int vintL3 = WritableUtils.decodeVIntSize(b1[s1 + vintL1 + strL1]); + int vintL4 = WritableUtils.decodeVIntSize(b2[s2 + vintL2 + strL2]); + int strL3 = readVInt(b1, s1 + vintL1 + strL1); + int strL4 = readVInt(b2, s2 + vintL2 + strL2); + return comparator.compare(b1, s1 + vintL1 + strL1 + vintL3, strL3, b2, s2 + + vintL2 + strL2 + vintL4, strL4); + + } catch(Exception ex) { + throw new IllegalArgumentException(ex); + } + } + } + + static { + WritableComparator.define(TableRowkeyPair.class, new Comparator()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java new file mode 100644 index 0000000..1a846f9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.bulkload; + +import java.util.Map; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +import com.google.common.collect.Maps; + +/** + * Represents the logical and physical name of a single table to which data is to be loaded. + * + * This class exists to allow for the difference between HBase physical table names and + * Phoenix logical table names. + */ +public class TargetTableRef { + + @JsonProperty + private final String logicalName; + + @JsonProperty + private final String physicalName; + + @JsonProperty + private Map<String,String> configuration = Maps.newHashMap(); + + public TargetTableRef(String name) { + this(name, name); + } + + @JsonCreator + private TargetTableRef(@JsonProperty("logicalName") String logicalName, + @JsonProperty("physicalName") String physicalName) { + this.logicalName = logicalName; + this.physicalName = physicalName; + } + + public String getLogicalName() { + return logicalName; + } + + public String getPhysicalName() { + return physicalName; + } + + public Map<String, String> getConfiguration() { + return configuration; + } + + public void setConfiguration(Map<String, String> configuration) { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java new file mode 100644 index 0000000..d786842 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce.bulkload; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.codehaus.jackson.map.ObjectMapper; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +/** + * Utility functions to serialize and deserialize {@link TargetTableRef} instances to and from JSON. + * + */ +public class TargetTableRefFunctions { + + public static Function<TargetTableRef,String> TO_JSON = new Function<TargetTableRef,String>() { + + @Override + public String apply(TargetTableRef input) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(input); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + }; + + public static Function<String,TargetTableRef> FROM_JSON = new Function<String,TargetTableRef>() { + + @Override + public TargetTableRef apply(String json) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(json, TargetTableRef.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + }; + + public static Function<List<TargetTableRef>,String> NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() { + + @Override + public String apply(List<TargetTableRef> input) { + try { + List<String> tableNames = Lists.newArrayListWithCapacity(input.size()); + for(TargetTableRef table : input) { + tableNames.add(table.getPhysicalName()); + } + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(tableNames); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + }; + + public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() { + + @SuppressWarnings("unchecked") + @Override + public List<String> apply(String json) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(json, ArrayList.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + }; + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 8bf786b..9e29fba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -23,13 +23,6 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,13 +34,17 @@ import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.mapreduce.CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor; +import org.apache.phoenix.mapreduce.FormatToKeyValueMapper; import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor; import 
org.apache.phoenix.mapreduce.PhoenixInputFormat; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + import static org.apache.commons.lang.StringUtils.isNotEmpty; /** @@ -419,7 +416,7 @@ public final class PhoenixConfigurationUtil { Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null; try { processorClass = conf.getClass( - UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class, + UPSERT_HOOK_CLASS_CONFKEY, FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class, ImportPreUpsertKeyValueProcessor.class); } catch (Exception e) { throw new IllegalStateException("Couldn't load upsert hook class", e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java index b8b284a..cdd9d7b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java @@ -17,36 +17,25 @@ */ package org.apache.phoenix.util; -import com.google.common.base.Charsets; -import com.google.common.base.Joiner; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.File; +import java.io.Reader; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; -import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.csv.CsvUpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.Reader; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; /*** * Upserts CSV data using Phoenix JDBC connection @@ -222,7 +211,7 @@ public class CSVCommonsLoader { long start = System.currentTimeMillis(); CsvUpsertListener upsertListener = new CsvUpsertListener(conn, conn.getMutateBatchSize(), isStrict); - CsvUpsertExecutor csvUpsertExecutor = CsvUpsertExecutor.create(conn, tableName, + CsvUpsertExecutor csvUpsertExecutor = new CsvUpsertExecutor(conn, tableName, columnInfoList, upsertListener, arrayElementSeparator); csvUpsertExecutor.execute(csvParser); @@ -267,131 +256,10 @@ public class CSVCommonsLoader { default: throw new IllegalStateException("parser has unknown column source."); } - return generateColumnInfo(conn, tableName, columns, isStrict); - } - 
- /** - * Get list of ColumnInfos that contain Column Name and its associated - * PDataType for an import. The supplied list of columns can be null -- if it is non-null, - * it represents a user-supplied list of columns to be imported. - * - * @param conn Phoenix connection from which metadata will be read - * @param tableName Phoenix table name whose columns are to be checked. Can include a schema - * name - * @param columns user-supplied list of import columns, can be null - * @param strict if true, an exception will be thrown if unknown columns are supplied - */ - public static List<ColumnInfo> generateColumnInfo(Connection conn, - String tableName, List<String> columns, boolean strict) - throws SQLException { - Map<String, Integer> columnNameToTypeMap = Maps.newLinkedHashMap(); - Set<String> ambiguousColumnNames = new HashSet<String>(); - Map<String, Integer> fullColumnNameToTypeMap = Maps.newLinkedHashMap(); - DatabaseMetaData dbmd = conn.getMetaData(); - int unfoundColumnCount = 0; - // TODO: escape wildcard characters here because we don't want that - // behavior here - String escapedTableName = StringUtil.escapeLike(tableName); - String[] schemaAndTable = escapedTableName.split("\\."); - ResultSet rs = null; - try { - rs = dbmd.getColumns(null, (schemaAndTable.length == 1 ? "" - : schemaAndTable[0]), - (schemaAndTable.length == 1 ? escapedTableName - : schemaAndTable[1]), null); - while (rs.next()) { - String colName = rs.getString(QueryUtil.COLUMN_NAME_POSITION); - String colFam = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION); - - // use family qualifier, if available, otherwise, use column name - String fullColumn = (colFam==null?colName:String.format("%s.%s",colFam,colName)); - String sqlTypeName = rs.getString(QueryUtil.DATA_TYPE_NAME_POSITION); - - // allow for both bare and family qualified names. - if (columnNameToTypeMap.keySet().contains(colName)) { - ambiguousColumnNames.add(colName); - } - columnNameToTypeMap.put( - colName, - PDataType.fromSqlTypeName(sqlTypeName).getSqlType()); - fullColumnNameToTypeMap.put( - fullColumn, - PDataType.fromSqlTypeName(sqlTypeName).getSqlType()); - } - if (columnNameToTypeMap.isEmpty()) { - throw new IllegalArgumentException("Table " + tableName + " not found"); - } - } finally { - if (rs != null) { - rs.close(); - } - } - List<ColumnInfo> columnInfoList = Lists.newArrayList(); - Set<String> unresolvedColumnNames = new TreeSet<String>(); - if (columns == null) { - // use family qualified names by default, if no columns are specified. - for (Map.Entry<String, Integer> entry : fullColumnNameToTypeMap - .entrySet()) { - columnInfoList.add(new ColumnInfo(entry.getKey(), entry.getValue())); - } - } else { - // Leave "null" as indication to skip b/c it doesn't exist - for (int i = 0; i < columns.size(); i++) { - String columnName = columns.get(i).trim(); - Integer sqlType = null; - if (fullColumnNameToTypeMap.containsKey(columnName)) { - sqlType = fullColumnNameToTypeMap.get(columnName); - } else if (columnNameToTypeMap.containsKey(columnName)) { - if (ambiguousColumnNames.contains(columnName)) { - unresolvedColumnNames.add(columnName); - } - // fall back to bare column name. 
- sqlType = columnNameToTypeMap.get(columnName); - } - if (unresolvedColumnNames.size()>0) { - StringBuilder exceptionMessage = new StringBuilder(); - boolean first = true; - exceptionMessage.append("Unable to resolve these column names to a single column family:\n"); - for (String col : unresolvedColumnNames) { - if (first) first = false; - else exceptionMessage.append(","); - exceptionMessage.append(col); - } - exceptionMessage.append("\nAvailable columns with column families:\n"); - first = true; - for (String col : fullColumnNameToTypeMap.keySet()) { - if (first) first = false; - else exceptionMessage.append(","); - exceptionMessage.append(col); - } - throw new SQLException(exceptionMessage.toString()); - } - - if (sqlType == null) { - if (strict) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.COLUMN_NOT_FOUND) - .setColumnName(columnName) - .setTableName(tableName).build() - .buildException(); - } - unfoundColumnCount++; - } else { - columnInfoList.add(new ColumnInfo(columnName, sqlType)); - } - } - if (unfoundColumnCount == columns.size()) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.COLUMN_NOT_FOUND) - .setColumnName( - Arrays.toString(columns.toArray(new String[0]))) - .setTableName(tableName).build().buildException(); - } - } - return columnInfoList; + return SchemaUtil.generateColumnInfo(conn, tableName, columns, isStrict); } - static class CsvUpsertListener implements CsvUpsertExecutor.UpsertListener { + static class CsvUpsertListener implements UpsertExecutor.UpsertListener<CSVRecord> { private final PhoenixConnection conn; private final int upsertBatchSize; http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index 1693600..cb8aced 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -17,21 +17,20 @@ */ package org.apache.phoenix.util; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Strings.isNullOrEmpty; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES; - +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; - +import java.util.Set; +import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.hadoop.hbase.KeyValue; @@ -64,6 +63,15 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static 
com.google.common.base.Strings.isNullOrEmpty; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES; /** * @@ -757,4 +765,125 @@ public class SchemaUtil { public static byte getSeparatorByte(boolean rowKeyOrderOptimizable, boolean isNullValue, Expression e) { return getSeparatorByte(rowKeyOrderOptimizable, isNullValue, e.getSortOrder()); } + + /** + * Get list of ColumnInfos that contain Column Name and its associated + * PDataType for an import. The supplied list of columns can be null -- if it is non-null, + * it represents a user-supplied list of columns to be imported. + * + * @param conn Phoenix connection from which metadata will be read + * @param tableName Phoenix table name whose columns are to be checked. Can include a schema + * name + * @param columns user-supplied list of import columns, can be null + * @param strict if true, an exception will be thrown if unknown columns are supplied + */ + public static List<ColumnInfo> generateColumnInfo(Connection conn, + String tableName, List<String> columns, boolean strict) + throws SQLException { + Map<String, Integer> columnNameToTypeMap = Maps.newLinkedHashMap(); + Set<String> ambiguousColumnNames = new HashSet<String>(); + Map<String, Integer> fullColumnNameToTypeMap = Maps.newLinkedHashMap(); + DatabaseMetaData dbmd = conn.getMetaData(); + int unfoundColumnCount = 0; + // TODO: escape wildcard characters here because we don't want that + // behavior here + String escapedTableName = StringUtil.escapeLike(tableName); + String[] schemaAndTable = escapedTableName.split("\\."); + ResultSet rs = null; + try { + rs = dbmd.getColumns(null, (schemaAndTable.length == 1 ? "" + : schemaAndTable[0]), + (schemaAndTable.length == 1 ? escapedTableName + : schemaAndTable[1]), null); + while (rs.next()) { + String colName = rs.getString(QueryUtil.COLUMN_NAME_POSITION); + String colFam = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION); + + // use family qualifier, if available, otherwise, use column name + String fullColumn = (colFam==null?colName:String.format("%s.%s",colFam,colName)); + String sqlTypeName = rs.getString(QueryUtil.DATA_TYPE_NAME_POSITION); + + // allow for both bare and family qualified names. + if (columnNameToTypeMap.keySet().contains(colName)) { + ambiguousColumnNames.add(colName); + } + columnNameToTypeMap.put( + colName, + PDataType.fromSqlTypeName(sqlTypeName).getSqlType()); + fullColumnNameToTypeMap.put( + fullColumn, + PDataType.fromSqlTypeName(sqlTypeName).getSqlType()); + } + if (columnNameToTypeMap.isEmpty()) { + throw new IllegalArgumentException("Table " + tableName + " not found"); + } + } finally { + if (rs != null) { + rs.close(); + } + } + List<ColumnInfo> columnInfoList = Lists.newArrayList(); + Set<String> unresolvedColumnNames = new TreeSet<String>(); + if (columns == null) { + // use family qualified names by default, if no columns are specified. 
+ for (Map.Entry<String, Integer> entry : fullColumnNameToTypeMap + .entrySet()) { + columnInfoList.add(new ColumnInfo(entry.getKey(), entry.getValue())); + } + } else { + // Leave "null" as indication to skip b/c it doesn't exist + for (int i = 0; i < columns.size(); i++) { + String columnName = columns.get(i).trim(); + Integer sqlType = null; + if (fullColumnNameToTypeMap.containsKey(columnName)) { + sqlType = fullColumnNameToTypeMap.get(columnName); + } else if (columnNameToTypeMap.containsKey(columnName)) { + if (ambiguousColumnNames.contains(columnName)) { + unresolvedColumnNames.add(columnName); + } + // fall back to bare column name. + sqlType = columnNameToTypeMap.get(columnName); + } + if (unresolvedColumnNames.size()>0) { + StringBuilder exceptionMessage = new StringBuilder(); + boolean first = true; + exceptionMessage.append("Unable to resolve these column names to a single column family:\n"); + for (String col : unresolvedColumnNames) { + if (first) first = false; + else exceptionMessage.append(","); + exceptionMessage.append(col); + } + exceptionMessage.append("\nAvailable columns with column families:\n"); + first = true; + for (String col : fullColumnNameToTypeMap.keySet()) { + if (first) first = false; + else exceptionMessage.append(","); + exceptionMessage.append(col); + } + throw new SQLException(exceptionMessage.toString()); + } + + if (sqlType == null) { + if (strict) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.COLUMN_NOT_FOUND) + .setColumnName(columnName) + .setTableName(tableName).build() + .buildException(); + } + unfoundColumnCount++; + } else { + columnInfoList.add(new ColumnInfo(columnName, sqlType)); + } + } + if (unfoundColumnCount == columns.size()) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.COLUMN_NOT_FOUND) + .setColumnName( + Arrays.toString(columns.toArray(new String[0]))) + .setTableName(tableName).build().buildException(); + } + } + return columnInfoList; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/UpsertExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpsertExecutor.java new file mode 100644 index 0000000..d9ce5f2 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpsertExecutor.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.util; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +import org.apache.phoenix.schema.types.PDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +/** + * Executes upsert statements on a provided {@code PreparedStatement} based on incoming + * {@code RECORDS}. An {@link UpsertListener} is notified each time the prepared statement + * is executed. + */ +public abstract class UpsertExecutor<RECORD, FIELD> implements Closeable { + + /** + * A listener that is called for events based on incoming records. + */ + public interface UpsertListener<RECORD> { + + /** + * Called when an upsert has been successfully completed. The given upsertCount is the total number of upserts + * completed on the caller up to this point. + * + * @param upsertCount total number of upserts that have been completed + */ + void upsertDone(long upsertCount); + + + /** + * Called when executing a prepared statement has failed on a given record. + * + * @param record the record that was being upserted when the error occurred + */ + void errorOnRecord(RECORD record, Throwable throwable); + } + + private static final Logger LOG = LoggerFactory.getLogger(UpsertExecutor.class); + + protected final Connection conn; + protected final List<ColumnInfo> columnInfos; + protected final List<PDataType> dataTypes; + protected final List<Function<FIELD, Object>> conversionFunctions; + protected final PreparedStatement preparedStatement; + protected final UpsertListener<RECORD> upsertListener; + protected long upsertCount = 0L; + protected boolean initFinished = false; // allow subclasses to finish initialization + + private static PreparedStatement createStatement(Connection conn, String tableName, + List<ColumnInfo> columnInfoList) { + PreparedStatement preparedStatement; + try { + String upsertSql = QueryUtil.constructUpsertStatement(tableName, columnInfoList); + LOG.info("Upserting SQL data with {}", upsertSql); + preparedStatement = conn.prepareStatement(upsertSql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + return preparedStatement; + } + + /** + * Construct with the definition of incoming columns, and the statement upon which upsert + * statements are to be performed. + */ + public UpsertExecutor(Connection conn, String tableName, + List<ColumnInfo> columnInfoList, UpsertListener<RECORD> upsertListener) { + this(conn, columnInfoList, createStatement(conn, tableName, columnInfoList), upsertListener); + } + + /** Testing constructor. Do not use in prod. */ + @VisibleForTesting + protected UpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList, + PreparedStatement preparedStatement, UpsertListener<RECORD> upsertListener) { + this.conn = conn; + this.upsertListener = upsertListener; + this.columnInfos = columnInfoList; + this.preparedStatement = preparedStatement; + this.dataTypes = Lists.newArrayList(); + this.conversionFunctions = Lists.newArrayList(); + } + + /** + * Awkward protocol allows subclass constructors to finish initializing context before + * proceeding to record processing. + */ + protected void finishInit() { + for (ColumnInfo columnInfo : columnInfos) { + PDataType dataType = PDataType.fromTypeId(columnInfo.getSqlType()); + dataTypes.add(dataType); + conversionFunctions.add(createConversionFunction(dataType)); + } + this.initFinished = true; + } + + /** + * Execute upserts for each record contained in the given iterable, notifying this instance's + * {@code UpsertListener} for each completed upsert. + * + * @param records iterable of records to be upserted + */ + public void execute(Iterable<RECORD> records) { + if (!initFinished) { + finishInit(); + } + for (RECORD record : records) { + execute(record); + } + } + + /** + * Upsert a single record. + * + * @param record record containing the data to be upserted + */ + protected abstract void execute(RECORD record); + + @Override + public void close() throws IOException { + try { + preparedStatement.close(); + } catch (SQLException e) { + // An exception while closing the prepared statement is most likely a sign of a real problem, so we don't + // want to hide it with closeQuietly or something similar + throw new RuntimeException(e); + } + } + + protected abstract Function<FIELD, Object> createConversionFunction(PDataType dataType); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java index e680f5c..4a3af21 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java @@ -17,132 +17,55 @@ */ package org.apache.phoenix.util.csv; -import java.io.Closeable; -import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; import java.util.List; import java.util.Properties; - import javax.annotation.Nullable; import org.apache.commons.csv.CSVRecord; -import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.DateUtil; -import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.UpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.collect.Lists; -/** - * Executes upsert statements on a provided {@code PreparedStatement} based on incoming CSV records, notifying a - * listener each time the prepared statement is executed. - */ -public class CsvUpsertExecutor implements Closeable { +/** {@link UpsertExecutor} over {@link CSVRecord}s.
*/ +public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> { private static final Logger LOG = LoggerFactory.getLogger(CsvUpsertExecutor.class); - private final String arrayElementSeparator; - private final Connection conn; - private final List<PDataType> dataTypes; - private final List<Function<String,Object>> conversionFunctions; - private final PreparedStatement preparedStatement; - private final UpsertListener upsertListener; - private long upsertCount = 0L; - - /** - * A listener that is called for events based on incoming CSV data. - */ - public static interface UpsertListener { - - /** - * Called when an upsert has been sucessfully completed. The given upsertCount is the total number of upserts - * completed on the caller up to this point. - * - * @param upsertCount total number of upserts that have been completed - */ - void upsertDone(long upsertCount); - - - /** - * Called when executing a prepared statement has failed on a given record. - * - * @param csvRecord the CSV record that was being upserted when the error occurred - */ - void errorOnRecord(CSVRecord csvRecord, Throwable throwable); - } - - - /** - * Static constructor method for creating a CsvUpsertExecutor. - * - * @param conn Phoenix connection upon which upserts are to be performed - * @param tableName name of the table in which upserts are to be performed - * @param columnInfoList description of the columns to be upserted to, in the same order as in the CSV input - * @param upsertListener listener that will be notified of upserts, can be null - * @param arrayElementSeparator separator string to delimit string representations of arrays - * @return the created CsvUpsertExecutor - */ - public static CsvUpsertExecutor create(PhoenixConnection conn, String tableName, List<ColumnInfo> columnInfoList, - UpsertListener upsertListener, String arrayElementSeparator) { - PreparedStatement preparedStatement = null; - try { - String upsertSql = QueryUtil.constructUpsertStatement(tableName, columnInfoList); - LOG.info("Upserting SQL data with {}", upsertSql); - preparedStatement = conn.prepareStatement(upsertSql); - } catch (SQLException e) { - throw new RuntimeException(e); - } - return new CsvUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener, - arrayElementSeparator); - } + protected final String arrayElementSeparator; - /** - * Construct with the definition of incoming columns, and the statement upon which upsert statements - * are to be performed. - */ - CsvUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList, PreparedStatement preparedStatement, - UpsertListener upsertListener, String arrayElementSeparator) { - this.conn = conn; - this.preparedStatement = preparedStatement; - this.upsertListener = upsertListener; + /** Testing constructor. Do not use in prod. 
*/ + @VisibleForTesting + protected CsvUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList, + PreparedStatement stmt, UpsertListener<CSVRecord> upsertListener, + String arrayElementSeparator) { + super(conn, columnInfoList, stmt, upsertListener); this.arrayElementSeparator = arrayElementSeparator; - this.dataTypes = Lists.newArrayList(); - this.conversionFunctions = Lists.newArrayList(); - for (ColumnInfo columnInfo : columnInfoList) { - PDataType dataType = PDataType.fromTypeId(columnInfo.getSqlType()); - dataTypes.add(dataType); - conversionFunctions.add(createConversionFunction(dataType)); - } + finishInit(); } - /** - * Execute upserts for each CSV record contained in the given iterable, notifying this instance's - * {@code UpsertListener} for each completed upsert. - * - * @param csvRecords iterable of CSV records to be upserted - */ - public void execute(Iterable<CSVRecord> csvRecords) { - for (CSVRecord csvRecord : csvRecords) { - execute(csvRecord); - } + public CsvUpsertExecutor(Connection conn, String tableName, + List<ColumnInfo> columnInfoList, UpsertListener<CSVRecord> upsertListener, + String arrayElementSeparator) { + super(conn, tableName, columnInfoList, upsertListener); + this.arrayElementSeparator = arrayElementSeparator; + finishInit(); } - /** - * Upsert a single record. - * - * @param csvRecord CSV record containing the data to be upserted - */ - void execute(CSVRecord csvRecord) { + @Override + protected void execute(CSVRecord csvRecord) { try { if (csvRecord.size() < conversionFunctions.size()) { String message = String.format("CSV record does not have enough values (has %d, but needs %d)", @@ -170,17 +93,7 @@ public class CsvUpsertExecutor implements Closeable { } @Override - public void close() throws IOException { - try { - preparedStatement.close(); - } catch (SQLException e) { - // An exception while closing the prepared statement is most likely a sign of a real problem, so we don't - // want to hide it with closeQuietly or something similar - throw new RuntimeException(e); - } - } - - private Function<String, Object> createConversionFunction(PDataType dataType) { + protected Function<String, Object> createConversionFunction(PDataType dataType) { if (dataType.isArrayType()) { return new ArrayDatatypeConversionFunction( new StringToArrayConverter( @@ -201,7 +114,7 @@ public class CsvUpsertExecutor implements Closeable { private final DateUtil.DateTimeParser dateTimeParser; SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) { - Properties props = null; + Properties props; try { props = conn.getClientInfo(); } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java new file mode 100644 index 0000000..bbe0e30 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util.json; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import javax.annotation.Nullable; + +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.DateUtil; +import org.apache.phoenix.util.UpsertExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CaseFormat; +import com.google.common.base.Function; + +/** {@link UpsertExecutor} over {@link Map} objects, as parsed from JSON. */ +public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> { + + protected static final Logger LOG = LoggerFactory.getLogger(JsonUpsertExecutor.class); + + /** Testing constructor. Do not use in prod. */ + @VisibleForTesting + protected JsonUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList, + PreparedStatement stmt, UpsertListener<Map<?, ?>> upsertListener) { + super(conn, columnInfoList, stmt, upsertListener); + finishInit(); + } + + public JsonUpsertExecutor(Connection conn, String tableName, List<ColumnInfo> columnInfoList, + UpsertExecutor.UpsertListener<Map<?, ?>> upsertListener) { + super(conn, tableName, columnInfoList, upsertListener); + finishInit(); + } + + @Override + protected void execute(Map<?, ?> record) { + int fieldIndex = 0; + String colName = null; + try { + if (record.size() < conversionFunctions.size()) { + String message = String.format("JSON record does not have enough values (has %d, but needs %d)", + record.size(), conversionFunctions.size()); + throw new IllegalArgumentException(message); + } + for (fieldIndex = 0; fieldIndex < conversionFunctions.size(); fieldIndex++) { + colName = CaseFormat.UPPER_UNDERSCORE.to( + CaseFormat.LOWER_UNDERSCORE, columnInfos.get(fieldIndex).getColumnName()); + if (colName.contains(".")) { + StringBuilder sb = new StringBuilder(); + String[] parts = colName.split("\\."); + // assume first part is the column family name; omita + for (int i = 1; i < parts.length; i++) { + sb.append(parts[i]); + if (i != parts.length - 1) { + sb.append("."); + } + } + colName = sb.toString(); + } + if (colName.contains("\"")) { + colName = colName.replace("\"", ""); + } + Object sqlValue = conversionFunctions.get(fieldIndex).apply(record.get(colName)); + if (sqlValue != null) { + preparedStatement.setObject(fieldIndex + 1, sqlValue); + } else { + preparedStatement.setNull(fieldIndex + 1, dataTypes.get(fieldIndex).getSqlType()); + } + } + preparedStatement.execute(); + upsertListener.upsertDone(++upsertCount); + } catch (Exception e) { 
+ if (LOG.isDebugEnabled()) { + // Even though this is an error, we only log it at debug level because we're notifying the + // listener, which can do its own logging if needed + LOG.debug("Error on record " + record + ", fieldIndex " + fieldIndex + ", colName " + colName, e); + } + upsertListener.errorOnRecord(record, new Exception("fieldIndex: " + fieldIndex + ", colName " + colName, e)); + } + } + + @Override + public void close() throws IOException { + try { + preparedStatement.close(); + } catch (SQLException e) { + // An exception while closing the prepared statement is most likely a sign of a real problem, so we don't + // want to hide it with closeQuietly or something similar + throw new RuntimeException(e); + } + } + + @Override + protected Function<Object, Object> createConversionFunction(PDataType dataType) { + if (dataType.isArrayType()) { + return new ArrayDatatypeConversionFunction( + new ObjectToArrayConverter( + conn, + PDataType.fromTypeId(dataType.getSqlType() - PDataType.ARRAY_TYPE_BASE))); + } else { + return new SimpleDatatypeConversionFunction(dataType, this.conn); + } + } + + /** + * Performs typed conversion from JSON-parsed values to a given column value type. + */ + static class SimpleDatatypeConversionFunction implements Function<Object, Object> { + + private final PDataType dataType; + private final DateUtil.DateTimeParser dateTimeParser; + + SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) { + Properties props; + try { + props = conn.getClientInfo(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + this.dataType = dataType; + if (dataType.isCoercibleTo(PTimestamp.INSTANCE)) { + // TODO: move to DateUtil + String dateFormat; + int dateSqlType = dataType.getResultSetSqlType(); + if (dateSqlType == Types.DATE) { + dateFormat = props.getProperty(QueryServices.DATE_FORMAT_ATTRIB, + DateUtil.DEFAULT_DATE_FORMAT); + } else if (dateSqlType == Types.TIME) { + dateFormat = props.getProperty(QueryServices.TIME_FORMAT_ATTRIB, + DateUtil.DEFAULT_TIME_FORMAT); + } else { + dateFormat = props.getProperty(QueryServices.TIMESTAMP_FORMAT_ATTRIB, + DateUtil.DEFAULT_TIMESTAMP_FORMAT); + } + String timeZoneId = props.getProperty(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, + QueryServicesOptions.DEFAULT_DATE_FORMAT_TIMEZONE); + this.dateTimeParser = DateUtil.getDateTimeParser(dateFormat, dataType, timeZoneId); + } else { + this.dateTimeParser = null; + } + } + + @Nullable + @Override + public Object apply(@Nullable Object input) { + if (input == null) { + return null; + } + if (dateTimeParser != null && input instanceof String) { + final String s = (String) input; + long epochTime = dateTimeParser.parseDateTime(s); + byte[] byteValue = new byte[dataType.getByteSize()]; + dataType.getCodec().encodeLong(epochTime, byteValue, 0); + return dataType.toObject(byteValue); + } + return dataType.toObject(input, dataType); + } + } + + /** + * Converts object representations of arrays (as parsed from JSON) into Phoenix arrays of the correct type. 
+ */ + private static class ArrayDatatypeConversionFunction implements Function<Object, Object> { + + private final ObjectToArrayConverter arrayConverter; + + private ArrayDatatypeConversionFunction(ObjectToArrayConverter arrayConverter) { + this.arrayConverter = arrayConverter; + } + + @Nullable + @Override + public Object apply(@Nullable Object input) { + try { + return arrayConverter.toArray(input); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/json/ObjectToArrayConverter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/json/ObjectToArrayConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/util/json/ObjectToArrayConverter.java new file mode 100644 index 0000000..16bef15 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/json/ObjectToArrayConverter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util.json; + +import java.sql.Array; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; + +import org.apache.phoenix.schema.types.PDataType; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +/** + * Converts Objects (presumably lists) into Phoenix arrays. + */ +class ObjectToArrayConverter { + + private final Connection conn; + private final PDataType elementDataType; + private final JsonUpsertExecutor.SimpleDatatypeConversionFunction elementConvertFunction; + + /** + * Instantiate with the target connection and element data type. + * + * @param conn Phoenix connection to the target database + * @param elementDataType datatype of the elements of arrays to be created + */ + public ObjectToArrayConverter(Connection conn, PDataType elementDataType) { + this.conn = conn; + this.elementDataType = elementDataType; + this.elementConvertFunction = + new JsonUpsertExecutor.SimpleDatatypeConversionFunction(elementDataType, this.conn); + } + + /** + * Convert an input object (typically a List parsed from JSON) into a Phoenix array of the configured type. 
+ * + * @param input object (typically a List) containing the array values + * @return the array containing the values represented in the input object + */ + public Array toArray(Object input) throws SQLException { + if (input == null) { + return conn.createArrayOf(elementDataType.getSqlTypeName(), new Object[0]); + } + List<?> list = (List<?>) input; + if (list.isEmpty()) { + return conn.createArrayOf(elementDataType.getSqlTypeName(), new Object[0]); + } + return conn.createArrayOf(elementDataType.getSqlTypeName(), + Lists.newArrayList(Iterables.transform(list, elementConvertFunction)).toArray()); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/BulkLoadToolTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/BulkLoadToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/BulkLoadToolTest.java new file mode 100644 index 0000000..95e9b43 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/BulkLoadToolTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.commons.cli.CommandLine; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class BulkLoadToolTest { + + @Parameterized.Parameters + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][]{ + { new CsvBulkLoadTool() }, + { new JsonBulkLoadTool() }, + }); + } + + @Parameterized.Parameter(value = 0) + public AbstractBulkLoadTool bulkLoadTool; + + @Test + public void testParseOptions() { + CommandLine cmdLine = bulkLoadTool.parseOptions(new String[] { "--input", "/input", + "--table", "mytable" }); + + assertEquals("mytable", cmdLine.getOptionValue(CsvBulkLoadTool.TABLE_NAME_OPT.getOpt())); + assertEquals("/input", cmdLine.getOptionValue(CsvBulkLoadTool.INPUT_PATH_OPT.getOpt())); + } + + @Test(expected=IllegalStateException.class) + public void testParseOptions_ExtraArguments() { + bulkLoadTool.parseOptions(new String[] { "--input", "/input", + "--table", "mytable", "these", "shouldnt", "be", "here" }); + } + + @Test(expected=IllegalStateException.class) + public void testParseOptions_NoInput() { + bulkLoadTool.parseOptions(new String[] { "--table", "mytable" }); + } + + @Test(expected=IllegalStateException.class) + public void testParseOptions_NoTable() { + bulkLoadTool.parseOptions(new String[] { "--input", "/input" }); + } + + @Test + public void testGetQualifiedTableName() { + assertEquals("MYSCHEMA.MYTABLE", CsvBulkLoadTool.getQualifiedTableName("mySchema", "myTable")); + } + + @Test + public void testGetQualifiedTableName_NullSchema() { + assertEquals("MYTABLE", CsvBulkLoadTool.getQualifiedTableName(null, "myTable")); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java index f52a837..3c6271a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java @@ -17,9 +17,6 @@ */ package org.apache.phoenix.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -29,11 +26,10 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.util.ColumnInfo; import org.junit.Test; -import com.google.common.collect.ImmutableList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class CsvBulkImportUtilTest { @@ -41,16 +37,11 @@ public class CsvBulkImportUtilTest { public void testInitCsvImportJob() throws IOException { Configuration conf = new Configuration(); - String tableName = "SCHEMANAME.TABLENAME"; char delimiter = '\001'; char quote = '\002'; char escape = '!'; - List<ColumnInfo> columnInfoList = ImmutableList.of( - new ColumnInfo("MYCOL", PInteger.INSTANCE.getSqlType())); - - 
CsvBulkImportUtil.initCsvImportJob( - conf, tableName, delimiter, quote, escape, null, columnInfoList, true); + CsvBulkImportUtil.initCsvImportJob(conf, delimiter, quote, escape, null); // Serialize and deserialize the config to ensure that there aren't any issues // with non-printable characters as delimiters @@ -61,7 +52,6 @@ public class CsvBulkImportUtilTest { Configuration deserialized = new Configuration(); deserialized.addResource(new FileInputStream(tempFile)); - assertEquals(tableName, deserialized.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY)); assertEquals(Character.valueOf('\001'), CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY)); assertEquals(Character.valueOf('\002'), @@ -69,8 +59,6 @@ public class CsvBulkImportUtilTest { assertEquals(Character.valueOf('!'), CsvBulkImportUtil.getCharacter(deserialized, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY)); assertNull(deserialized.get(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY)); - assertEquals(columnInfoList, CsvToKeyValueMapper.buildColumnInfoList(deserialized)); - assertEquals(true, deserialized.getBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false)); tempFile.delete(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java deleted file mode 100644 index 33bb976..0000000 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.mapreduce; - -import org.apache.commons.cli.CommandLine; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class CsvBulkLoadToolTest { - - private CsvBulkLoadTool bulkLoadTool; - - @Before - public void setUp() { - bulkLoadTool = new CsvBulkLoadTool(); - } - - @Test - public void testParseOptions() { - CommandLine cmdLine = bulkLoadTool.parseOptions(new String[] { "--input", "/input", - "--table", "mytable" }); - - assertEquals("mytable", cmdLine.getOptionValue(CsvBulkLoadTool.TABLE_NAME_OPT.getOpt())); - assertEquals("/input", cmdLine.getOptionValue(CsvBulkLoadTool.INPUT_PATH_OPT.getOpt())); - } - - @Test(expected=IllegalStateException.class) - public void testParseOptions_ExtraArguments() { - bulkLoadTool.parseOptions(new String[] { "--input", "/input", - "--table", "mytable", "these", "shouldnt", "be", "here" }); - } - - @Test(expected=IllegalStateException.class) - public void testParseOptions_NoInput() { - bulkLoadTool.parseOptions(new String[] { "--table", "mytable" }); - } - - @Test(expected=IllegalStateException.class) - public void testParseOptions_NoTable() { - bulkLoadTool.parseOptions(new String[] { "--input", "/input" }); - } - - @Test - public void testGetQualifiedTableName() { - assertEquals("MYSCHEMA.MYTABLE", CsvBulkLoadTool.getQualifiedTableName("mySchema", "myTable")); - } - - @Test - public void testGetQualifiedTableName_NullSchema() { - assertEquals("MYTABLE", CsvBulkLoadTool.getQualifiedTableName(null, "myTable")); - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java index dc6f497..fe4e068 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java @@ -17,25 +17,13 @@ */ package org.apache.phoenix.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.io.IOException; -import java.util.List; import org.apache.commons.csv.CSVRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.schema.types.PIntegerArray; -import org.apache.phoenix.schema.types.PUnsignedInt; -import org.apache.phoenix.util.ColumnInfo; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class CsvToKeyValueMapperTest { @@ -62,72 +50,4 @@ public class CsvToKeyValueMapperTest { assertTrue(parsed.isConsistent()); assertEquals(1, parsed.getRecordNumber()); } - - - @Test - public void testBuildColumnInfoList() { - List<ColumnInfo> columnInfoList = ImmutableList.of( - new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()), - new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()), - new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType())); - - Configuration conf = new 
Configuration(); - CsvToKeyValueMapper.configureColumnInfoList(conf, columnInfoList); - List<ColumnInfo> fromConfig = CsvToKeyValueMapper.buildColumnInfoList(conf); - - assertEquals(columnInfoList, fromConfig); - } - - @Test - public void testBuildColumnInfoList_ContainingNulls() { - // A null value in the column info list means "skip that column in the input" - List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList( - new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()), - null, - new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()), - new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType())); - - Configuration conf = new Configuration(); - CsvToKeyValueMapper.configureColumnInfoList(conf, columnInfoListWithNull); - List<ColumnInfo> fromConfig = CsvToKeyValueMapper.buildColumnInfoList(conf); - - assertEquals(columnInfoListWithNull, fromConfig); - } - - @Test - public void testLoadPreUpdateProcessor() { - Configuration conf = new Configuration(); - conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class, - ImportPreUpsertKeyValueProcessor.class); - - ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); - assertEquals(MockUpsertProcessor.class, processor.getClass()); - } - - @Test - public void testLoadPreUpdateProcessor_NotConfigured() { - - Configuration conf = new Configuration(); - ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); - - assertEquals(CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class, - processor.getClass()); - } - - @Test(expected=IllegalStateException.class) - public void testLoadPreUpdateProcessor_ClassNotFound() { - Configuration conf = new Configuration(); - conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass"); - - PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); - } - - - static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor { - - @Override - public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) { - throw new UnsupportedOperationException("Not yet implemented"); - } - } }
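----------------------------------------------------------------------

Editor's note on the JSON field-name mapping above: JsonUpsertExecutor.execute() does not look up JSON values by the raw SQL column name. It first lower-cases the name via Guava's CaseFormat, drops a leading column-family prefix, and strips embedded quotes. The standalone sketch below (not part of the commit; the class name and sample inputs are invented for illustration) mirrors that normalization so the mapping can be tried in isolation.

import com.google.common.base.CaseFormat;

public class JsonFieldNameDemo {

    // Mirrors the normalization in JsonUpsertExecutor.execute() above.
    static String toJsonFieldName(String sqlColumnName) {
        String colName = CaseFormat.UPPER_UNDERSCORE.to(
                CaseFormat.LOWER_UNDERSCORE, sqlColumnName);
        if (colName.contains(".")) {
            StringBuilder sb = new StringBuilder();
            String[] parts = colName.split("\\.");
            // the first part is assumed to be the column family name; omit it
            for (int i = 1; i < parts.length; i++) {
                sb.append(parts[i]);
                if (i != parts.length - 1) {
                    sb.append(".");
                }
            }
            colName = sb.toString();
        }
        return colName.replace("\"", "");
    }

    public static void main(String[] args) {
        System.out.println(toJsonFieldName("MYCOL"));           // mycol
        System.out.println(toJsonFieldName("CF.NESTED_FIELD")); // nested_field
        System.out.println(toJsonFieldName("\"MixedCol\""));    // mixedcol
    }
}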
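Editor's note on ObjectToArrayConverter: the class is package-private, so the sketch below (also not part of the commit; the class name and JDBC URL are placeholders) would have to live in org.apache.phoenix.util.json. It exercises the contract visible in toArray() above: null input and an empty List both yield an empty typed array, while a non-empty List is converted element-by-element through SimpleDatatypeConversionFunction.

package org.apache.phoenix.util.json;

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;

import org.apache.phoenix.schema.types.PInteger;

public class ArrayConversionSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder URL; point this at a real Phoenix cluster to run it.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            ObjectToArrayConverter converter =
                    new ObjectToArrayConverter(conn, PInteger.INSTANCE);
            // A JSON array such as [1, 2, 3] parses to a List, which becomes
            // a typed java.sql.Array over the Phoenix connection.
            Array sqlArray = converter.toArray(Arrays.asList(1, 2, 3));
            System.out.println(sqlArray.getBaseTypeName());
            // Null comes back as an empty array rather than null.
            System.out.println(converter.toArray(null).getBaseTypeName());
        }
    }
}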