Re: Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Patrick Wendell
Hey Mingyu,

I think it's broken out separately so we can record the time taken to
serialize the result. Once we've serialized it once, the second
serialization should be really simple since it's just wrapping
something that has already been turned into a byte buffer. Do you see
a specific issue with serializing it twice?

I think you need to have two steps if you want to record the time
taken to serialize the result, since that needs to be sent back to the
driver when the task completes.

- Patrick
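
For illustration, a minimal self-contained sketch of the two-step pattern described
above (this is not the actual Executor code; javaSerialize and TaskResultEnvelope are
made-up stand-ins for the closure serializer and DirectTaskResult):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object TwoStepSerialization {

  // Stand-in for Spark's Java closure serializer.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  // Hypothetical envelope: already-serialized value bytes plus small metadata
  // (playing the role of DirectTaskResult with its accumulator updates).
  case class TaskResultEnvelope(valueBytes: Array[Byte], metadata: Map[String, Double])

  def main(args: Array[String]): Unit = {
    val taskValue: Seq[Int] = 1 to 1000            // pretend this is the task's result

    // Step 1: serialize the value itself, timing it so the metric can be reported.
    val t0 = System.nanoTime()
    val valueBytes = javaSerialize(taskValue)      // Spark would use Java or Kryo here
    val serializationTimeMs = (System.nanoTime() - t0) / 1e6

    // Step 2: wrap the bytes in the envelope and serialize the (thin) envelope
    // again for the trip back to the driver.
    val envelope = TaskResultEnvelope(valueBytes, Map("resultSerializationTimeMs" -> serializationTimeMs))
    val wireBytes = javaSerialize(envelope)

    println(s"value: ${valueBytes.length} bytes, on the wire: ${wireBytes.length} bytes, " +
      f"value serialization took $serializationTimeMs%.2f ms")
  }
}

The point being: the second pass only has to write an already-built byte array plus a
few small fields, so the first pass can be timed separately at little extra cost.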


Re: Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Mingyu Kim
The concern is really just the runtime overhead and memory footprint of
Java-serializing an already-serialized byte array again. We originally
noticed this when we were using RDD.toLocalIterator(), which serializes the
entire 64MB partition. We worked around this issue by kryo-serializing and
snappy-compressing the partition on the executor side before returning it
to the driver, but that operation just felt redundant.

Your explanation about reporting the time taken makes it clearer why it's
designed this way. Since the byte array for the serialized task result
shouldn't account for the majority of memory footprint anyways, I'm okay
with leaving it as is, then.

Thanks,
Mingyu
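
For reference, a rough sketch of the workaround described above. Two substitutions to
keep it self-contained: plain Java serialization stands in for Kryo (the original
workaround went through Spark's Kryo serializer), and snappy-java does the
compression. CompressedPartitions, compressPartitions and decompress are illustrative
names, not Spark APIs:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.xerial.snappy.Snappy

object CompressedPartitions {

  // Shrink each partition to a single compressed byte array on the executors,
  // so only small blobs travel through the (doubly serialized) task-result path.
  def compressPartitions[T: ClassTag](rdd: RDD[T]): RDD[Array[Byte]] =
    rdd.mapPartitions { iter =>
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(iter.toArray)              // materialize and serialize the partition
      oos.close()
      Iterator.single(Snappy.compress(bos.toByteArray))
    }

  // Driver side: turn a compressed blob back into the partition's elements.
  def decompress[T](blob: Array[Byte]): Array[T] = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(Snappy.uncompress(blob)))
    try ois.readObject().asInstanceOf[Array[T]] finally ois.close()
  }
}

// Usage on the driver, instead of calling rdd.toLocalIterator directly
// (MyRecord is a placeholder element type):
//   val localValues = CompressedPartitions.compressPartitions(rdd)
//     .toLocalIterator
//     .flatMap(blob => CompressedPartitions.decompress[MyRecord](blob))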






Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Mingyu Kim
Hi all,

It looks like the result of a task is serialized twice: once by the serializer (i.e. 
Java/Kryo depending on configuration) and once again by the closure serializer 
(i.e. Java). To link the actual code:

The first one: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L213
The second one: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L226

This serializes the “value”, i.e. the result of the task run, twice, which 
affects things like collect(), takeSample(), and toLocalIterator(). Would it 
make sense to simply serialize the DirectTaskResult once using the regular 
“serializer” (as opposed to the closure serializer)? Would it cause problems when 
the Accumulator values are not Kryo-serializable?

Alternatively, if we can assume that Accumulator values are small, we can 
closure-serialize those, put the serialized byte array in DirectTaskResult along with 
the raw task result “value”, and serialize DirectTaskResult once.

What do people think?

Thanks,
Mingyu
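
For concreteness, a small sketch of that alternative with made-up types (this is not
Spark's actual DirectTaskResult): the accumulator updates get closure-serialized
(Java) into a byte array up front, so the envelope holding them together with the raw
value could then be serialized exactly once by the regular serializer, even if the
accumulator values themselves are not Kryo-serializable.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SingleSerializationProposal {

  // Stand-in for the Java closure serializer.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  // Hypothetical envelope: the raw task value plus pre-serialized accumulator bytes.
  case class ProposedTaskResult[T](value: T, accumBytes: Array[Byte])

  def buildResult[T](value: T, accumUpdates: Map[Long, Any]): ProposedTaskResult[T] =
    ProposedTaskResult(value, javaSerialize(accumUpdates))

  // The envelope would then go through a single serialization pass with the regular
  // (Java/Kryo) serializer, instead of the value and the wrapper each getting their
  // own pass as today.
}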


Re: Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Patrick Wendell
Yeah, it will result in a second serialized copy of the array (costing
some memory). But the computational overhead should be very small. The
absolute worst case here will be when doing a collect() or something
similar that just bundles the entire partition.

- Patrick
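
To put rough numbers on that, a tiny stand-alone measurement sketch (nothing
Spark-specific; the 64MB figure just mirrors the partition size mentioned earlier in
the thread): Java-serializing an already-serialized byte array again is fast and adds
only a small framing overhead, but it does materialize a second copy of roughly the
same size while the result is in flight.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SecondCopyCost {

  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Pretend this is a partition already serialized by the result serializer.
    val firstCopy = Array.fill[Byte](64 * 1024 * 1024)(1.toByte)

    val t0 = System.nanoTime()
    val secondCopy = javaSerialize(firstCopy)    // the closure-serializer step
    val ms = (System.nanoTime() - t0) / 1e6

    println(f"first copy: ${firstCopy.length} bytes, second copy: ${secondCopy.length} bytes, took $ms%.1f ms")
  }
}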
