Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/4014#discussion_r23516234
--- Diff:
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
---
@@ -53,28 +78,176 @@ case class ScriptTransformation(
val inputStream = proc.getInputStream
val outputStream = proc.getOutputStream
val reader = new BufferedReader(new InputStreamReader(inputStream))
+
+ val outputSerde: AbstractSerDe = if (ioschema.outputSerdeClass !=
"") {
+ val trimed_class = ioschema.outputSerdeClass.split("'")(1)
+ Utils.classForName(trimed_class)
+ .newInstance.asInstanceOf[AbstractSerDe]
+ } else {
+ null
+ }
+
+ if (outputSerde != null) {
+ val columns = output.map { case aref: AttributeReference =>
aref.name }
+ .mkString(",")
+ val columnTypes = output.map { case aref: AttributeReference =>
+ aref.dataType.toTypeInfo.getTypeName()
+ }.mkString(",")
+
+ var propsMap = ioschema.outputSerdeProps.map(kv => {
+ (kv._1.split("'")(1), kv._2.split("'")(1))
+ }).toMap + (serdeConstants.LIST_COLUMNS -> columns)
+ propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES ->
columnTypes)
+
+ val properties = new Properties()
+ properties.putAll(propsMap)
+
+ outputSerde.initialize(null, properties)
+ }
+
+ val outputSoi = if (outputSerde != null) {
+
outputSerde.getObjectInspector().asInstanceOf[StructObjectInspector]
+ } else {
+ null
+ }
+
+ val iterator: Iterator[Row] = new Iterator[Row] with HiveInspectors {
+ var cacheRow: Row = null
+ var curLine: String = null
+ var eof: Boolean = false
+
+ override def hasNext: Boolean = {
+ if (outputSerde == null) {
+ if (curLine == null) {
+ curLine = reader.readLine()
+ curLine != null
+ } else {
+ true
+ }
+ } else {
+ !eof
+ }
+ }
- // TODO: This should be exposed as an iterator instead of reading in
all the data at once.
- val outputLines = collection.mutable.ArrayBuffer[Row]()
- val readerThread = new Thread("Transform OutputReader") {
- override def run() {
- var curLine = reader.readLine()
- while (curLine != null) {
- // TODO: Use SerDe
- outputLines += new
GenericRow(curLine.split("\t").asInstanceOf[Array[Any]])
+ def deserialize(): Row = {
+ if (cacheRow != null) return cacheRow
+
+ val mutableRow = new SpecificMutableRow(output.map(_.dataType))
+ try {
+ val dataInputStream = new DataInputStream(inputStream)
+ val writable = outputSerde.getSerializedClass().newInstance
+ writable.readFields(dataInputStream)
+
+ val raw = outputSerde.deserialize(writable)
+ val dataList = outputSoi.getStructFieldsDataAsList(raw)
+ val fieldList = outputSoi.getAllStructFieldRefs()
+
+ var i = 0
+ dataList.foreach( element => {
+ if (element == null) {
+ mutableRow.setNullAt(i)
+ } else {
+ mutableRow(i) = unwrap(element,
fieldList(i).getFieldObjectInspector)
+ }
+ i += 1
+ })
+ return mutableRow
+ } catch {
+ case e: EOFException =>
+ eof = true
+ return null
+ }
+ }
+
+ override def next(): Row = {
+ if (!hasNext) {
+ throw new NoSuchElementException
+ }
+
+ if (outputSerde == null) {
+ val prevLine = curLine
curLine = reader.readLine()
+
+ if (!ioschema.schemaLess) {
+ new GenericRow(
+
prevLine.split(outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
+ .asInstanceOf[Array[Any]])
+ } else {
+ new GenericRow(
+
prevLine.split(outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)
+ .asInstanceOf[Array[Any]])
+ }
+ } else {
+ val ret = deserialize()
+ if (!eof) {
+ cacheRow = null
+ cacheRow = deserialize()
+ }
+ ret
}
}
}
- readerThread.start()
+
+ val inputSerde: AbstractSerDe = if (ioschema.inputSerdeClass != "") {
+ val trimed_class = ioschema.inputSerdeClass.split("'")(1)
+ Utils.classForName(trimed_class)
+ .newInstance.asInstanceOf[AbstractSerDe]
+ } else {
+ null
+ }
+
+ if (inputSerde != null) {
+ val columns = input.map { case e: NamedExpression => e.name
}.mkString(",")
+ val columnTypes = input.map { case e: NamedExpression =>
+ e.dataType.toTypeInfo.getTypeName()
+ }.mkString(",")
+
+ var propsMap = ioschema.inputSerdeProps.map(kv => {
+ (kv._1.split("'")(1), kv._2.split("'")(1))
+ }).toMap + (serdeConstants.LIST_COLUMNS -> columns)
+ propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES ->
columnTypes)
+
+ val properties = new Properties()
+ properties.putAll(propsMap)
+ inputSerde.initialize(null, properties)
+ }
+
+ val inputSoi = if (inputSerde != null) {
+ val fieldNames = input.map { case e: NamedExpression => e.name }
+ val fieldObjectInspectors = input.map { case e: NamedExpression =>
+ toInspector(e.dataType)
+ }
+ ObjectInspectorFactory
+ .getStandardStructObjectInspector(fieldNames,
fieldObjectInspectors)
+ .asInstanceOf[ObjectInspector]
+ } else {
+ null
+ }
+
+ val dataOutputStream = new DataOutputStream(outputStream)
val outputProjection = new InterpretedProjection(input, child.output)
iter
.map(outputProjection)
- // TODO: Use SerDe
- .map(_.mkString("", "\t",
"\n").getBytes("utf-8")).foreach(outputStream.write)
+ .foreach { row =>
+ if (inputSerde == null) {
+ val data = row.mkString("",
inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+ inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+ outputStream.write(data)
+ } else {
+ val writable =
inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+ if (writable.isInstanceOf[AvroGenericRecordWritable] &&
HiveShim.version == "0.13.1") {
--- End diff --
I believe that Hive has a bug when using Avro as a SerDe in transform syntax.
I am preparing to file an issue and a patch with Hive. I agree that we'd better
keep this generic, so I may remove the `if` block, comment out the Avro SerDe
unit tests, and note the reason there.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]