eejbyfeldt commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1010340881
##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
private[this] var kryo: Kryo = serInstance.borrowKryo()
+ final private[this] def hasNext: Boolean = {
Review Comment:
Yes, will fix.
##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -324,6 +326,36 @@ class KryoDeserializationStream(
}
}
}
+
+ final override def asIterator: Iterator[Any] = new NextIterator[Any] {
+ override protected def getNext() = {
+ if (KryoDeserializationStream.this.hasNext) {
+ readValue[Any]()
+ } else {
+ finished = true
+ null
+ }
+ }
+
+ override protected def close(): Unit = {
+ KryoDeserializationStream.this.close()
+ }
+ }
+
+ final override def asKeyValueIterator: Iterator[(Any, Any)] = new
NextIterator[(Any, Any)] {
+ override protected def getNext() = {
+ if (KryoDeserializationStream.this.hasNext) {
+ (readKey[Any](), readValue[Any]())
Review Comment:
You mean that if only a key exists we should just ignore it, like the current
implementation does?
##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
private[this] var kryo: Kryo = serInstance.borrowKryo()
+ final private[this] def hasNext: Boolean = {
+ if (input == null) {
+ return false
+ }
+
+ val eof = input.eof()
+ if (eof) close()
+ !eof
+ }
+
override def readObject[T: ClassTag](): T = {
- try {
kryo.readClassAndObject(input).asInstanceOf[T]
- } catch {
- // DeserializationStream uses the EOF exception to indicate stopping
condition.
- case e: KryoException
- if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow")
=>
- throw new EOFException
- }
Review Comment:
Sure, I will add it back. I think that catching and ignoring the exceptions
here should be revisited in a separate change, as it seems to me that it could
cause data loss if we just assume the exception here means EOF.
##########
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:
##########
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
* If no more pairs are left, return null.
*/
private def readNextItem(): (K, C) = {
- try {
- val k = deserializeStream.readKey().asInstanceOf[K]
- val c = deserializeStream.readValue().asInstanceOf[C]
- val item = (k, c)
- objectsRead += 1
- if (objectsRead == serializerBatchSize) {
- objectsRead = 0
- deserializeStream = nextBatchStream()
- }
- item
- } catch {
- case e: EOFException =>
- cleanup()
- null
+ val next = batchIterator.next()
+ objectsRead += 1
+ if (objectsRead == serializerBatchSize) {
+ objectsRead = 0
+ batchIterator = nextBatchIterator()
}
+ next
}
override def hasNext: Boolean = {
- if (nextItem == null) {
- if (deserializeStream == null) {
- // In case of deserializeStream has not been initialized
- deserializeStream = nextBatchStream()
- if (deserializeStream == null) {
- return false
- }
+ if (batchIterator == null) {
+ // In case of batchIterator has not been initialized
+ batchIterator = nextBatchIterator()
+ if (batchIterator == null) {
+ return false
}
- nextItem = readNextItem()
}
- nextItem != null
+ batchIterator.hasNext
}
override def next(): (K, C) = {
- if (!hasNext) {
+ if (batchIterator == null) {
Review Comment:
In that case it will call next on the empty iterator, and we should still
throw a `NoSuchElementException`. But `!hasNext` should also have that behavior,
so I can change it back to that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]