Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7710#discussion_r35629043
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala ---
    @@ -146,49 +198,83 @@ case class ScriptTransformation(
             }
           }
     
    -      val (inputSerde, inputSoi) = ioschema.initInputSerDe(input)
    -      val dataOutputStream = new DataOutputStream(outputStream)
    -      val outputProjection = new InterpretedProjection(input, child.output)
    +      writerThread.start()
     
    -      // TODO make the 2048 configurable?
    -      val stderrBuffer = new CircularBuffer(2048)
    -      // Consume the error stream from the pipeline, otherwise it will be blocked if
    -      // the pipeline is full.
    -      new RedirectThread(errorStream, // input stream from the pipeline
    -        stderrBuffer,                 // output to a circular buffer
    -        "Thread-ScriptTransformation-STDERR-Consumer").start()
    +      outputIterator
    +    }
     
    -      // Put the write (output to the pipeline) into a single thread
    -      // and keep the collector in the main thread;
    -      // otherwise it will cause a deadlock if the data size is greater than
    -      // the pipeline / buffer capacity.
    -      new Thread(new Runnable() {
    -        override def run(): Unit = {
    -          Utils.tryWithSafeFinally {
    -            iter
    -              .map(outputProjection)
    -              .foreach { row =>
    -              if (inputSerde == null) {
    -                val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
    -                  ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
    -
    -                outputStream.write(data)
    -              } else {
    -                val writable = inputSerde.serialize(
    -                  row.asInstanceOf[GenericInternalRow].values, inputSoi)
    -                prepareWritable(writable).write(dataOutputStream)
    -              }
    -            }
    -            outputStream.close()
    -          } {
    -            if (proc.waitFor() != 0) {
    -              logError(stderrBuffer.toString) // log the stderr circular buffer
    -            }
    -          }
    -        }
    -      }, "Thread-ScriptTransformation-Feed").start()
    +    child.execute().mapPartitions { iter =>
    +      if (iter.hasNext) {
    +        processIterator(iter)
    +      } else {
    +        // If the input iterator has no rows then do not launch the external script.
    +        Iterator.empty
    +      }
    +    }
    +  }
    +}
     
    -      iterator
    +private class ScriptTransformationWriterThread(
    +    iter: Iterator[InternalRow],
    +    outputProjection: Projection,
    +    @Nullable inputSerde: AbstractSerDe,
    +    @Nullable inputSoi: ObjectInspector,
    +    ioschema: HiveScriptIOSchema,
    +    outputStream: OutputStream,
    +    proc: Process,
    +    stderrBuffer: CircularBuffer,
    +    taskContext: TaskContext
    +  ) extends Thread("Thread-ScriptTransformation-Feed") with Logging {
    +
    +  setDaemon(true)
    +
    +  @volatile private var _exception: Throwable = null
    +
    +  /** Contains the exception thrown while writing the parent iterator to the external process. */
    +  def exception: Option[Throwable] = Option(_exception)
    +
    +  override def run(): Unit = Utils.logUncaughtExceptions {
    +    TaskContext.setTaskContext(taskContext)
    +
    +    val dataOutputStream = new DataOutputStream(outputStream)
    +
    +    // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so
    +    // let's use a variable to record whether the `finally` block was hit due to an exception
    +    var threwException: Boolean = true
    +    try {
    +      iter.map(outputProjection).foreach { row =>
    +        if (inputSerde == null) {
    +          val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
    +            ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
    +          outputStream.write(data)
    +        } else {
    +          val writable = inputSerde.serialize(
    +            row.asInstanceOf[GenericInternalRow].values, inputSoi)
    +          prepareWritable(writable).write(dataOutputStream)
    +        }
    +      }
    +      outputStream.close()
    +      threwException = false
    +    } catch {
    +      case NonFatal(e) =>
    +        // An error occurred while writing input, so kill the child process. According to the
    +        // Javadoc this call will not throw an exception:
    +        _exception = e
    +        proc.destroy()
    +        throw e
    +    } finally {
    +      try {
    +        if (proc.waitFor() != 0) {
    +          logError(stderrBuffer.toString) // log the stderr circular buffer
    +        }
    +      } catch {
    +        case NonFatal(exceptionFromFinallyBlock) =>
    +          if (!threwException) {
    --- End diff --
    
    Why do we have different handling for `exceptionFromFinallyBlock` here? It looks like when no exception is thrown in the first try block, we re-throw `exceptionFromFinallyBlock`, but if an exception was thrown above, we only log it?
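    
    For reference, my reading of the pattern is roughly the following self-contained sketch (the `runWithCleanup`, `body`, and `cleanup` names are just placeholders, not from this PR):
    
    ```scala
    import scala.util.control.NonFatal
    
    // Minimal sketch of the pattern under discussion: a failure during cleanup is
    // only re-thrown when the main body completed normally; if the body already
    // failed, the cleanup failure is merely logged so it does not mask the
    // original exception.
    def runWithCleanup(body: () => Unit, cleanup: () => Unit): Unit = {
      var threwException = true
      try {
        body()
        threwException = false
      } finally {
        try {
          cleanup()
        } catch {
          case NonFatal(e) =>
            if (!threwException) {
              // The body succeeded, so the cleanup failure is the real error: propagate it.
              throw e
            } else {
              // The body already failed; log instead of masking that original exception.
              println(s"Exception from cleanup suppressed: $e")
            }
        }
      }
    }
    ```
    
    So if I understand correctly, when `body()` fails, its exception still propagates and the cleanup failure is only logged; only when `body()` succeeds does a cleanup failure become the thrown exception. Is that the intent here?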

