viirya commented on code in PR #56387:
URL: https://github.com/apache/spark/pull/56387#discussion_r3461369973
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -1781,6 +1781,20 @@ package object config {
.intConf
.createWithDefault(8)
+ //
---------------------------------------------------------------------------
+ // Streaming shuffle writer configs
+ //
---------------------------------------------------------------------------
+
+ private[spark] val STREAMING_SHUFFLE_CHECKSUM_ENABLED =
+ ConfigBuilder("spark.shuffle.streaming.checksum.enabled")
+ .doc("Whether to append a CRC32C checksum to each streaming shuffle data
buffer. " +
+ "When enabled, the writer computes the checksum and embeds it in the
DataMessage header; " +
+ "the reader recomputes and compares. A mismatch fails the task,
providing early " +
+ "detection of data corruption in transit.")
+ .version("4.0.0")
+ .booleanConf
Review Comment:
Two issues on this config:
1. **Missing `withBindingPolicy` — this is the current CI failure.**
`SparkConfigBindingPolicySuite` fails with: `The following configs do not have
bindingPolicy field set... spark.shuffle.streaming.checksum.enabled`. A
shuffle-transport config has nothing to do with view/UDF/procedure binding, so
add `.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)` (matching the only
other config in this file that sets it).
2. **`version("4.0.0")` is wrong.** 4.0.0 is already released and this
config is brand new on master. The newest configs in this file use `4.2.0`, and
the next feature release is 4.3.0, so this should be `.version("4.3.0")`.
##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -301,8 +304,12 @@ public void onFailure(Throwable e) {
*
* @param message The message to send.
*/
- public void send(ByteBuffer message) {
- channel.writeAndFlush(new OneWayMessage(new NioManagedBuffer(message)));
+ public ChannelFuture send(ByteBuffer message) {
+ return channel.writeAndFlush(new OneWayMessage(new
NioManagedBuffer(message)));
+ }
+
+ public ChannelFuture send(ByteBuf message) {
Review Comment:
This overload needs a Javadoc (the `send(ByteBuffer)` above it has one), and
in particular should document the ownership contract. I traced the refcount
path: the overload takes ownership of exactly one reference of the passed
`ByteBuf` (wrapped in `NettyManagedBuffer`, released by
`MessageWithHeader.deallocate()`); the caller must not release it afterward.
Documenting that hand-off will keep the follow-up writer/reader callers from
double-releasing or leaking.
##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -301,8 +304,12 @@ public void onFailure(Throwable e) {
*
* @param message The message to send.
*/
- public void send(ByteBuffer message) {
- channel.writeAndFlush(new OneWayMessage(new NioManagedBuffer(message)));
+ public ChannelFuture send(ByteBuffer message) {
Review Comment:
Widening this from `void` to `ChannelFuture` is source-compatible but **not
binary-compatible**: the JVM method descriptor includes the return type, so
code compiled against the old `()V` signature would hit `NoSuchMethodError`
against the new jar.
To be clear, this is **not** what's making the build red — I checked, the
`network-common`/`core` module compiled and passed, and no `NoSuchMethodError`
appears in any failure annotation (that's the config binding-policy issue). And
`TransportClient` is internal, unannotated network plumbing recompiled in-tree,
so the practical risk is low. Raising it only so it's a conscious decision: if
you'd rather be conservative, you could keep `void send(ByteBuffer)` and add a
separate `sendAndGetFuture(ByteBuffer)`, or only have the new `send(ByteBuf)`
return the future. Your call — not a blocker from my side.
##########
core/src/main/scala/org/apache/spark/shuffle/streaming/ErrorNotifier.scala:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.streaming
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Class to notify of any errors that might have occurred out of band.
+ *
+ * Background threads (Netty event loops, task discovery, client creation)
record errors here
+ * via [[markError]] rather than calling `TaskContext.markTaskFailed`
directly. The owning task
+ * thread polls [[throwErrorIfExists]] at safe points and re-throws on its own
thread. This avoids
+ * a race where invoking `markTaskFailed` from a background thread can cause
the task thread's
+ * subsequent `markTaskCompleted` to silently skip completion listeners
(including cleanup),
+ * leading to thread leaks.
+ */
+private[spark] class ErrorNotifier extends Logging {
Review Comment:
This duplicates an existing class:
`sql/core/.../streaming/runtime/ErrorNotifier.scala` is logically identical
(same `markError`/`getError`/`throwErrorIfExists`, same suppressed-attach).
This copy differs only in package, `private[spark]`, a richer scaladoc, and the
`log"..."` interpolator.
I understand the module layering forces a copy here (`core` can't see
`sql/core`). But rather than maintain two byte-for-byte implementations that
will drift, consider moving the canonical `ErrorNotifier` down into `core` and
re-pointing the ~6 `sql/core` references at it (they're all simple imports). If
there's a reason to keep them separate, a comment saying so would help.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]