Re: Task result is serialized twice by serializer and closure serializer
Hey Mingyu,

I think it's broken out separately so we can record the time taken to serialize the result. Once we've serialized it once, the second serialization should be really simple, since it's just wrapping something that has already been turned into a byte buffer.

Do you see a specific issue with serializing it twice? I think you need two steps if you want to record the time taken to serialize the result, since that needs to be sent back to the driver when the task completes.

- Patrick

On Wed, Mar 4, 2015 at 4:01 PM, Mingyu Kim m...@palantir.com wrote:
> It looks like the result of a task is serialized twice, once by the serializer and once again by the closure serializer. [...]
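For concreteness, here is a minimal, self-contained sketch of the two-step pattern described above, with plain Java serialization standing in for Spark's pluggable serializers. TaskResultHolder and the other names are hypothetical illustrations of the shape of the code, not Spark's actual classes (the real one is DirectTaskResult in the Executor code linked below).

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Hypothetical wrapper standing in for DirectTaskResult: it carries the
    // already-serialized task value plus small metadata such as the timing.
    case class TaskResultHolder(valueBytes: Array[Byte], serializationTimeMs: Long)

    object TwoStepSerialization {
      // Stand-in for the configured result serializer (Java/Kryo in Spark).
      def serialize(obj: AnyRef): Array[Byte] = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(obj)
        out.close()
        buffer.toByteArray
      }

      def main(args: Array[String]): Unit = {
        val value: AnyRef = Array.fill(1000)("row") // the task's result value

        // Step 1: serialize the result value itself, recording the time taken;
        // this duration is the metric that gets reported back to the driver.
        val before = System.currentTimeMillis()
        val valueBytes = serialize(value)
        val after = System.currentTimeMillis()

        // Step 2: wrap the byte buffer plus metadata and serialize the wrapper.
        // This second pass only frames an existing Array[Byte], so it is cheap
        // relative to step 1, but it does produce a second copy of the bytes.
        val wrapped = serialize(TaskResultHolder(valueBytes, after - before))
        println(s"value: ${valueBytes.length} bytes, wrapped: ${wrapped.length} bytes")
      }
    }

The split exists because the duration of step 1 is a per-task metric the driver wants, and it can only be measured around the serialization of the value itself; step 2 merely frames the resulting buffer together with that metadata.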
Re: Task result is serialized twice by serializer and closure serializer
The concern is really just the runtime overhead and memory footprint of Java-serializing an already-serialized byte array again. We originally noticed this when we were using RDD.toLocalIterator(), which serializes the entire 64MB partition. We worked around this issue by kryo-serializing and snappy-compressing the partition on the executor side before returning it to the driver, but that extra step just felt redundant.

Your explanation about reporting the time taken makes it clearer why it's designed this way. Since the byte array for the serialized task result shouldn't account for the majority of the memory footprint anyway, I'm okay with leaving it as is, then.

Thanks,
Mingyu

On 3/4/15, 5:07 PM, Patrick Wendell pwend...@gmail.com wrote:
> I think it's broken out separately so we can record the time taken to serialize the result. [...]
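For reference, a rough sketch of the executor-side workaround described above, assuming Kryo and snappy-java are on the classpath. packPartition, unpackPartition, and localIteratorViaPacking are hypothetical helper names, not Spark API, and a real version would reuse Kryo instances rather than allocating one per call.

    import scala.reflect.ClassTag
    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.xerial.snappy.Snappy
    import org.apache.spark.rdd.RDD

    object PackedTransfer {
      // Kryo-serialize and snappy-compress one partition on the executor, so a
      // single compact byte array crosses the wire instead of raw row objects.
      def packPartition[T](rows: Array[T]): Array[Byte] = {
        val buffer = new java.io.ByteArrayOutputStream()
        val output = new Output(buffer)
        new Kryo().writeClassAndObject(output, rows)
        output.close()
        Snappy.compress(buffer.toByteArray)
      }

      // Reverse the packing on the driver side.
      def unpackPartition[T](packed: Array[Byte]): Array[T] =
        new Kryo().readClassAndObject(new Input(Snappy.uncompress(packed)))
          .asInstanceOf[Array[T]]

      // Usage: pack on the executors, unpack lazily on the driver.
      def localIteratorViaPacking[T: ClassTag](rdd: RDD[T]): Iterator[T] =
        rdd.mapPartitions(iter => Iterator(packPartition(iter.toArray)))
          .toLocalIterator
          .flatMap(packed => unpackPartition[T](packed))
    }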
Task result is serialized twice by serializer and closure serializer
Hi all,

It looks like the result of a task is serialized twice: once by the serializer (i.e. Java/Kryo depending on configuration) and once again by the closure serializer (i.e. Java).

To link the actual code, the first one: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L213 The second one: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L226

This serializes the “value” (the result of the task run) twice, which affects things like collect(), takeSample(), and toLocalIterator(). Would it make sense to simply serialize the DirectTaskResult once using the regular “serializer” (as opposed to the closure serializer)? Would it cause problems when the Accumulator values are not Kryo-serializable? Alternatively, if we can assume that Accumulator values are small, we could closure-serialize just those, put the resulting byte array in DirectTaskResult along with the raw task result “value”, and serialize the DirectTaskResult once.

What do people think?

Thanks,
Mingyu
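To make the alternative concrete, here is a hypothetical sketch of what the proposed result shape might look like. ProposedTaskResult is purely illustrative, not Spark's actual DirectTaskResult, and it assumes accumulator updates are small enough that Java-serializing them up front is cheap.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    object ProposalSketch {
      // Hypothetical shape of the proposal: accumulator updates are
      // closure-(Java-)serialized up front, so the whole result can then go
      // through the configured serializer (Java or Kryo) exactly once, even
      // when the accumulator values themselves are not Kryo-serializable.
      case class ProposedTaskResult(
          value: Any,                     // raw task result, serialized only once
          accumUpdatesBytes: Array[Byte]) // small, already Java-serialized

      // Stand-in for the Java closure serializer.
      def javaSerialize(obj: AnyRef): Array[Byte] = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(obj)
        out.close()
        buffer.toByteArray
      }

      def main(args: Array[String]): Unit = {
        // On the executor: closure-serialize just the (small) accumulator map,
        // then hand the combined result to the regular serializer in one pass.
        val accumUpdates: Map[Long, Any] = Map(1L -> 42)
        val result = ProposedTaskResult(Array(1, 2, 3), javaSerialize(accumUpdates))
        println(s"accumulator bytes: ${result.accumUpdatesBytes.length}")
      }
    }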
Re: Task result is serialized twice by serializer and closure serializer
Yeah, it will result in a second serialized copy of the array (costing some memory), but the computational overhead should be very small. The absolute worst case here will be when doing a collect() or something similar that just bundles the entire partition.

- Patrick

On Wed, Mar 4, 2015 at 5:47 PM, Mingyu Kim m...@palantir.com wrote:
> The concern is really just the runtime overhead and memory footprint of Java-serializing an already-serialized byte array again. [...]
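Both halves of that claim are easy to check locally: Java-serializing an already-serialized buffer costs little CPU, since it essentially just copies a byte[] with a few bytes of framing, but it does materialize a full second copy in memory. A self-contained sketch, with the 64MB size mirroring the partition size mentioned earlier in the thread:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    object ReserializeCost {
      def javaSerialize(obj: AnyRef): Array[Byte] = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(obj)
        out.close()
        buffer.toByteArray
      }

      def main(args: Array[String]): Unit = {
        // Pretend this is a task result the configured serializer has already
        // turned into bytes, e.g. a collected 64MB partition.
        val alreadySerialized = new Array[Byte](64 * 1024 * 1024)

        // Java-serializing a byte[] again adds only a small framing header
        // and is essentially a memory copy...
        val start = System.nanoTime()
        val reserialized = javaSerialize(alreadySerialized)
        val elapsedMs = (System.nanoTime() - start) / 1e6

        // ...but it does materialize a full second copy of the data.
        println(s"original: ${alreadySerialized.length} bytes, " +
          s"re-serialized: ${reserialized.length} bytes, in $elapsedMs ms")
      }
    }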