mridulm commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1008798658
##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
private[this] var kryo: Kryo = serInstance.borrowKryo()
+ final private[this] def hasNext: Boolean = {
Review Comment:
Isn't `private[this]` sufficient here, without the additional `final`?
##########
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:
##########
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
* If no more pairs are left, return null.
*/
private def readNextItem(): (K, C) = {
- try {
- val k = deserializeStream.readKey().asInstanceOf[K]
- val c = deserializeStream.readValue().asInstanceOf[C]
- val item = (k, c)
- objectsRead += 1
- if (objectsRead == serializerBatchSize) {
- objectsRead = 0
- deserializeStream = nextBatchStream()
- }
- item
- } catch {
- case e: EOFException =>
- cleanup()
- null
+ val next = batchIterator.next()
+ objectsRead += 1
+ if (objectsRead == serializerBatchSize) {
+ objectsRead = 0
+ batchIterator = nextBatchIterator()
}
+ next
}
override def hasNext: Boolean = {
- if (nextItem == null) {
- if (deserializeStream == null) {
- // In case of deserializeStream has not been initialized
- deserializeStream = nextBatchStream()
- if (deserializeStream == null) {
- return false
- }
+ if (batchIterator == null) {
+ // In case of batchIterator has not been initialized
+ batchIterator = nextBatchIterator()
+ if (batchIterator == null) {
+ return false
}
- nextItem = readNextItem()
}
- nextItem != null
+ batchIterator.hasNext
}
override def next(): (K, C) = {
- if (!hasNext) {
+ if (batchIterator == null) {
Review Comment:
We need to check `hasNext` here: if the current iterator is exhausted, it will
be non-null but empty.
##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
private[this] var kryo: Kryo = serInstance.borrowKryo()
+ final private[this] def hasNext: Boolean = {
+ if (input == null) {
+ return false
+ }
+
+ val eof = input.eof()
+ if (eof) close()
+ !eof
+ }
+
override def readObject[T: ClassTag](): T = {
- try {
kryo.readClassAndObject(input).asInstanceOf[T]
- } catch {
- // DeserializationStream uses the EOF exception to indicate stopping
condition.
- case e: KryoException
- if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow")
=>
- throw new EOFException
- }
Review Comment:
Should we preserve this even with the proposed change of checking eof, so that
we continue to catch cases where EOF is encountered prematurely?
This would mainly handle abnormal cases rather than the common case.
##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -324,6 +326,36 @@ class KryoDeserializationStream(
}
}
}
+
+ final override def asIterator: Iterator[Any] = new NextIterator[Any] {
+ override protected def getNext() = {
+ if (KryoDeserializationStream.this.hasNext) {
+ readValue[Any]()
+ } else {
+ finished = true
+ null
+ }
+ }
+
+ override protected def close(): Unit = {
+ KryoDeserializationStream.this.close()
+ }
+ }
+
+ final override def asKeyValueIterator: Iterator[(Any, Any)] = new
NextIterator[(Any, Any)] {
+ override protected def getNext() = {
+ if (KryoDeserializationStream.this.hasNext) {
+ (readKey[Any](), readValue[Any]())
Review Comment:
Given that we are fixing this, should we avoid assuming that if a key is
present, the value will be present as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]