http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
deleted file mode 100644
index 65a3bc7..0000000
--- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.datasources.hbase
-
-import org.apache.avro.Schema
-import org.apache.yetus.audience.InterfaceAudience
-import org.apache.hadoop.hbase.spark.SchemaConverters
-import org.apache.hadoop.hbase.spark.datasources._
-import org.apache.hadoop.hbase.spark.hbase._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.util.DataTypeParser
-import org.apache.spark.sql.types._
-import org.json4s.jackson.JsonMethods._
-
-import scala.collection.mutable
-
-// Due to access restrictions defined in Spark, we have to locate the file in this package.
-// The definition of each column cell, which may be of a composite type
-// TODO: add avro support
-@InterfaceAudience.Private
-case class Field(
-    colName: String,
-    cf: String,
-    col: String,
-    sType: Option[String] = None,
-    avroSchema: Option[String] = None,
-    serdes: Option[SerDes] = None,
-    len: Int = -1) extends Logging {
-  override def toString = s"$colName $cf $col"
-  val isRowKey = cf == HBaseTableCatalog.rowKey
-  var start: Int = _
-  def schema: Option[Schema] = avroSchema.map { x =>
-    logDebug(s"avro: $x")
-    val p = new Schema.Parser
-    p.parse(x)
-  }
-
-  lazy val exeSchema = schema
-
-  // converter from Avro to the Catalyst structure
-  lazy val avroToCatalyst: Option[Any => Any] = {
-    schema.map(SchemaConverters.createConverterToSQL(_))
-  }
-
-  // converter from Catalyst to Avro
-  lazy val catalystToAvro: (Any) => Any = {
-    SchemaConverters.createConverterToAvro(dt, colName, "recordNamespace")
-  }
-
-  def cfBytes: Array[Byte] = {
-    if (isRowKey) {
-      Bytes.toBytes("")
-    } else {
-      Bytes.toBytes(cf)
-    }
-  }
-  def colBytes: Array[Byte] = {
-    if (isRowKey) {
-      Bytes.toBytes("key")
-    } else {
-      Bytes.toBytes(col)
-    }
-  }
-
-  val dt = {
-    sType.map(DataTypeParser.parse(_)).getOrElse {
-      schema.map { x =>
-        SchemaConverters.toSqlType(x).dataType
-      }.get
-    }
-  }
-
-  var length: Int = {
-    if (len == -1) {
-      dt match {
-        case BinaryType | StringType => -1
-        case BooleanType => Bytes.SIZEOF_BOOLEAN
-        case ByteType => 1
-        case DoubleType => Bytes.SIZEOF_DOUBLE
-        case FloatType => Bytes.SIZEOF_FLOAT
-        case IntegerType => Bytes.SIZEOF_INT
-        case LongType => Bytes.SIZEOF_LONG
-        case ShortType => Bytes.SIZEOF_SHORT
-        case _ => -1
-      }
-    } else {
-      len
-    }
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: Field =>
-      colName == that.colName && cf == that.cf && col == that.col
-    case _ => false
-  }
-}
-
-// The row key definition, with each key referring to the col defined in Field, e.g.,
-// key1:key2:key3
-@InterfaceAudience.Private
-case class RowKey(k: String) {
-  val keys = k.split(":")
-  var fields: Seq[Field] = _
-  var varLength = false
-  def length = {
-    if (varLength) {
-      -1
-    } else {
-      fields.foldLeft(0) { case (x, y) =>
-        x + y.length
-      }
-    }
-  }
-}
-// The map between the columns presented to Spark and the HBase fields
-@InterfaceAudience.Private
-case class SchemaMap(map: mutable.HashMap[String, Field]) {
-  def toFields = map.map { case (name, field) =>
-    StructField(name, field.dt)
-  }.toSeq
-
-  def fields = map.values
-
-  def getField(name: String) = map(name)
-}
-
-
-// The definition of the relation schema mapping between the HBase table and Spark SQL
-@InterfaceAudience.Private
-case class HBaseTableCatalog(
-    namespace: String,
-    name: String,
-    row: RowKey,
-    sMap: SchemaMap,
-    @transient params: Map[String, String]) extends Logging {
-  def toDataType = StructType(sMap.toFields)
-  def getField(name: String) = sMap.getField(name)
-  def getRowKey: Seq[Field] = row.fields
-  def getPrimaryKey = row.keys(0)
-  def getColumnFamilies = {
-    sMap.fields.map(_.cf).filter(_ != HBaseTableCatalog.rowKey).toSeq.distinct
-  }
-
-  def get(key: String) = params.get(key)
-
-  // Set up the start and length for each dimension of the row key at runtime.
-  def dynSetupRowKey(rowKey: Array[Byte]) {
-    logDebug(s"length: ${rowKey.length}")
-    if (row.varLength) {
-      var start = 0
-      row.fields.foreach { f =>
-        logDebug(s"start: $start")
-        f.start = start
-        f.length = {
-          // If the length is not defined
-          if (f.length == -1) {
-            f.dt match {
-              case StringType =>
-                var pos = rowKey.indexOf(HBaseTableCatalog.delimiter, start)
-                if (pos == -1 || pos > rowKey.length) {
-                  // this is at the last dimension
-                  pos = rowKey.length
-                }
-                pos - start
-              // We don't know the length, so assume it extends to the end of the rowkey.
-              case _ => rowKey.length - start
-            }
-          } else {
-            f.length
-          }
-        }
-        start += f.length
-      }
-    }
-  }
-
-  def initRowKey = {
-    val fields = sMap.fields.filter(_.cf == HBaseTableCatalog.rowKey)
-    row.fields = row.keys.flatMap(n => fields.find(_.col == n))
-    // The length is determined at runtime if it is string or binary and the length is undefined.
-    if (row.fields.filter(_.length == -1).isEmpty) {
-      var start = 0
-      row.fields.foreach { f =>
-        f.start = start
-        start += f.length
-      }
-    } else {
-      row.varLength = true
-    }
-  }
-  initRowKey
-}
-
-@InterfaceAudience.Public
-object HBaseTableCatalog {
-  // If defined and larger than 3, a new table will be created with the number of regions specified.
-  val newTable = "newtable"
-  val regionStart = "regionStart"
-  val defaultRegionStart = "aaaaaaa"
-  val regionEnd = "regionEnd"
-  val defaultRegionEnd = "zzzzzzz"
-  // The json string specifying hbase catalog information
-  val tableCatalog = "catalog"
-  // The row key with format key1:key2 specifying table row key
-  val rowKey = "rowkey"
-  // The key for hbase table whose value specifies the namespace and table name
-  val table = "table"
-  // The namespace of hbase table
-  val nameSpace = "namespace"
-  // The name of hbase table
-  val tableName = "name"
-  // The name of columns in hbase catalog
-  val columns = "columns"
-  val cf = "cf"
-  val col = "col"
-  val `type` = "type"
-  // the name of the avro schema json string
-  val avro = "avro"
-  val delimiter: Byte = 0
-  val serdes = "serdes"
-  val length = "length"
-
-  /**
-   * User-provided table schema definition
-   * {"table":{"namespace":"default", "name":"tableName"}, "rowkey":"key1:key2",
-   * "columns":{"col1":{"cf":"cf1", "col":"col1", "type":"type1"},
-   * "col2":{"cf":"cf2", "col":"col2", "type":"type2"}}}
-   * Note that for any col in the rowKey, there has to be a corresponding col defined in columns
-   */
-  def apply(params: Map[String, String]): HBaseTableCatalog = {
-    val parameters = convert(params)
-    val jString = parameters(tableCatalog)
-    val map = parse(jString).values.asInstanceOf[Map[String, _]]
-    val tableMeta = map.get(table).get.asInstanceOf[Map[String, _]]
-    val nSpace = tableMeta.get(nameSpace).getOrElse("default").asInstanceOf[String]
-    val tName = tableMeta.get(tableName).get.asInstanceOf[String]
-    val cIter = map.get(columns).get.asInstanceOf[Map[String, Map[String, String]]].toIterator
-    val schemaMap = mutable.HashMap.empty[String, Field]
-    cIter.foreach { case (name, column) =>
-      val sd = {
-        column.get(serdes).asInstanceOf[Option[String]].map(n =>
-          Class.forName(n).newInstance().asInstanceOf[SerDes]
-        )
-      }
-      val len = column.get(length).map(_.toInt).getOrElse(-1)
-      val sAvro = column.get(avro).map(parameters(_))
-      val f = Field(name, column.getOrElse(cf, rowKey),
-        column.get(col).get,
-        column.get(`type`),
-        sAvro, sd, len)
-      schemaMap += ((name, f))
-    }
-    val rKey = RowKey(map.get(rowKey).get.asInstanceOf[String])
-    HBaseTableCatalog(nSpace, tName, rKey, SchemaMap(schemaMap), parameters)
-  }
-
-  val TABLE_KEY: String = "hbase.table"
-  val SCHEMA_COLUMNS_MAPPING_KEY: String = "hbase.columns.mapping"
-
-  /* For backward compatibility. Convert the old definition to the new json-based definition formatted as below
-    val catalog = s"""{
-      |"table":{"namespace":"default", "name":"htable"},
-      |"rowkey":"key1:key2",
-      |"columns":{
-      |"col1":{"cf":"rowkey", "col":"key1", "type":"string"},
-      |"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
-      |"col3":{"cf":"cf1", "col":"col2", "type":"binary"},
-      |"col4":{"cf":"cf1", "col":"col3", "type":"timestamp"},
-      |"col5":{"cf":"cf1", "col":"col4", "type":"double", "serdes":"${classOf[DoubleSerDes].getName}"},
-      |"col6":{"cf":"cf1", "col":"col5", "type":"$map"},
-      |"col7":{"cf":"cf1", "col":"col6", "type":"$array"},
-      |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}
-      |}
-      |}""".stripMargin
-   */
-  @deprecated("Please use new json format to define HBaseCatalog")
-  // TODO: There is no need to deprecate since this is the first release.
-  def convert(parameters: Map[String, String]): Map[String, String] = {
-    val tableName = parameters.get(TABLE_KEY).getOrElse(null)
-    // if hbase.table is not defined, we assume it is in the json format already.
-    if (tableName == null) return parameters
-    val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
-    import scala.collection.JavaConverters._
-    val schemaMap = generateSchemaMappingMap(schemaMappingString).asScala.map(_._2.asInstanceOf[SchemaQualifierDefinition])
-
-    val rowkey = schemaMap.filter {
-      _.columnFamily == "rowkey"
-    }.map(_.columnName)
-    val cols = schemaMap.map { x =>
-      s""""${x.columnName}":{"cf":"${x.columnFamily}", "col":"${x.qualifier}", "type":"${x.colType}"}""".stripMargin
-    }
-    val jsonCatalog =
-      s"""{
-         |"table":{"namespace":"default", "name":"${tableName}"},
-         |"rowkey":"${rowkey.mkString(":")}",
-         |"columns":{
-         |${cols.mkString(",")}
-         |}
-         |}
-       """.stripMargin
-    parameters ++ Map(HBaseTableCatalog.tableCatalog -> jsonCatalog)
-  }
-
-  /**
-   * Reads the SCHEMA_COLUMNS_MAPPING_KEY and converts it to a map of
-   * SchemaQualifierDefinitions with the original sql column name as the key
-   *
-   * @param schemaMappingString The schema mapping string from the SparkSQL map
-   * @return A map of definitions keyed by the SparkSQL column name
-   */
-  @InterfaceAudience.Private
-  def generateSchemaMappingMap(schemaMappingString: String):
-  java.util.HashMap[String, SchemaQualifierDefinition] = {
-    println(schemaMappingString)
-    try {
-      val columnDefinitions = schemaMappingString.split(',')
-      val resultingMap = new java.util.HashMap[String, SchemaQualifierDefinition]()
-      columnDefinitions.map(cd => {
-        val parts = cd.trim.split(' ')
-
-        // Make sure we get three parts
-        // <ColumnName> <ColumnType> <ColumnFamily:Qualifier>
-        if (parts.length == 3) {
-          val hbaseDefinitionParts = if (parts(2).charAt(0) == ':') {
-            Array[String]("rowkey", parts(0))
-          } else {
-            parts(2).split(':')
-          }
-          resultingMap.put(parts(0), new SchemaQualifierDefinition(parts(0),
-            parts(1), hbaseDefinitionParts(0), hbaseDefinitionParts(1)))
-        } else {
-          throw new IllegalArgumentException("Invalid value for schema mapping '" + cd +
-            "' should be '<columnName> <columnType> <columnFamily>:<qualifier>' " +
-            "for columns and '<columnName> <columnType> :<qualifier>' for rowKeys")
-        }
-      })
-      resultingMap
-    } catch {
-      case e: Exception => throw
-        new IllegalArgumentException("Invalid value for " + SCHEMA_COLUMNS_MAPPING_KEY +
-          " '" +
-          schemaMappingString + "'", e)
-    }
-  }
-}
-
-/**
- * Construct to contain column data that spans both SparkSQL and HBase
- *
- * @param columnName SparkSQL column name
- * @param colType SparkSQL column type
- * @param columnFamily HBase column family
- * @param qualifier HBase qualifier name
- */
-@InterfaceAudience.Private
-case class SchemaQualifierDefinition(columnName: String,
-    colType: String,
-    columnFamily: String,
-    qualifier: String)
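
For context on what this removed file provided: a DataFrame is bound to an HBase table by passing the JSON catalog string under the "catalog" option key (HBaseTableCatalog.tableCatalog above). A minimal sketch of that usage, assuming the hbase-spark DefaultSource resolves under the "org.apache.hadoop.hbase.spark" format name and a SQLContext is already in scope; the "person" table layout is purely illustrative:

    import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

    // Hypothetical table layout; only the option key and the parsing logic
    // follow the code above.
    val catalog = s"""{
      |"table":{"namespace":"default", "name":"person"},
      |"rowkey":"key",
      |"columns":{
      |"id":{"cf":"rowkey", "col":"key", "type":"string"},
      |"name":{"cf":"cf1", "col":"name", "type":"string"},
      |"age":{"cf":"cf1", "col":"age", "type":"int"}
      |}
      |}""".stripMargin

    val df = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.hadoop.hbase.spark")
      .load()
    df.filter(df("age") > 20).show()

Note that, per apply() and initRowKey above, every component named in "rowkey" must also appear under "columns" with cf set to "rowkey".
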
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala
deleted file mode 100644
index 36b8bbf..0000000
--- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.datasources.hbase
-
-import org.apache.hadoop.hbase.spark.AvroSerdes
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.execution.SparkSqlSerializer
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-import org.apache.yetus.audience.InterfaceAudience
-
-@InterfaceAudience.Private
-object Utils {
-
-
-  /**
-   * Parses the hbase field to its corresponding
-   * Scala type, which can then be put into a Spark GenericRow,
-   * which is then automatically converted by Spark.
- */
-  def hbaseFieldToScalaType(
-      f: Field,
-      src: Array[Byte],
-      offset: Int,
-      length: Int): Any = {
-    if (f.exeSchema.isDefined) {
-      // If we have an avro schema defined, use it to get the record, and then convert it to the Catalyst data type
-      val m = AvroSerdes.deserialize(src, f.exeSchema.get)
-      val n = f.avroToCatalyst.map(_(m))
-      n.get
-    } else {
-      // Fall back to atomic type
-      f.dt match {
-        case BooleanType => toBoolean(src, offset)
-        case ByteType => src(offset)
-        case DoubleType => Bytes.toDouble(src, offset)
-        case FloatType => Bytes.toFloat(src, offset)
-        case IntegerType => Bytes.toInt(src, offset)
-        case LongType | TimestampType => Bytes.toLong(src, offset)
-        case ShortType => Bytes.toShort(src, offset)
-        case StringType => toUTF8String(src, offset, length)
-        case BinaryType =>
-          val newArray = new Array[Byte](length)
-          System.arraycopy(src, offset, newArray, 0, length)
-          newArray
-        // TODO: add more data type support
-        case _ => SparkSqlSerializer.deserialize[Any](src)
-      }
-    }
-  }
-
-  // Convert the input value to the byte representation matching the field's data type
-  def toBytes(input: Any, field: Field): Array[Byte] = {
-    if (field.schema.isDefined) {
-      // Here we assume the top level type is structType
-      val record = field.catalystToAvro(input)
-      AvroSerdes.serialize(record, field.schema.get)
-    } else {
-      input match {
-        case data: Boolean => Bytes.toBytes(data)
-        case data: Byte => Array(data)
-        case data: Array[Byte] => data
-        case data: Double => Bytes.toBytes(data)
-        case data: Float => Bytes.toBytes(data)
-        case data: Int => Bytes.toBytes(data)
-        case data: Long => Bytes.toBytes(data)
-        case data: Short => Bytes.toBytes(data)
-        case data: UTF8String => data.getBytes
-        case data: String => Bytes.toBytes(data)
-        // TODO: add more data type support
-        case _ => throw new Exception(s"unsupported data type ${field.dt}")
-      }
-    }
-  }
-
-  def toBoolean(input: Array[Byte], offset: Int): Boolean = {
-    input(offset) != 0
-  }
-
-  def toUTF8String(input: Array[Byte], offset: Int, length: Int): UTF8String = {
-    UTF8String.fromBytes(input.slice(offset, offset + length))
-  }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
deleted file mode 100644
index e383b5e..0000000
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ /dev/null
@@ -1,520 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import scala.Tuple2;
-import org.apache.hadoop.hbase.shaded.com.google.common.io.Files;
-
-@Category({MiscTests.class, MediumTests.class})
-public class TestJavaHBaseContext implements Serializable {
-  private transient JavaSparkContext jsc;
-  HBaseTestingUtility htu;
-  protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
-
-  byte[] tableName = Bytes.toBytes("t1");
-  byte[] columnFamily = Bytes.toBytes("c");
-  byte[] columnFamily1 = Bytes.toBytes("d");
-  String columnFamilyStr = Bytes.toString(columnFamily);
-  String columnFamilyStr1 = Bytes.toString(columnFamily1);
-
-  @Before
-  public void setUp() {
-    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
-
-    File tempDir = Files.createTempDir();
-    tempDir.deleteOnExit();
-
-    htu = new HBaseTestingUtility();
-    try {
-      LOG.info("cleaning up test dir");
-
-      htu.cleanupTestDir();
-
-      LOG.info("starting minicluster");
-
-      htu.startMiniZKCluster();
-      htu.startMiniHBaseCluster(1, 1);
-
-      LOG.info(" - minicluster started");
-
-      try {
-        htu.deleteTable(TableName.valueOf(tableName));
-      } catch (Exception e) {
-        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
-      }
-
-      LOG.info(" - creating table " + Bytes.toString(tableName));
-      htu.createTable(TableName.valueOf(tableName),
-          new byte[][]{columnFamily, columnFamily1});
-      LOG.info(" - created table");
-    } catch (Exception e1) {
-      throw new RuntimeException(e1);
-    }
-  }
-
-  @After
-  public void tearDown() {
-    try {
-      htu.deleteTable(TableName.valueOf(tableName));
-      LOG.info("shutting down minicluster");
-      htu.shutdownMiniHBaseCluster();
-      htu.shutdownMiniZKCluster();
-      LOG.info(" - minicluster shut down");
-      htu.cleanupTestDir();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    jsc.stop();
-    jsc = null;
-  }
-
-  @Test
-  public void testBulkPut() throws IOException {
-
-    List<String> list = new ArrayList<>(5);
-    list.add("1," + columnFamilyStr + ",a,1");
-    list.add("2," + columnFamilyStr + ",a,2");
-    list.add("3," + columnFamilyStr + ",a,3");
-    list.add("4," + columnFamilyStr + ",a,4");
-    list.add("5," + columnFamilyStr + ",a,5");
-
-    JavaRDD<String> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    Connection conn = ConnectionFactory.createConnection(conf);
-    Table table = conn.getTable(TableName.valueOf(tableName));
-
-    try {
-      List<Delete> deletes = new ArrayList<>(5);
-      for (int i = 1; i < 6; i++) {
-        deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
-      }
-      table.delete(deletes);
-    } finally {
-      table.close();
-    }
-
-    hbaseContext.bulkPut(rdd,
-        TableName.valueOf(tableName),
-        new PutFunction());
-
-    table = conn.getTable(TableName.valueOf(tableName));
-
-    try {
-      Result result1 = table.get(new Get(Bytes.toBytes("1")));
-      Assert.assertNotNull("Row 1 should have been put", result1.getRow());
-
-      Result result2 = table.get(new Get(Bytes.toBytes("2")));
-      Assert.assertNotNull("Row 2 should have been put", result2.getRow());
-
-      Result result3 = table.get(new Get(Bytes.toBytes("3")));
-      Assert.assertNotNull("Row 3 should have been put", result3.getRow());
-
-      Result result4 = table.get(new Get(Bytes.toBytes("4")));
-      Assert.assertNotNull("Row 4 should have been put", result4.getRow());
-
-      Result result5 = table.get(new Get(Bytes.toBytes("5")));
-      Assert.assertNotNull("Row 5 should have been put", result5.getRow());
-    } finally {
-      table.close();
-      conn.close();
-    }
-  }
-
-  public static class PutFunction implements Function<String, Put> {
-
-    private static final long serialVersionUID = 1L;
-
-    public Put call(String v) throws Exception {
-      String[] cells = v.split(",");
-      Put put = new Put(Bytes.toBytes(cells[0]));
-
-      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
-          Bytes.toBytes(cells[3]));
-      return put;
-    }
-  }
-
-  @Test
-  public void testBulkDelete() throws IOException {
-    List<byte[]> list = new ArrayList<>(3);
-    list.add(Bytes.toBytes("1"));
-    list.add(Bytes.toBytes("2"));
-    list.add(Bytes.toBytes("3"));
-
-    JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
-        new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
-
-    try (
-        Connection conn = ConnectionFactory.createConnection(conf);
-        Table table = conn.getTable(TableName.valueOf(tableName))
-    ) {
-      Result result1 = table.get(new Get(Bytes.toBytes("1")));
-      Assert.assertNull("Row 1 should have been deleted", result1.getRow());
-
-      Result result2 = table.get(new Get(Bytes.toBytes("2")));
-      Assert.assertNull("Row 2 should have been deleted", result2.getRow());
-
-      Result result3 = table.get(new Get(Bytes.toBytes("3")));
-      Assert.assertNull("Row 3 should have been deleted", result3.getRow());
-
-      Result result4 = table.get(new Get(Bytes.toBytes("4")));
-      Assert.assertNotNull("Row 4 should not have been deleted", result4.getRow());
-
-      Result result5 = table.get(new Get(Bytes.toBytes("5")));
-      Assert.assertNotNull("Row 5 should not have been deleted", result5.getRow());
-    }
-  }
-
-  @Test
-  public void testDistributedScan() throws IOException {
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    Scan scan = new Scan();
-    scan.setCaching(100);
-
-    JavaRDD<String> javaRdd =
-        hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
-            .map(new ScanConvertFunction());
-
-    List<String> results = javaRdd.collect();
-
-    Assert.assertEquals(results.size(), 5);
-  }
-
-  private static class ScanConvertFunction implements
-      Function<Tuple2<ImmutableBytesWritable, Result>, String> {
-    @Override
-    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
-      return Bytes.toString(v1._1().copyBytes());
-    }
-  }
-
-  @Test
-  public void testBulkGet() throws IOException {
-    List<byte[]> list = new ArrayList<>(5);
-    list.add(Bytes.toBytes("1"));
-    list.add(Bytes.toBytes("2"));
-    list.add(Bytes.toBytes("3"));
-    list.add(Bytes.toBytes("4"));
-    list.add(Bytes.toBytes("5"));
-
-    JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-
-    populateTableWithMockData(conf, TableName.valueOf(tableName));
-
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    final JavaRDD<String> stringJavaRDD =
-        hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
-            new GetFunction(),
-            new ResultFunction());
-
-    Assert.assertEquals(stringJavaRDD.count(), 5);
-  }
-
-  @Test
-  public void testBulkLoad() throws Exception {
-
-    Path output = htu.getDataTestDir("testBulkLoad");
-    // Add cell as String: "row,family,qualifier,value"
-    List<String> list = new ArrayList<String>();
-    // row1
-    list.add("1," + columnFamilyStr + ",b,1");
-    // row3
-    list.add("3," + columnFamilyStr + ",a,2");
-    list.add("3," + columnFamilyStr + ",b,1");
-    list.add("3," + columnFamilyStr1 + ",a,1");
-    // row2
-    list.add("2," + columnFamilyStr + ",a,3");
-    list.add("2," + columnFamilyStr + ",b,3");
-
-    JavaRDD<String> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), output.toUri().getPath(),
-        new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
-
-    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
-      Table table = conn.getTable(TableName.valueOf(tableName));
-      // Do bulk load
-      LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
-      load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
-
-      // Check row1
-      List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
-      Assert.assertEquals(cell1.size(), 1);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell1.get(0))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))), "b");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell1.get(0))), "1");
-
-      // Check row3
-      List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
-      Assert.assertEquals(cell3.size(), 3);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(0))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))), "a");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(0))), "2");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(1))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))), "b");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(1))), "1");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(2))), columnFamilyStr1);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))), "a");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(2))), "1");
-
-      // Check row2
-      List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
-      Assert.assertEquals(cell2.size(), 2);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(0))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))), "a");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(0))), "3");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(1))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))), "b");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(1))), "3");
-    }
-  }
-
-  @Test
-  public void testBulkLoadThinRows() throws Exception {
-    Path output = htu.getDataTestDir("testBulkLoadThinRows");
-    // Because of a limitation of the Scala bulkLoadThinRows API,
-    // we need to provide data as <row, all cells in that row>
-    List<List<String>> list = new ArrayList<List<String>>();
-    // row1
-    List<String> list1 = new ArrayList<String>();
-    list1.add("1," + columnFamilyStr + ",b,1");
-    list.add(list1);
-    // row3
-    List<String> list3 = new ArrayList<String>();
-    list3.add("3," + columnFamilyStr + ",a,2");
-    list3.add("3," + columnFamilyStr + ",b,1");
-    list3.add("3," + columnFamilyStr1 + ",a,1");
-    list.add(list3);
-    // row2
-    List<String> list2 = new ArrayList<String>();
-    list2.add("2," + columnFamilyStr + ",a,3");
-    list2.add("2," + columnFamilyStr + ",b,3");
-    list.add(list2);
-
-    JavaRDD<List<String>> rdd = jsc.parallelize(list);
-
-    Configuration conf = htu.getConfiguration();
-    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-    hbaseContext.bulkLoadThinRows(rdd, TableName.valueOf(tableName), new BulkLoadThinRowsFunction(), output.toString(),
-        new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
-
-    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
-      Table table = conn.getTable(TableName.valueOf(tableName));
-      // Do bulk load
-      LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
-      load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
-
-      // Check row1
-      List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
-      Assert.assertEquals(cell1.size(), 1);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell1.get(0))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))), "b");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell1.get(0))), "1");
-
-      // Check row3
-      List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
-      Assert.assertEquals(cell3.size(), 3);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(0))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))), "a");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(0))), "2");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(1))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))), "b");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(1))), "1");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(2))), columnFamilyStr1);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))), "a");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(2))), "1");
-
-      // Check row2
-      List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
-      Assert.assertEquals(cell2.size(), 2);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(0))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))), "a");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(0))), "3");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(1))), columnFamilyStr);
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))), "b");
-      Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(1))), "3");
-    }
-  }
-
-  public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
-
-    @Override
-    public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
-      if (v1 == null) {
-        return null;
-      }
-      String[] strs = v1.split(",");
-      if (strs.length != 4) {
-        return null;
-      }
-      KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
-          Bytes.toBytes(strs[2]));
-      return new Pair<>(kfq, Bytes.toBytes(strs[3]));
-    }
-  }
-
-  public static class BulkLoadThinRowsFunction implements Function<List<String>, Pair<ByteArrayWrapper, FamiliesQualifiersValues>> {
-
-    @Override
-    public Pair<ByteArrayWrapper, FamiliesQualifiersValues> call(List<String> list) throws Exception {
-      if (list == null) {
-        return null;
-      }
-      ByteArrayWrapper rowKey = null;
-      FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();
-      for (String cell : list) {
-        String[] strs = cell.split(",");
-        if (rowKey == null) {
-          rowKey = new ByteArrayWrapper(Bytes.toBytes(strs[0]));
-        }
-        fqv.add(Bytes.toBytes(strs[1]), Bytes.toBytes(strs[2]), Bytes.toBytes(strs[3]));
-      }
-      return new Pair<>(rowKey, fqv);
-    }
-  }
-
-  public static class GetFunction implements Function<byte[], Get> {
-
-    private static final long serialVersionUID = 1L;
-
-    public Get call(byte[] v) throws Exception {
-      return new Get(v);
-    }
-  }
-
-  public static class ResultFunction implements Function<Result, String> {
-
-    private static final long serialVersionUID = 1L;
-
-    public String call(Result result) throws Exception {
-      Iterator<Cell> it = result.listCells().iterator();
-      StringBuilder b = new StringBuilder();
-
-      b.append(Bytes.toString(result.getRow())).append(":");
-
-      while (it.hasNext()) {
-        Cell cell = it.next();
-        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
-        if ("counter".equals(q)) {
-          b.append("(")
-              .append(q)
-              .append(",")
-              .append(Bytes.toLong(CellUtil.cloneValue(cell)))
-              .append(")");
-        } else {
-          b.append("(")
-              .append(q)
-              .append(",")
-              .append(Bytes.toString(CellUtil.cloneValue(cell)))
-              .append(")");
-        }
-      }
-      return b.toString();
-    }
-  }
-
-  private void populateTableWithMockData(Configuration conf, TableName tableName)
-      throws IOException {
-    try (
-        Connection conn = ConnectionFactory.createConnection(conf);
-        Table table = conn.getTable(tableName)) {
-
-      List<Put> puts = new ArrayList<>(5);
-
-      for (int i = 1; i < 6; i++) {
-        Put put = new Put(Bytes.toBytes(Integer.toString(i)));
-        put.addColumn(columnFamily, columnFamily, columnFamily);
-        puts.add(put);
-      }
-      table.put(puts);
-    }
-  }
-
-}
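
The deleted Java suite exercises the JavaHBaseContext wrappers around the Scala HBaseContext. For cross-reference, the testBulkPut flow above corresponds to roughly the following Scala usage; this is a sketch, not a drop-in test: the local SparkContext, the "t1" table, and the sample rows mirror the test fixtures, and it assumes an hbase-site.xml like the test resource below is on the classpath.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "HBaseContextSuite")
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // Same "row,family,qualifier,value" shape that the Java PutFunction parses.
    val rdd = sc.parallelize(Seq("1,c,a,1", "2,c,a,2", "3,c,a,3"))

    hbaseContext.bulkPut[String](rdd, TableName.valueOf("t1"), v => {
      val cells = v.split(",")
      val put = new Put(Bytes.toBytes(cells(0)))
      put.addColumn(Bytes.toBytes(cells(1)), Bytes.toBytes(cells(2)),
        Bytes.toBytes(cells(3)))
      put
    })

bulkDelete, bulkGet, hbaseRDD, and bulkLoad follow the same pattern with their respective conversion functions.
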
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/test/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/hbase-spark/src/test/resources/hbase-site.xml b/hbase-spark/src/test/resources/hbase-site.xml deleted file mode 100644 index b3fb0d9..0000000 --- a/hbase-spark/src/test/resources/hbase-site.xml +++ /dev/null @@ -1,157 +0,0 @@ -<?xml version="1.0"?> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> -<!-- -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ ---> -<configuration> - <property> - <name>hbase.regionserver.msginterval</name> - <value>1000</value> - <description>Interval between messages from the RegionServer to HMaster - in milliseconds. Default is 15. Set this value low if you want unit - tests to be responsive. - </description> - </property> - <property> - <name>hbase.defaults.for.version.skip</name> - <value>true</value> - </property> - <property> - <name>hbase.server.thread.wakefrequency</name> - <value>1000</value> - <description>Time to sleep in between searches for work (in milliseconds). - Used as sleep interval by service threads such as hbase:meta scanner and log roller. - </description> - </property> - <property> - <name>hbase.master.event.waiting.time</name> - <value>50</value> - <description>Time to sleep between checks to see if a table event took place. - </description> - </property> - <property> - <name>hbase.regionserver.handler.count</name> - <value>5</value> - </property> - <property> - <name>hbase.regionserver.metahandler.count</name> - <value>5</value> - </property> - <property> - <name>hbase.ipc.server.read.threadpool.size</name> - <value>3</value> - </property> - <property> - <name>hbase.master.info.port</name> - <value>-1</value> - <description>The port for the hbase master web UI - Set to -1 if you do not want the info server to run. - </description> - </property> - <property> - <name>hbase.master.port</name> - <value>0</value> - <description>Always have masters and regionservers come up on port '0' so we don't clash over - default ports. - </description> - </property> - <property> - <name>hbase.regionserver.port</name> - <value>0</value> - <description>Always have masters and regionservers come up on port '0' so we don't clash over - default ports. - </description> - </property> - <property> - <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name> - <value>true</value> - </property> - - <property> - <name>hbase.regionserver.info.port</name> - <value>-1</value> - <description>The port for the hbase regionserver web UI - Set to -1 if you do not want the info server to run. 
- </description> - </property> - <property> - <name>hbase.regionserver.info.port.auto</name> - <value>true</value> - <description>Info server auto port bind. Enables automatic port - search if hbase.regionserver.info.port is already in use. - Enabled for testing to run multiple tests on one machine. - </description> - </property> - <property> - <name>hbase.regionserver.safemode</name> - <value>false</value> - <description> - Turn on/off safe mode in region server. Always on for production, always off - for tests. - </description> - </property> - <property> - <name>hbase.hregion.max.filesize</name> - <value>67108864</value> - <description> - Maximum desired file size for an HRegion. If filesize exceeds - value + (value / 2), the HRegion is split in two. Default: 256M. - - Keep the maximum filesize small so we split more often in tests. - </description> - </property> - <property> - <name>hadoop.log.dir</name> - <value>${user.dir}/../logs</value> - </property> - <property> - <name>hbase.zookeeper.property.clientPort</name> - <value>21818</value> - <description>Property from ZooKeeper's config zoo.cfg. - The port at which the clients will connect. - </description> - </property> - <property> - <name>hbase.defaults.for.version.skip</name> - <value>true</value> - <description> - Set to true to skip the 'hbase.defaults.for.version'. - Setting this to true can be useful in contexts other than - the other side of a maven generation; i.e. running in an - ide. You'll want to set this boolean to true to avoid - seeing the RuntimeException complaint: "hbase-default.xml file - seems to be for and old version of HBase (@@@VERSION@@@), this - version is X.X.X-SNAPSHOT" - </description> - </property> - <property> - <name>hbase.table.sanity.checks</name> - <value>false</value> - <description>Skip sanity checks in tests - </description> - </property> - <property> - <name>hbase.procedure.fail.on.corruption</name> - <value>true</value> - <description> - Enable replay sanity checks on procedure tests. - </description> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hbase-spark/src/test/resources/log4j.properties b/hbase-spark/src/test/resources/log4j.properties deleted file mode 100644 index cd3b8e9..0000000 --- a/hbase-spark/src/test/resources/log4j.properties +++ /dev/null @@ -1,76 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Define some default values that can be overridden by system properties -hbase.root.logger=INFO,FA -hbase.log.dir=. -hbase.log.file=hbase.log - -# Define the root logger to the system property "hbase.root.logger". 
-log4j.rootLogger=${hbase.root.logger}
-
-# Logging Threshold
-log4j.threshold=ALL
-
-#
-# Daily Rolling File Appender
-#
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
-
-# Rollover at midnight
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-# Debugging Pattern format
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
-
-
-#
-# console
-# Add "console" to the root logger above if you want to use this
-#
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
-
-# File Appender
-log4j.appender.FA=org.apache.log4j.FileAppender
-log4j.appender.FA.append=false
-log4j.appender.FA.file=target/log-output.txt
-log4j.appender.FA.layout=org.apache.log4j.PatternLayout
-log4j.appender.FA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
-log4j.appender.FA.Threshold = INFO
-
-# Custom Logging levels
-
-#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
-
-log4j.logger.org.apache.hadoop=WARN
-log4j.logger.org.apache.zookeeper=ERROR
-log4j.logger.org.apache.hadoop.hbase=DEBUG
-
-# These settings are workarounds against spurious logs from the minicluster.
-# See HBASE-4709
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
-log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
-# Enable this to get detailed connection error/retry logging.
-# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
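
As the header comment in this removed file notes, the defaults can be overridden by system properties: log4j substitutes ${hbase.root.logger} from the JVM properties, so a test run could, for example, redirect logging from the FA file appender (target/log-output.txt) to the console by passing something like -Dhbase.root.logger=DEBUG,console on the test JVM command line (illustrative; any appender defined above can be substituted).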