http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala deleted file mode 100644 index 9eeabc5..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala +++ /dev/null @@ -1,430 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.spark - -import java.io.ByteArrayInputStream -import java.nio.ByteBuffer -import java.sql.Timestamp -import java.util -import java.util.HashMap - -import org.apache.avro.SchemaBuilder.BaseFieldTypeBuilder -import org.apache.avro.SchemaBuilder.BaseTypeBuilder -import org.apache.avro.SchemaBuilder.FieldAssembler -import org.apache.avro.SchemaBuilder.FieldDefault -import org.apache.avro.SchemaBuilder.RecordBuilder -import org.apache.avro.io._ -import org.apache.commons.io.output.ByteArrayOutputStream -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes - -import scala.collection.JavaConversions._ - -import org.apache.avro.{SchemaBuilder, Schema} -import org.apache.avro.Schema.Type._ -import org.apache.avro.generic.GenericData.{Record, Fixed} -import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericData, GenericRecord} -import org.apache.spark.sql.Row -import org.apache.spark.sql.types._ - -import scala.collection.immutable.Map - -@InterfaceAudience.Private -abstract class AvroException(msg: String) extends Exception(msg) - -@InterfaceAudience.Private -case class SchemaConversionException(msg: String) extends AvroException(msg) - -/*** - * On top level, the converters provide three high level interface. - * 1. toSqlType: This function takes an avro schema and returns a sql schema. - * 2. createConverterToSQL: Returns a function that is used to convert avro types to their - * corresponding sparkSQL representations. - * 3. convertTypeToAvro: This function constructs converter function for a given sparkSQL - * datatype. This is used in writing Avro records out to disk - */ -@InterfaceAudience.Private -object SchemaConverters { - - case class SchemaType(dataType: DataType, nullable: Boolean) - - /** - * This function takes an avro schema and returns a sql schema. 
- */ - def toSqlType(avroSchema: Schema): SchemaType = { - avroSchema.getType match { - case INT => SchemaType(IntegerType, nullable = false) - case STRING => SchemaType(StringType, nullable = false) - case BOOLEAN => SchemaType(BooleanType, nullable = false) - case BYTES => SchemaType(BinaryType, nullable = false) - case DOUBLE => SchemaType(DoubleType, nullable = false) - case FLOAT => SchemaType(FloatType, nullable = false) - case LONG => SchemaType(LongType, nullable = false) - case FIXED => SchemaType(BinaryType, nullable = false) - case ENUM => SchemaType(StringType, nullable = false) - - case RECORD => - val fields = avroSchema.getFields.map { f => - val schemaType = toSqlType(f.schema()) - StructField(f.name, schemaType.dataType, schemaType.nullable) - } - - SchemaType(StructType(fields), nullable = false) - - case ARRAY => - val schemaType = toSqlType(avroSchema.getElementType) - SchemaType( - ArrayType(schemaType.dataType, containsNull = schemaType.nullable), - nullable = false) - - case MAP => - val schemaType = toSqlType(avroSchema.getValueType) - SchemaType( - MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable), - nullable = false) - - case UNION => - if (avroSchema.getTypes.exists(_.getType == NULL)) { - // In case of a union with null, eliminate it and make a recursive call - val remainingUnionTypes = avroSchema.getTypes.filterNot(_.getType == NULL) - if (remainingUnionTypes.size == 1) { - toSqlType(remainingUnionTypes.get(0)).copy(nullable = true) - } else { - toSqlType(Schema.createUnion(remainingUnionTypes)).copy(nullable = true) - } - } else avroSchema.getTypes.map(_.getType) match { - case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => - SchemaType(LongType, nullable = false) - case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => - SchemaType(DoubleType, nullable = false) - case other => throw new SchemaConversionException( - s"This mix of union types is not supported: $other") - } - - case other => throw new SchemaConversionException(s"Unsupported type $other") - } - } - - /** - * This function converts sparkSQL StructType into avro schema. This method uses two other - * converter methods in order to do the conversion. - */ - private def convertStructToAvro[T]( - structType: StructType, - schemaBuilder: RecordBuilder[T], - recordNamespace: String): T = { - val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields() - structType.fields.foreach { field => - val newField = fieldsAssembler.name(field.name).`type`() - - if (field.nullable) { - convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name, recordNamespace) - .noDefault - } else { - convertFieldTypeToAvro(field.dataType, newField, field.name, recordNamespace) - .noDefault - } - } - fieldsAssembler.endRecord() - } - - /** - * Returns a function that is used to convert avro types to their - * corresponding sparkSQL representations. - */ - def createConverterToSQL(schema: Schema): Any => Any = { - schema.getType match { - // Avro strings are in Utf8, so we have to call toString on them - case STRING | ENUM => (item: Any) => if (item == null) null else item.toString - case INT | BOOLEAN | DOUBLE | FLOAT | LONG => identity - // Byte arrays are reused by avro, so we have to make a copy of them. 
- case FIXED => (item: Any) => if (item == null) { - null - } else { - item.asInstanceOf[Fixed].bytes().clone() - } - case BYTES => (item: Any) => if (item == null) { - null - } else { - val bytes = item.asInstanceOf[ByteBuffer] - val javaBytes = new Array[Byte](bytes.remaining) - bytes.get(javaBytes) - javaBytes - } - case RECORD => - val fieldConverters = schema.getFields.map(f => createConverterToSQL(f.schema)) - (item: Any) => if (item == null) { - null - } else { - val record = item.asInstanceOf[GenericRecord] - val converted = new Array[Any](fieldConverters.size) - var idx = 0 - while (idx < fieldConverters.size) { - converted(idx) = fieldConverters.apply(idx)(record.get(idx)) - idx += 1 - } - Row.fromSeq(converted.toSeq) - } - case ARRAY => - val elementConverter = createConverterToSQL(schema.getElementType) - (item: Any) => if (item == null) { - null - } else { - try { - item.asInstanceOf[GenericData.Array[Any]].map(elementConverter) - } catch { - case e: Throwable => - item.asInstanceOf[util.ArrayList[Any]].map(elementConverter) - } - } - case MAP => - val valueConverter = createConverterToSQL(schema.getValueType) - (item: Any) => if (item == null) { - null - } else { - item.asInstanceOf[HashMap[Any, Any]].map(x => (x._1.toString, valueConverter(x._2))).toMap - } - case UNION => - if (schema.getTypes.exists(_.getType == NULL)) { - val remainingUnionTypes = schema.getTypes.filterNot(_.getType == NULL) - if (remainingUnionTypes.size == 1) { - createConverterToSQL(remainingUnionTypes.get(0)) - } else { - createConverterToSQL(Schema.createUnion(remainingUnionTypes)) - } - } else schema.getTypes.map(_.getType) match { - case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => - (item: Any) => { - item match { - case l: Long => l - case i: Int => i.toLong - case null => null - } - } - case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => - (item: Any) => { - item match { - case d: Double => d - case f: Float => f.toDouble - case null => null - } - } - case other => throw new SchemaConversionException( - s"This mix of union types is not supported (see README): $other") - } - case other => throw new SchemaConversionException(s"invalid avro type: $other") - } - } - - /** - * This function is used to convert some sparkSQL type to avro type. Note that this function won't - * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). 
- */ - private def convertTypeToAvro[T]( - dataType: DataType, - schemaBuilder: BaseTypeBuilder[T], - structName: String, - recordNamespace: String): T = { - dataType match { - case ByteType => schemaBuilder.intType() - case ShortType => schemaBuilder.intType() - case IntegerType => schemaBuilder.intType() - case LongType => schemaBuilder.longType() - case FloatType => schemaBuilder.floatType() - case DoubleType => schemaBuilder.doubleType() - case _: DecimalType => schemaBuilder.stringType() - case StringType => schemaBuilder.stringType() - case BinaryType => schemaBuilder.bytesType() - case BooleanType => schemaBuilder.booleanType() - case TimestampType => schemaBuilder.longType() - - case ArrayType(elementType, _) => - val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull) - val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace) - schemaBuilder.array().items(elementSchema) - - case MapType(StringType, valueType, _) => - val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull) - val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace) - schemaBuilder.map().values(valueSchema) - - case structType: StructType => - convertStructToAvro( - structType, - schemaBuilder.record(structName).namespace(recordNamespace), - recordNamespace) - - case other => throw new IllegalArgumentException(s"Unexpected type $dataType.") - } - } - - /** - * This function is used to construct fields of the avro record, where schema of the field is - * specified by avro representation of dataType. Since builders for record fields are different - * from those for everything else, we have to use a separate method. - */ - private def convertFieldTypeToAvro[T]( - dataType: DataType, - newFieldBuilder: BaseFieldTypeBuilder[T], - structName: String, - recordNamespace: String): FieldDefault[T, _] = { - dataType match { - case ByteType => newFieldBuilder.intType() - case ShortType => newFieldBuilder.intType() - case IntegerType => newFieldBuilder.intType() - case LongType => newFieldBuilder.longType() - case FloatType => newFieldBuilder.floatType() - case DoubleType => newFieldBuilder.doubleType() - case _: DecimalType => newFieldBuilder.stringType() - case StringType => newFieldBuilder.stringType() - case BinaryType => newFieldBuilder.bytesType() - case BooleanType => newFieldBuilder.booleanType() - case TimestampType => newFieldBuilder.longType() - - case ArrayType(elementType, _) => - val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull) - val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace) - newFieldBuilder.array().items(elementSchema) - - case MapType(StringType, valueType, _) => - val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull) - val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace) - newFieldBuilder.map().values(valueSchema) - - case structType: StructType => - convertStructToAvro( - structType, - newFieldBuilder.record(structName).namespace(recordNamespace), - recordNamespace) - - case other => throw new IllegalArgumentException(s"Unexpected type $dataType.") - } - } - - private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = { - if (isNullable) { - SchemaBuilder.builder().nullable() - } else { - SchemaBuilder.builder() - } - } - /** - * This function constructs converter function for a given sparkSQL datatype. 
This is used in - * writing Avro records out to disk - */ - def createConverterToAvro( - dataType: DataType, - structName: String, - recordNamespace: String): (Any) => Any = { - dataType match { - case BinaryType => (item: Any) => item match { - case null => null - case bytes: Array[Byte] => ByteBuffer.wrap(bytes) - } - case ByteType | ShortType | IntegerType | LongType | - FloatType | DoubleType | StringType | BooleanType => identity - case _: DecimalType => (item: Any) => if (item == null) null else item.toString - case TimestampType => (item: Any) => - if (item == null) null else item.asInstanceOf[Timestamp].getTime - case ArrayType(elementType, _) => - val elementConverter = createConverterToAvro(elementType, structName, recordNamespace) - (item: Any) => { - if (item == null) { - null - } else { - val sourceArray = item.asInstanceOf[Seq[Any]] - val sourceArraySize = sourceArray.size - val targetArray = new util.ArrayList[Any](sourceArraySize) - var idx = 0 - while (idx < sourceArraySize) { - targetArray.add(elementConverter(sourceArray(idx))) - idx += 1 - } - targetArray - } - } - case MapType(StringType, valueType, _) => - val valueConverter = createConverterToAvro(valueType, structName, recordNamespace) - (item: Any) => { - if (item == null) { - null - } else { - val javaMap = new HashMap[String, Any]() - item.asInstanceOf[Map[String, Any]].foreach { case (key, value) => - javaMap.put(key, valueConverter(value)) - } - javaMap - } - } - case structType: StructType => - val builder = SchemaBuilder.record(structName).namespace(recordNamespace) - val schema: Schema = SchemaConverters.convertStructToAvro( - structType, builder, recordNamespace) - val fieldConverters = structType.fields.map(field => - createConverterToAvro(field.dataType, field.name, recordNamespace)) - (item: Any) => { - if (item == null) { - null - } else { - val record = new Record(schema) - val convertersIterator = fieldConverters.iterator - val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator - val rowIterator = item.asInstanceOf[Row].toSeq.iterator - - while (convertersIterator.hasNext) { - val converter = convertersIterator.next() - record.put(fieldNamesIterator.next(), converter(rowIterator.next())) - } - record - } - } - } - } -} - -@InterfaceAudience.Private -object AvroSerdes { - // We only handle the case where the top level is a record or a primitive type for now - def serialize(input: Any, schema: Schema): Array[Byte] = { - schema.getType match { - case BOOLEAN => Bytes.toBytes(input.asInstanceOf[Boolean]) - case BYTES | FIXED => input.asInstanceOf[Array[Byte]] - case DOUBLE => Bytes.toBytes(input.asInstanceOf[Double]) - case FLOAT => Bytes.toBytes(input.asInstanceOf[Float]) - case INT => Bytes.toBytes(input.asInstanceOf[Int]) - case LONG => Bytes.toBytes(input.asInstanceOf[Long]) - case STRING => Bytes.toBytes(input.asInstanceOf[String]) - case RECORD => - val gr = input.asInstanceOf[GenericRecord] - val writer2 = new GenericDatumWriter[GenericRecord](schema) - val bao2 = new ByteArrayOutputStream() - val encoder2: BinaryEncoder = EncoderFactory.get().directBinaryEncoder(bao2, null) - writer2.write(gr, encoder2) - bao2.toByteArray() - case _ => throw new Exception(s"unsupported data type ${schema.getType}") // TODO: support the remaining Avro types - } - } - - def deserialize(input: Array[Byte], schema: Schema): GenericRecord = { - val reader2: DatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](schema) - val bai2 = new ByteArrayInputStream(input) - val decoder2: BinaryDecoder = DecoderFactory.get().directBinaryDecoder(bai2, null) - val gr2: GenericRecord = reader2.read(null, decoder2) - gr2 - } -}
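For orientation, the two AvroSerdes helpers at the end of the file above form a byte-level round trip: serialize writes a GenericRecord through Avro's direct binary encoder (no container file, so the bytes carry no schema), and deserialize reads those bytes back against the same schema. A minimal sketch of that round trip, with an illustrative one-field schema rather than anything taken from this patch:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.hbase.spark.AvroSerdes

object AvroSerdesRoundTrip {
  // Illustrative record schema; any record schema accepted by AvroSerdes works the same way.
  val schema: Schema = new Schema.Parser().parse(
    """{"namespace": "example.avro", "type": "record", "name": "User",
      |  "fields": [{"name": "name", "type": "string"}]}""".stripMargin)

  def main(args: Array[String]): Unit = {
    val user = new GenericData.Record(schema)
    user.put("name", "name001")

    // GenericRecord -> Array[Byte]: the RECORD branch of AvroSerdes.serialize.
    val bytes: Array[Byte] = AvroSerdes.serialize(user, schema)

    // Array[Byte] -> GenericRecord: the reader must be handed the same (or a compatible)
    // schema, since the direct binary encoding does not embed one.
    val copy: GenericRecord = AvroSerdes.deserialize(bytes, schema)
    assert(copy.get("name").toString == "name001")
  }
}

Because the serialized bytes carry no schema, both sides must agree on it out of band; this is why the AvroSource example later in this patch registers the schema string under the avroSchema option alongside the catalog.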
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala deleted file mode 100644 index 98cc871..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.spark.datasources - -import java.io.ByteArrayInputStream - -import org.apache.avro.Schema -import org.apache.avro.Schema.Type._ -import org.apache.avro.generic.GenericDatumReader -import org.apache.avro.generic.GenericDatumWriter -import org.apache.avro.generic.GenericRecord -import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord} -import org.apache.avro.io._ -import org.apache.commons.io.output.ByteArrayOutputStream -import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.sql.types._ - -// TODO: This is not really used in code. -trait SerDes { - def serialize(value: Any): Array[Byte] - def deserialize(bytes: Array[Byte], start: Int, end: Int): Any -} - -// TODO: This is not really used in code. -class DoubleSerDes extends SerDes { - override def serialize(value: Any): Array[Byte] = Bytes.toBytes(value.asInstanceOf[Double]) - override def deserialize(bytes: Array[Byte], start: Int, end: Int): Any = { - Bytes.toDouble(bytes, start) - } -} - - http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala deleted file mode 100644 index 0e2b6f4..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.spark.datasources - -import java.io.{IOException, ObjectInputStream, ObjectOutputStream} - -import org.apache.hadoop.conf.Configuration -import org.apache.yetus.audience.InterfaceAudience; - -import scala.util.control.NonFatal - -@InterfaceAudience.Private -class SerializableConfiguration(@transient var value: Configuration) extends Serializable { - private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException { - out.defaultWriteObject() - value.write(out) - } - - private def readObject(in: ObjectInputStream): Unit = tryOrIOException { - value = new Configuration(false) - value.readFields(in) - } - - def tryOrIOException(block: => Unit) { - try { - block - } catch { - case e: IOException => throw e - case NonFatal(t) => throw new IOException(t) - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala deleted file mode 100644 index ce7b55a..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.spark - -import org.apache.hadoop.hbase.util.Bytes - -import scala.math.Ordering - -package object hbase { - type HBaseType = Array[Byte] - def bytesMin = new Array[Byte](0) - def bytesMax = null - val ByteMax = -1.asInstanceOf[Byte] - val ByteMin = 0.asInstanceOf[Byte] - val ord: Ordering[HBaseType] = new Ordering[HBaseType] { - def compare(x: Array[Byte], y: Array[Byte]): Int = { - return Bytes.compareTo(x, y) - } - } - //Do not use BinaryType.ordering - implicit val order: Ordering[HBaseType] = ord - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala deleted file mode 100644 index c09e99d..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.spark.example.datasources - -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.hadoop.hbase.spark.AvroSerdes -import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.{SparkConf, SparkContext} - -/** - * @param col0 Column #0, Type is String - * @param col1 Column #1, Type is Array[Byte] - */ -case class AvroHBaseRecord(col0: String, - col1: Array[Byte]) - -object AvroHBaseRecord { - val schemaString = - s"""{"namespace": "example.avro", - | "type": "record", "name": "User", - | "fields": [ - | {"name": "name", "type": "string"}, - | {"name": "favorite_number", "type": ["int", "null"]}, - | {"name": "favorite_color", "type": ["string", "null"]}, - | {"name": "favorite_array", "type": {"type": "array", "items": "string"}}, - | {"name": "favorite_map", "type": {"type": "map", "values": "int"}} - | ] }""".stripMargin - - val avroSchema: Schema = { - val p = new Schema.Parser - p.parse(schemaString) - } - - def apply(i: Int): AvroHBaseRecord = { - - val user = new GenericData.Record(avroSchema); - user.put("name", s"name${"%03d".format(i)}") - user.put("favorite_number", i) - user.put("favorite_color", s"color${"%03d".format(i)}") - val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema()) - favoriteArray.add(s"number${i}") - favoriteArray.add(s"number${i+1}") - user.put("favorite_array", favoriteArray) - import collection.JavaConverters._ - val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava - user.put("favorite_map", favoriteMap) - val avroByte = AvroSerdes.serialize(user, avroSchema) - AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte) - } -} - -object AvroSource { - def catalog = s"""{ - |"table":{"namespace":"default", "name":"ExampleAvrotable"}, - |"rowkey":"key", - |"columns":{ - |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, - |"col1":{"cf":"cf1", "col":"col1", "type":"binary"} - |} - |}""".stripMargin - - def avroCatalog = s"""{ - |"table":{"namespace":"default", "name":"ExampleAvrotable"}, - |"rowkey":"key", - |"columns":{ - |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, - |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"} - |} - |}""".stripMargin - - def avroCatalogInsert = s"""{ - |"table":{"namespace":"default", "name":"ExampleAvrotableInsert"}, - |"rowkey":"key", - |"columns":{ - |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, - |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"} - |} - |}""".stripMargin - - def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("AvroSourceExample") - val sc = new SparkContext(sparkConf) - val sqlContext = new SQLContext(sc) - - import sqlContext.implicits._ - - def withCatalog(cat: String): DataFrame = { - sqlContext - .read - .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog)) - .format("org.apache.hadoop.hbase.spark") - .load() - } - - val data = (0 to 255).map { i => - AvroHBaseRecord(i) - } - - sc.parallelize(data).toDF.write.options( - Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) - .format("org.apache.hadoop.hbase.spark") - .save() - - val df = withCatalog(catalog) - df.show() - df.printSchema() - df.registerTempTable("ExampleAvrotable") - val c = sqlContext.sql("select count(1) from ExampleAvrotable") - c.show() - - val filtered = df.select($"col0", 
$"col1.favorite_array").where($"col0" === "name001") - filtered.show() - val collected = filtered.collect() - if (collected(0).getSeq[String](1)(0) != "number1") { - throw new UserCustomizedSampleException("value invalid") - } - if (collected(0).getSeq[String](1)(1) != "number2") { - throw new UserCustomizedSampleException("value invalid") - } - - df.write.options( - Map("avroSchema"->AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog->avroCatalogInsert, - HBaseTableCatalog.newTable -> "5")) - .format("org.apache.hadoop.hbase.spark") - .save() - val newDF = withCatalog(avroCatalogInsert) - newDF.show() - newDF.printSchema() - if(newDF.count() != 256) { - throw new UserCustomizedSampleException("value invalid") - } - - df.filter($"col1.name" === "name005" || $"col1.name" <= "name005") - .select("col0", "col1.favorite_color", "col1.favorite_number") - .show() - - df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007")) - .select("col0", "col1.favorite_color", "col1.favorite_number") - .show() - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala deleted file mode 100644 index 96c6d6e..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.spark.example.datasources - -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.{SparkContext, SparkConf} -import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog - -class UserCustomizedSampleException(message: String = null, cause: Throwable = null) extends - RuntimeException(UserCustomizedSampleException.message(message, cause), cause) - -object UserCustomizedSampleException { - def message(message: String, cause: Throwable) = - if (message != null) message - else if (cause != null) cause.toString() - else null -} - -case class IntKeyRecord( - col0: Integer, - col1: Boolean, - col2: Double, - col3: Float, - col4: Int, - col5: Long, - col6: Short, - col7: String, - col8: Byte) - -object IntKeyRecord { - def apply(i: Int): IntKeyRecord = { - IntKeyRecord(if (i % 2 == 0) i else -i, - i % 2 == 0, - i.toDouble, - i.toFloat, - i, - i.toLong, - i.toShort, - s"String$i extra", - i.toByte) - } -} - -object DataType { - val cat = s"""{ - |"table":{"namespace":"default", "name":"DataTypeExampleTable"}, - |"rowkey":"key", - |"columns":{ - |"col0":{"cf":"rowkey", "col":"key", "type":"int"}, - |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, - |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, - |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, - |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, - |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, - |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, - |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, - |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} - |} - |}""".stripMargin - - def main(args: Array[String]){ - val sparkConf = new SparkConf().setAppName("DataTypeExample") - val sc = new SparkContext(sparkConf) - val sqlContext = new SQLContext(sc) - - import sqlContext.implicits._ - - def withCatalog(cat: String): DataFrame = { - sqlContext - .read - .options(Map(HBaseTableCatalog.tableCatalog->cat)) - .format("org.apache.hadoop.hbase.spark") - .load() - } - - // test populate table - val data = (0 until 32).map { i => - IntKeyRecord(i) - } - sc.parallelize(data).toDF.write.options( - Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5")) - .format("org.apache.hadoop.hbase.spark") - .save() - - // test less than 0 - val df = withCatalog(cat) - val s = df.filter($"col0" < 0) - s.show() - if(s.count() != 16){ - throw new UserCustomizedSampleException("value invalid") - } - - //test less or equal than -10. The number of results is 11 - val num1 = df.filter($"col0" <= -10) - num1.show() - val c1 = num1.count() - println(s"test result count should be 11: $c1") - - //test less or equal than -9. The number of results is 12 - val num2 = df.filter($"col0" <= -9) - num2.show() - val c2 = num2.count() - println(s"test result count should be 12: $c2") - - //test greater or equal than -9". The number of results is 21 - val num3 = df.filter($"col0" >= -9) - num3.show() - val c3 = num3.count() - println(s"test result count should be 21: $c3") - - //test greater or equal than 0. The number of results is 16 - val num4 = df.filter($"col0" >= 0) - num4.show() - val c4 = num4.count() - println(s"test result count should be 16: $c4") - - //test greater than 10. The number of results is 10 - val num5 = df.filter($"col0" > 10) - num5.show() - val c5 = num5.count() - println(s"test result count should be 10: $c5") - - // test "and". 
The number of results is 11 - val num6 = df.filter($"col0" > -10 && $"col0" <= 10) - num6.show() - val c6 = num6.count() - println(s"test result count should be 11: $c6") - - //test "or". The number of results is 21 - val num7 = df.filter($"col0" <= -10 || $"col0" > 10) - num7.show() - val c7 = num7.count() - println(s"test result count should be 21: $c7") - - //test "all". The number of results is 32 - val num8 = df.filter($"col0" >= -100) - num8.show() - val c8 = num8.count() - println(s"test result count should be 32: $c8") - - //test "full query" - val df1 = withCatalog(cat) - df1.show() - val c_df = df1.count() - println(s"df count should be 32: $c_df") - if(c_df != 32){ - throw new UserCustomizedSampleException("value invalid") - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala deleted file mode 100644 index 056c071..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.spark.example.datasources - -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.{SparkContext, SparkConf} -import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog - -case class HBaseRecord( - col0: String, - col1: Boolean, - col2: Double, - col3: Float, - col4: Int, - col5: Long, - col6: Short, - col7: String, - col8: Byte) - -object HBaseRecord { - def apply(i: Int): HBaseRecord = { - val s = s"""row${"%03d".format(i)}""" - HBaseRecord(s, - i % 2 == 0, - i.toDouble, - i.toFloat, - i, - i.toLong, - i.toShort, - s"String$i extra", - i.toByte) - } -} - -object HBaseSource { - val cat = s"""{ - |"table":{"namespace":"default", "name":"HBaseSourceExampleTable"}, - |"rowkey":"key", - |"columns":{ - |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, - |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, - |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, - |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, - |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, - |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, - |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, - |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, - |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} - |} - |}""".stripMargin - - def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("HBaseSourceExample") - val sc = new SparkContext(sparkConf) - val sqlContext = new SQLContext(sc) - - import sqlContext.implicits._ - - def withCatalog(cat: String): DataFrame = { - sqlContext - .read - .options(Map(HBaseTableCatalog.tableCatalog->cat)) - .format("org.apache.hadoop.hbase.spark") - .load() - } - - val data = (0 to 255).map { i => - HBaseRecord(i) - } - - sc.parallelize(data).toDF.write.options( - Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5")) - .format("org.apache.hadoop.hbase.spark") - .save() - - val df = withCatalog(cat) - df.show() - df.filter($"col0" <= "row005") - .select($"col0", $"col1").show - df.filter($"col0" === "row005" || $"col0" <= "row005") - .select($"col0", $"col1").show - df.filter($"col0" > "row250") - .select($"col0", $"col1").show - df.registerTempTable("table1") - val c = sqlContext.sql("select count(col1) from table1 where col0 < 'row050'") - c.show() - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala deleted file mode 100644 index 46135a5..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.spark.example.hbasecontext - -import org.apache.hadoop.hbase.spark.HBaseContext -import org.apache.spark.SparkContext -import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.client.Delete -import org.apache.spark.SparkConf - -/** - * This is a simple example of deleting records in HBase - * with the bulkDelete function. - */ -object HBaseBulkDeleteExample { - def main(args: Array[String]) { - if (args.length < 1) { - println("HBaseBulkDeleteExample {tableName} missing an argument") - return - } - - val tableName = args(0) - - val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName) - val sc = new SparkContext(sparkConf) - try { - //[Array[Byte]] - val rdd = sc.parallelize(Array( - Bytes.toBytes("1"), - Bytes.toBytes("2"), - Bytes.toBytes("3"), - Bytes.toBytes("4"), - Bytes.toBytes("5") - )) - - val conf = HBaseConfiguration.create() - - val hbaseContext = new HBaseContext(sc, conf) - hbaseContext.bulkDelete[Array[Byte]](rdd, - TableName.valueOf(tableName), - putRecord => new Delete(putRecord), - 4) - } finally { - sc.stop() - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala deleted file mode 100644 index 1bdc90d..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.spark.example.hbasecontext - -import org.apache.hadoop.hbase.spark.HBaseContext -import org.apache.spark.SparkContext -import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.client.Get -import org.apache.hadoop.hbase.client.Result -import org.apache.spark.SparkConf - -/** - * This is a simple example of getting records from HBase - * with the bulkGet function. - */ -object HBaseBulkGetExample { - def main(args: Array[String]) { - if (args.length < 1) { - println("HBaseBulkGetExample {tableName} missing an argument") - return - } - - val tableName = args(0) - - val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName) - val sc = new SparkContext(sparkConf) - - try { - - //[(Array[Byte])] - val rdd = sc.parallelize(Array( - Bytes.toBytes("1"), - Bytes.toBytes("2"), - Bytes.toBytes("3"), - Bytes.toBytes("4"), - Bytes.toBytes("5"), - Bytes.toBytes("6"), - Bytes.toBytes("7"))) - - val conf = HBaseConfiguration.create() - - val hbaseContext = new HBaseContext(sc, conf) - - val getRdd = hbaseContext.bulkGet[Array[Byte], String]( - TableName.valueOf(tableName), - 2, - rdd, - record => { - System.out.println("making Get") - new Get(record) - }, - (result: Result) => { - - val it = result.listCells().iterator() - val b = new StringBuilder - - b.append(Bytes.toString(result.getRow) + ":") - - while (it.hasNext) { - val cell = it.next() - val q = Bytes.toString(CellUtil.cloneQualifier(cell)) - if (q.equals("counter")) { - b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") - } else { - b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") - } - } - b.toString() - }) - - getRdd.collect().foreach(v => println(v)) - - } finally { - sc.stop() - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala deleted file mode 100644 index 063f2c2..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.spark.example.hbasecontext - -import org.apache.hadoop.hbase.spark.HBaseContext -import org.apache.spark.SparkContext -import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.client.Put -import org.apache.spark.SparkConf - -/** - * This is a simple example of putting records in HBase - * with the bulkPut function. - */ -object HBaseBulkPutExample { - def main(args: Array[String]) { - if (args.length < 2) { - println("HBaseBulkPutExample {tableName} {columnFamily} missing arguments") - return - } - - val tableName = args(0) - val columnFamily = args(1) - - val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + - tableName + " " + columnFamily) - val sc = new SparkContext(sparkConf) - - try { - //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] - val rdd = sc.parallelize(Array( - (Bytes.toBytes("1"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), - (Bytes.toBytes("2"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), - (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), - (Bytes.toBytes("4"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), - (Bytes.toBytes("5"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) - )) - - val conf = HBaseConfiguration.create() - - val hbaseContext = new HBaseContext(sc, conf) - hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, - TableName.valueOf(tableName), - (putRecord) => { - val put = new Put(putRecord._1) - putRecord._2.foreach((putValue) => - put.addColumn(putValue._1, putValue._2, putValue._3)) - put - }); - } finally { - sc.stop() - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala deleted file mode 100644 index 37a0358..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.hadoop.hbase.spark.example.hbasecontext - -import org.apache.hadoop.hbase.spark.HBaseContext -import org.apache.spark.SparkContext -import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.client.Put -import org.apache.hadoop.mapred.TextInputFormat -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.Text -import org.apache.spark.SparkConf - -/** - * This is a simple example of putting records in HBase - * with the bulkPut function. In this example we are - * getting the put information from a file - */ -object HBaseBulkPutExampleFromFile { - def main(args: Array[String]) { - if (args.length < 3) { - println("HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile} are missing an argument") - return - } - - val tableName = args(0) - val columnFamily = args(1) - val inputFile = args(2) - - val sparkConf = new SparkConf().setAppName("HBaseBulkPutExampleFromFile " + - tableName + " " + columnFamily + " " + inputFile) - val sc = new SparkContext(sparkConf) - - try { - var rdd = sc.hadoopFile( - inputFile, - classOf[TextInputFormat], - classOf[LongWritable], - classOf[Text]).map(v => { - System.out.println("reading-" + v._2.toString) - v._2.toString - }) - - val conf = HBaseConfiguration.create() - - val hbaseContext = new HBaseContext(sc, conf) - hbaseContext.bulkPut[String](rdd, - TableName.valueOf(tableName), - (putRecord) => { - System.out.println("hbase-" + putRecord) - val put = new Put(Bytes.toBytes("Value- " + putRecord)) - put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), - Bytes.toBytes(putRecord.length())) - put - }); - } finally { - sc.stop() - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala deleted file mode 100644 index fa78216..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.spark.example.hbasecontext - -import org.apache.hadoop.hbase.spark.HBaseContext -import org.apache.spark.SparkContext -import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.client.Put -import org.apache.spark.SparkConf - -/** - * This is a simple example of putting records in HBase - * with the bulkPut function. In this example we are - * also setting the timestamp in the put - */ -object HBaseBulkPutTimestampExample { - def main(args: Array[String]) { - if (args.length < 2) { - System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily} are missing an argument") - return - } - - val tableName = args(0) - val columnFamily = args(1) - - val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " + - tableName + " " + columnFamily) - val sc = new SparkContext(sparkConf) - - try { - - val rdd = sc.parallelize(Array( - (Bytes.toBytes("6"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), - (Bytes.toBytes("7"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), - (Bytes.toBytes("8"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), - (Bytes.toBytes("9"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), - (Bytes.toBytes("10"), - Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))))) - - val conf = HBaseConfiguration.create() - - val timeStamp = System.currentTimeMillis() - - val hbaseContext = new HBaseContext(sc, conf) - hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, - TableName.valueOf(tableName), - (putRecord) => { - val put = new Put(putRecord._1) - putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, - timeStamp, putValue._3)) - put - }) - } finally { - sc.stop() - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala deleted file mode 100644 index bb2e79d..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.spark.example.hbasecontext - -import org.apache.hadoop.hbase.spark.HBaseContext -import org.apache.spark.SparkContext -import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.client.Scan -import org.apache.spark.SparkConf -/** - * This is a simple example of scanning records from HBase - * with the hbaseRDD function in Distributed fashion. - */ -object HBaseDistributedScanExample { - def main(args: Array[String]) { - if (args.length < 1) { - println("HBaseDistributedScanExample {tableName} missing an argument") - return - } - - val tableName = args(0) - - val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName ) - val sc = new SparkContext(sparkConf) - - try { - val conf = HBaseConfiguration.create() - - val hbaseContext = new HBaseContext(sc, conf) - - val scan = new Scan() - scan.setCaching(100) - - val getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan) - - getRdd.foreach(v => println(Bytes.toString(v._1.get()))) - - println("Length: " + getRdd.map(r => r._1.copyBytes()).collect().length); - } finally { - sc.stop() - } - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala deleted file mode 100644 index 8ac93ef..0000000 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
deleted file mode 100644
index 8ac93ef..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.hbasecontext
-
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.spark.SparkContext
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.client.Put
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.SparkConf
-
-/**
- * This is a simple example of BulkPut with Spark Streaming
- */
-object HBaseStreamingBulkPutExample {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      println("HBaseStreamingBulkPutExample " +
-        "{host} {port} {tableName} {columnFamily} are missing arguments")
-      return
-    }
-
-    val host = args(0)
-    val port = args(1)
-    val tableName = args(2)
-    val columnFamily = args(3)
-
-    val sparkConf = new SparkConf().setAppName("HBaseStreamingBulkPutExample " +
-      tableName + " " + columnFamily)
-    val sc = new SparkContext(sparkConf)
-    try {
-      val ssc = new StreamingContext(sc, Seconds(1))
-
-      val lines = ssc.socketTextStream(host, port.toInt)
-
-      val conf = HBaseConfiguration.create()
-
-      val hbaseContext = new HBaseContext(sc, conf)
-
-      hbaseContext.streamBulkPut[String](lines,
-        TableName.valueOf(tableName),
-        (putRecord) => {
-          if (putRecord.length() > 0) {
-            val put = new Put(Bytes.toBytes(putRecord))
-            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("foo"), Bytes.toBytes("bar"))
-            put
-          } else {
-            null
-          }
-        })
-      ssc.start()
-      ssc.awaitTerminationOrTimeout(60000)
-    } finally {
-      sc.stop()
-    }
-  }
-}
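The put function above writes a fixed qualifier and value for every line. As a sketch of a slightly richer variant (the "<rowKey>,<value>" input format is an assumption, not from the commit), the same streamBulkPut call can parse each line and skip malformed records by returning null, exactly as the example does for empty lines; lines, tableName and columnFamily are reused from above.

    hbaseContext.streamBulkPut[String](lines,
      TableName.valueOf(tableName),
      (line) => line.split(",", 2) match {
        case Array(row, value) if row.nonEmpty =>
          val put = new Put(Bytes.toBytes(row))
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("foo"), Bytes.toBytes(value))
          put
        case _ => null // null records are skipped, as in the example above
      })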
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
deleted file mode 100644
index 83d3f9e..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.Delete
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.util.Bytes
-
-import org.apache.spark.{SparkContext, SparkConf}
-
-/**
- * This is a simple example of deleting records in HBase
- * with the bulkDelete function.
- */
-object HBaseBulkDeleteExample {
-  def main(args: Array[String]) {
-    if (args.length < 1) {
-      println("HBaseBulkDeleteExample {tableName} is missing an argument")
-      return
-    }
-
-    val tableName = args(0)
-
-    val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
-    val sc = new SparkContext(sparkConf)
-    try {
-      // RDD[Array[Byte]]: one element per row key to delete
-      val rdd = sc.parallelize(Array(
-        Bytes.toBytes("1"),
-        Bytes.toBytes("2"),
-        Bytes.toBytes("3"),
-        Bytes.toBytes("4"),
-        Bytes.toBytes("5")
-      ))
-
-      val conf = HBaseConfiguration.create()
-
-      val hbaseContext = new HBaseContext(sc, conf)
-
-      rdd.hbaseBulkDelete(hbaseContext, TableName.valueOf(tableName),
-        putRecord => new Delete(putRecord),
-        4)
-
-    } finally {
-      sc.stop()
-    }
-  }
-}
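Since hbaseBulkDelete takes an arbitrary row => Delete function, narrower deletes are a one-line change. A sketch that removes a single column rather than the whole row, reusing rdd, hbaseContext and tableName from the example (the family "c" and qualifier "1" are hypothetical names for illustration):

    rdd.hbaseBulkDelete(hbaseContext, TableName.valueOf(tableName),
      row => new Delete(row).addColumns(Bytes.toBytes("c"), Bytes.toBytes("1")),
      4) // same batch size as above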
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
deleted file mode 100644
index eedabc3..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.{Result, Get}
-import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.spark.{SparkContext, SparkConf}
-
-/**
- * This is a simple example of getting records from HBase
- * with the bulkGet function.
- */
-object HBaseBulkGetExample {
-  def main(args: Array[String]) {
-    if (args.length < 1) {
-      println("HBaseBulkGetExample {tableName} is missing an argument")
-      return
-    }
-
-    val tableName = args(0)
-
-    val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
-    val sc = new SparkContext(sparkConf)
-
-    try {
-
-      //[(Array[Byte])]
-      val rdd = sc.parallelize(Array(
-        Bytes.toBytes("1"),
-        Bytes.toBytes("2"),
-        Bytes.toBytes("3"),
-        Bytes.toBytes("4"),
-        Bytes.toBytes("5"),
-        Bytes.toBytes("6"),
-        Bytes.toBytes("7")))
-
-      val conf = HBaseConfiguration.create()
-
-      val hbaseContext = new HBaseContext(sc, conf)
-
-      val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
-        record => {
-          System.out.println("making Get")
-          new Get(record)
-        },
-        (result: Result) => {
-
-          val it = result.listCells().iterator()
-          val b = new StringBuilder
-
-          b.append(Bytes.toString(result.getRow) + ":")
-
-          while (it.hasNext) {
-            val cell = it.next()
-            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
-            if (q.equals("counter")) {
-              b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
-            } else {
-              b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
-            }
-          }
-          b.toString()
-        })
-
-      getRdd.collect().foreach(v => println(v))
-
-    } finally {
-      sc.stop()
-    }
-  }
-}
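Because both the Get and the result-mapping function are user-supplied, the same bulkGet can be narrowed to one column and mapped to a typed value instead of a formatted string. A sketch reusing rdd, hbaseContext and tableName from above; the family "c" and qualifier "counter" are assumptions for illustration:

    val counters = rdd.hbaseBulkGet[Option[Long]](hbaseContext,
      TableName.valueOf(tableName), 2,
      record => new Get(record).addColumn(Bytes.toBytes("c"), Bytes.toBytes("counter")),
      (result: Result) => {
        val cell = result.getColumnLatestCell(Bytes.toBytes("c"), Bytes.toBytes("counter"))
        Option(cell).map(c => Bytes.toLong(CellUtil.cloneValue(c))) // None if the column is absent
      })
    counters.collect().foreach(println)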
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
deleted file mode 100644
index 28711b8..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.spark.{SparkConf, SparkContext}
-
-/**
- * This is a simple example of putting records in HBase
- * with the bulkPut function.
- */
-object HBaseBulkPutExample {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      println("HBaseBulkPutExample {tableName} {columnFamily} are missing arguments")
-      return
-    }
-
-    val tableName = args(0)
-    val columnFamily = args(1)
-
-    val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " +
-      tableName + " " + columnFamily)
-    val sc = new SparkContext(sparkConf)
-
-    try {
-      // RDD[(rowKey, Array[(family, qualifier, value)])]
-      val rdd = sc.parallelize(Array(
-        (Bytes.toBytes("1"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
-        (Bytes.toBytes("2"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
-        (Bytes.toBytes("3"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
-        (Bytes.toBytes("4"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
-        (Bytes.toBytes("5"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
-      ))
-
-      val conf = HBaseConfiguration.create()
-
-      val hbaseContext = new HBaseContext(sc, conf)
-
-      rdd.hbaseBulkPut(hbaseContext, TableName.valueOf(tableName),
-        (putRecord) => {
-          val put = new Put(putRecord._1)
-          putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
-            putValue._3))
-          put
-        })
-
-    } finally {
-      sc.stop()
-    }
-  }
-}
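The hand-enumerated tuples above can just as easily be generated; the hbaseBulkPut call itself is unchanged. A sketch reusing sc, hbaseContext, tableName and columnFamily from the example:

    // Same shape of input as above: (rowKey, Array[(family, qualifier, value)])
    val generated = sc.parallelize(1 to 5).map(i =>
      (Bytes.toBytes(i.toString),
        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes(i.toString)))))

    generated.hbaseBulkPut(hbaseContext, TableName.valueOf(tableName),
      putRecord => {
        val put = new Put(putRecord._1)
        putRecord._2.foreach(v => put.addColumn(v._1, v._2, v._3))
        put
      })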
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
deleted file mode 100644
index 8dfefc2..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.{SparkContext, SparkConf}
-
-/**
- * This is a simple example of using the foreachPartition
- * method with an HBase connection
- */
-object HBaseForeachPartitionExample {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      println("HBaseForeachPartitionExample {tableName} {columnFamily} are missing arguments")
-      return
-    }
-
-    val tableName = args(0)
-    val columnFamily = args(1)
-
-    val sparkConf = new SparkConf().setAppName("HBaseForeachPartitionExample " +
-      tableName + " " + columnFamily)
-    val sc = new SparkContext(sparkConf)
-
-    try {
-      // RDD[(rowKey, Array[(family, qualifier, value)])]
-      val rdd = sc.parallelize(Array(
-        (Bytes.toBytes("1"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
-        (Bytes.toBytes("2"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
-        (Bytes.toBytes("3"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
-        (Bytes.toBytes("4"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
-        (Bytes.toBytes("5"),
-          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
-      ))
-
-      val conf = HBaseConfiguration.create()
-
-      val hbaseContext = new HBaseContext(sc, conf)
-
-      rdd.hbaseForeachPartition(hbaseContext,
-        (it, connection) => {
-          val m = connection.getBufferedMutator(TableName.valueOf(tableName))
-
-          it.foreach(r => {
-            val put = new Put(r._1)
-            r._2.foreach((putValue) =>
-              put.addColumn(putValue._1, putValue._2, putValue._3))
-            m.mutate(put)
-          })
-          m.flush()
-          m.close()
-        })
-
-    } finally {
-      sc.stop()
-    }
-  }
-}
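One caveat with the partition function above: if a mutate call throws, the BufferedMutator is never closed. A sketch of the same write path with the close moved into a finally block, reusing rdd, hbaseContext and tableName from the example:

    rdd.hbaseForeachPartition(hbaseContext,
      (it, connection) => {
        val m = connection.getBufferedMutator(TableName.valueOf(tableName))
        try {
          it.foreach(r => {
            val put = new Put(r._1)
            r._2.foreach(putValue => put.addColumn(putValue._1, putValue._2, putValue._3))
            m.mutate(put)
          })
          m.flush()
        } finally {
          m.close() // runs even if a mutate fails
        }
      })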
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
deleted file mode 100644
index 0d0b314..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.Get
-import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.{SparkContext, SparkConf}
-
-/**
- * This is a simple example of using the mapPartitions
- * method with an HBase connection
- */
-object HBaseMapPartitionExample {
-  def main(args: Array[String]) {
-    if (args.length < 1) {
-      println("HBaseMapPartitionExample {tableName} is missing an argument")
-      return
-    }
-
-    val tableName = args(0)
-
-    val sparkConf = new SparkConf().setAppName("HBaseMapPartitionExample " + tableName)
-    val sc = new SparkContext(sparkConf)
-
-    try {
-
-      //[(Array[Byte])]
-      val rdd = sc.parallelize(Array(
-        Bytes.toBytes("1"),
-        Bytes.toBytes("2"),
-        Bytes.toBytes("3"),
-        Bytes.toBytes("4"),
-        Bytes.toBytes("5"),
-        Bytes.toBytes("6"),
-        Bytes.toBytes("7")))
-
-      val conf = HBaseConfiguration.create()
-
-      val hbaseContext = new HBaseContext(sc, conf)
-
-      val getRdd = rdd.hbaseMapPartitions[String](hbaseContext, (it, connection) => {
-        val table = connection.getTable(TableName.valueOf(tableName))
-        it.map { r =>
-          // batching would be faster; this is just an example
-          val result = table.get(new Get(r))
-
-          val cellIt = result.listCells().iterator()
-          val b = new StringBuilder
-
-          b.append(Bytes.toString(result.getRow) + ":")
-
-          while (cellIt.hasNext) {
-            val cell = cellIt.next()
-            // copy the bytes out; the cell's backing arrays hold more than one field
-            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
-            if (q.equals("counter")) {
-              b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
-            } else {
-              b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
            }
-          }
-          b.toString()
-        }
-      })
-
-      getRdd.collect().foreach(v => println(v))
-
-    } finally {
-      sc.stop()
    }
-  }
-}
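The in-code comment above notes that batching would be faster. As a sketch of that idea, the per-partition function can group rows and issue one multi-get per group with Table.get(java.util.List[Get]); the group size of 100 is an arbitrary assumption, and rdd, hbaseContext and tableName are reused from the example:

    import scala.collection.JavaConverters._

    val batchedRdd = rdd.hbaseMapPartitions[String](hbaseContext, (it, connection) => {
      val table = connection.getTable(TableName.valueOf(tableName))
      it.grouped(100).flatMap { chunk =>
        val gets = chunk.map(r => new Get(r)).toList.asJava
        table.get(gets).map(result => Bytes.toString(result.getRow)) // one RPC per chunk
      }
    })
    batchedRdd.collect().foreach(println)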
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
deleted file mode 100644
index 3df23f9..0000000
--- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.datasources.hbase
-
-import org.apache.spark.sql.catalyst.SqlLexical
-import org.apache.spark.sql.catalyst.util.DataTypeParser
-import org.apache.spark.sql.types.DataType
-
-// TODO: Only used in test suite.
-object DataTypeParserWrapper {
-  lazy val dataTypeParser = new DataTypeParser {
-    override val lexical = new SqlLexical
-  }
-
-  def parse(dataTypeString: String): DataType = dataTypeParser.toDataType(dataTypeString)
-}
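A minimal usage sketch for the wrapper above (the type string is a hypothetical example): parse turns a Spark SQL type string into a DataType.

    import org.apache.spark.sql.datasources.hbase.DataTypeParserWrapper

    val parsed = DataTypeParserWrapper.parse("array<map<string,int>>")
    println(parsed.simpleString) // prints the canonical form of the parsed type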