[GitHub] spark issue #22320: [SPARK-25313][SQL]Fix regression in FileFormatWriter out...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/22320 (This is a test comment to test a GitHub Integration; please ignore) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21559 jenkins add to whitelist
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21481 Let's merge this as-is and do the build improvements in a separate PR. That's important because we may want to backport the overflow fix to maintenance branches and may want to do so independent of the build changes.
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r192566116

--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -141,26 +141,14 @@ public void fetchChunk(
     StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
     handler.addFetchRequest(streamChunkId, callback);
-    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
--- End diff --

Thanks for explaining. I guess the re-ordering of `channel.close()` and the `handler` operations is safe because the handler doesn't hold references to the channel / otherwise does not interact with it (and doesn't hold references to objects tied to channel lifecycle (like buffers))?
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r192565980

--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
    *
    * This method will not be called in parallel for a single TransportClient (i.e., channel).
    *
+   * The rpc *might* included a data stream in streamData (eg. for uploading a large
+   * amount of data which should not be buffered in memory here). Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
--- End diff --

I'm trying to think through whether we'll risk introducing any weird new failure modes (or increasing the occurrence of existing-but-improbable failure modes).

For example, causing in-flight RPCs to fail could surface latent RPC timeout issues: if we have a timeout which is way too long and we drop in-flight responses on the floor without sending back negative ACKs then we could see (finite but potentially long) hangs.

On the other hand, this pathway is used for executor <-> executor transfers and generally not executor <-> driver transfers, so my understanding is that failures in this channel generally won't impact control RPCs.
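The hang concern above can be illustrated with a small model. This is a hypothetical Python sketch, not Spark's network code: `Connection`, `send_rpc`, `complete`, and `fail_all` are invented names. It assumes one connection carrying several in-flight RPCs and shows the "negative ACK" behaviour, where every pending callback is told about the failure instead of being left to wait for its timeout.

```python
class Connection:
    """Hypothetical model of one multiplexed connection (not Spark's TransportClient)."""

    def __init__(self):
        self._next_id = 0
        self._pending = {}  # request id -> failure callback for in-flight RPCs

    def send_rpc(self, on_failure):
        """Register an in-flight RPC and return its request id."""
        request_id = self._next_id
        self._next_id += 1
        self._pending[request_id] = on_failure
        return request_id

    def complete(self, request_id):
        """A response arrived, so the RPC is no longer in flight."""
        self._pending.pop(request_id, None)

    def fail_all(self, cause):
        """Notify every in-flight RPC of the failure so no caller waits for a timeout."""
        pending, self._pending = self._pending, {}
        for request_id, on_failure in pending.items():
            on_failure(request_id, cause)


failures = []
conn = Connection()
first = conn.send_rpc(lambda rid, cause: failures.append((rid, str(cause))))
second = conn.send_rpc(lambda rid, cause: failures.append((rid, str(cause))))

conn.complete(first)                                  # one RPC finished normally
conn.fail_all(RuntimeError("stream upload failed"))   # the other is failed explicitly
```

If `fail_all` simply cleared `_pending` without invoking the callbacks, the caller of the second RPC would only find out through its own timeout, which is the "finite but potentially long" hang described in the comment.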
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r192565530 --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.protocol; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.buffer.NettyManagedBuffer; + +/** + * An RPC with data that is sent outside of the frame, so it can be read as a stream. + */ +public final class UploadStream extends AbstractMessage implements RequestMessage { + /** Used to link an RPC request with its response. */ + public final long requestId; + public final ManagedBuffer meta; + public final long bodyByteCount; + + public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) { +super(body, false); // body is *not* included in the frame +this.requestId = requestId; +this.meta = meta; +bodyByteCount = body.size(); + } + + // this version is called when decoding the bytes on the receiving end. 
The body is handled + // separately. + private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) { +super(null, false); +this.requestId = requestId; +this.meta = meta; +this.bodyByteCount = bodyByteCount; + } + + @Override + public Type type() { return Type.UploadStream; } + + @Override + public int encodedLength() { +// the requestId, meta size, meta and bodyByteCount (body is not included) +return 8 + 4 + ((int) meta.size()) + 8; + } + + @Override + public void encode(ByteBuf buf) { +buf.writeLong(requestId); +try { + ByteBuffer metaBuf = meta.nioByteBuffer(); + buf.writeInt(metaBuf.remaining()); + buf.writeBytes(metaBuf); +} catch (IOException io) { + throw new RuntimeException(io); +} +buf.writeLong(bodyByteCount); + } + + public static UploadStream decode(ByteBuf buf) { +long requestId = buf.readLong(); +int metaSize = buf.readInt(); +ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize)); +long bodyByteCount = buf.readLong(); +// This is called by the frame decoder, so the data is still null. We need a StreamInterceptor +// to read the data. +return new UploadStream(requestId, meta, bodyByteCount); + } + + @Override + public int hashCode() { +return Objects.hashCode(requestId, body()); + } + + @Override + public boolean equals(Object other) { +if (other instanceof UploadStream) { + UploadStream o = (UploadStream) other; + return requestId == o.requestId && super.equals(o); +} +return false; + } + + @Override + public String toString() { +return Objects.toStringHelper(this) + .add("requestId", requestId) + .add("body", body()) --- End diff -- I'm not actually sure. I wonder if this is a latent problem in the old code waiting to happen in case we turn on trace logging. We can probably investigate that separately, but just wanted to note it since it seemed a little dodgy. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21481

Hey @kiszk, thanks for tracking this down. This change looks good to me. I have a couple of questions, mostly aimed towards figuring out how we can categorically solve this problem:

1. What's the impact of this issue? Have you observed actual crashes or silent data corruption caused by this problem? I ask because it looks like this could be a plausible cause of a data corruption bug that I've been investigating.
2. Are these five files the only occurrence of this problem or are there potentially others? I've noticed that all of the files modified here are `.java` files: is that a coincidence or is that the result of running some Java linting tool? If the latter, is it possible that we have similar issues in other files which we haven't found yet?
3. Do you have any ideas for how we can categorically prevent this class of problem in the future? Are there compiler plugins or linters which can warn on these cases and turn them into compile-time errors? Or code-review checklists that we can employ to more easily spot these potential overflows? This is a hard problem, but I think it's worth brainstorming on categorical solutions since this isn't the first time we've hit this class of problem (but hopefully it can be the last!).

I think we should definitely backport this fix, at least to 2.3 and possibly earlier.
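For readers unfamiliar with this class of bug, the sketch below simulates it. Python integers do not overflow, so `to_int32` is a helper written for this illustration that wraps a value the way a Java `int` result would; `num_elements` and `bytes_per_element` are made-up numbers, not values from the PR. `checked_mul` mimics the behaviour of Java's `Math.multiplyExact`, which raises instead of wrapping.

```python
INT_MIN, INT_MAX = -2**31, 2**31 - 1


def to_int32(value):
    """Wrap an arbitrary Python int the way a Java `int` result wraps."""
    return (value + 2**31) % 2**32 - 2**31


def wrapping_mul(a, b):
    """Simulates Java's `a * b` on ints: overflow is silent."""
    return to_int32(a * b)


def checked_mul(a, b):
    """Simulates the behaviour of Java's Math.multiplyExact: overflow raises."""
    result = a * b
    if not INT_MIN <= result <= INT_MAX:
        raise OverflowError(f"int overflow: {a} * {b}")
    return result


num_elements = 300_000_000   # hypothetical element count
bytes_per_element = 8        # hypothetical element width

silent = wrapping_mul(num_elements, bytes_per_element)  # wraps to a negative size
widened = num_elements * bytes_per_element              # what widening to `long` first gives
```

Here `silent` is a negative number while `widened` is the intended 2,400,000,000, which is why widening one operand before the arithmetic, or using a checked operation, removes the problem at that call site.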
[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21390

Feel free to do the TTL in a followup. My feeling is that it won't be super useful in practice, though:

1. Cleanup of non-shuffle disk block manager files following executor exit only really matters for super-long-running applications. For short-running applications, you can just remove the entire application directory via the existing TTL cleaner mechanism.
2. If production jobs would fail with this change due to user code relying on undocumented internal behavior then I think the right solution is to disable this cleanup completely vs. putting it on a TTL. We've tried TTL-based cleanup before in the predecessor to the ContextCleaner and it was a huge source of user issues / JIRA tickets in cases where the cleanup was happening too soon (but not immediately, e.g. a 20 minute delay).
3. If you want this feature only for debugging (e.g. manual inspection of the contents of spill files) then I again imagine that you probably want an infinite timeout. Let's say I have a hard-to-reproduce production failure and I'd like to debug from the production repro by looking at spill files. In that case, the problem could occur at any hour, possibly when I'm asleep, so if I want the files to stick around long enough for a human to look at them then that could be several hours (possibly days in case we're running something over a weekend) and I feel like at a certain point a large timeout might as well become infinite.

Feel free to push back if you have a concrete use case where TTL-based cleanup of this specific file category is preferable to the binary on/off option implemented here. I'm just worried that it will be a lot of additional work to implement and will be harder to reason about (while offering relatively little additional marginal benefit compared to the simple "right after executor exit" approach).
[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21346

Summary of key changes (WIP; notes to self):

> Summary of changes:
>
> - Introduce a new `UploadStream` RPC which is sent to push a large payload as a stream (in contrast, the pre-existing `StreamRequest` and `StreamResponse` RPCs are used for pull-based streaming).
> - Generalize `RpcHandler.receive()` to support requests which contain streams.
> - Generalize `StreamInterceptor` to handle both request and response messages (previously it only handled responses).
> - Introduce `StdChannelListener` to abstract away common logging logic in `ChannelFuture` listeners.

Question: is this effectively dead code at this point? In other words, this PR just adds the lower-level pieces but there's nothing currently using the new API? So this patch as of now has no behavior change and actual functional changes impacting queries / actual usage will come later when we wire this up to the block replicator?
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r191964329

--- Diff: project/MimaExcludes.scala ---
@@ -36,6 +36,9 @@ object MimaExcludes {
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
--- End diff --

I suspect that it's because we might want to access these across Java package boundaries and Java doesn't have the equivalent of Scala's nested package scoped `private[package]`.
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r191941962 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/StreamData.java --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.StreamCallback; +import org.apache.spark.network.client.StreamInterceptor; +import org.apache.spark.network.util.TransportFrameDecoder; + +/** + * A holder for streamed data sent along with an RPC message. + */ +public class StreamData { + + private final TransportRequestHandler handler; + private final TransportFrameDecoder frameDecoder; + private final RpcResponseCallback rpcCallback; + private final ByteBuffer meta; --- End diff -- It looks like this field is not actually used in the current implementation. Is that intentional? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r191941503

--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
    *
    * This method will not be called in parallel for a single TransportClient (i.e., channel).
    *
+   * The rpc *might* included a data stream in streamData (eg. for uploading a large
+   * amount of data which should not be buffered in memory here). Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
--- End diff --

Perhaps naive question: what are the implications of this? Is this referring to a scenario where we've multiplexed multiple asynchronous requests / responses over a single network connection? I think I understand _why_ the failure mode is as stated (we're worried about leaving non-consumed leftover data in the channel) but I just wanted to ask about the implications of failing other in-flight RPCs.
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r191940304 --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.protocol; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.buffer.NettyManagedBuffer; + +/** + * An RPC with data that is sent outside of the frame, so it can be read as a stream. + */ +public final class UploadStream extends AbstractMessage implements RequestMessage { + /** Used to link an RPC request with its response. */ + public final long requestId; + public final ManagedBuffer meta; + public final long bodyByteCount; + + public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) { +super(body, false); // body is *not* included in the frame +this.requestId = requestId; +this.meta = meta; +bodyByteCount = body.size(); + } + + // this version is called when decoding the bytes on the receiving end. 
The body is handled + // separately. + private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) { +super(null, false); +this.requestId = requestId; +this.meta = meta; +this.bodyByteCount = bodyByteCount; + } + + @Override + public Type type() { return Type.UploadStream; } + + @Override + public int encodedLength() { +// the requestId, meta size, meta and bodyByteCount (body is not included) +return 8 + 4 + ((int) meta.size()) + 8; + } + + @Override + public void encode(ByteBuf buf) { +buf.writeLong(requestId); +try { + ByteBuffer metaBuf = meta.nioByteBuffer(); + buf.writeInt(metaBuf.remaining()); + buf.writeBytes(metaBuf); +} catch (IOException io) { + throw new RuntimeException(io); +} +buf.writeLong(bodyByteCount); + } + + public static UploadStream decode(ByteBuf buf) { +long requestId = buf.readLong(); +int metaSize = buf.readInt(); +ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize)); +long bodyByteCount = buf.readLong(); +// This is called by the frame decoder, so the data is still null. We need a StreamInterceptor +// to read the data. +return new UploadStream(requestId, meta, bodyByteCount); + } + + @Override + public int hashCode() { +return Objects.hashCode(requestId, body()); + } + + @Override + public boolean equals(Object other) { +if (other instanceof UploadStream) { + UploadStream o = (UploadStream) other; + return requestId == o.requestId && super.equals(o); +} +return false; + } + + @Override + public String toString() { +return Objects.toStringHelper(this) + .add("requestId", requestId) + .add("body", body()) --- End diff -- Similar question here about whether `body()` is useful in this context: will this actually end up printing buffer contents, which are potentially huge? Or will it do something reasonable and print only the buffer type? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r191939431 --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.protocol; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.buffer.NettyManagedBuffer; + +/** + * An RPC with data that is sent outside of the frame, so it can be read as a stream. + */ +public final class UploadStream extends AbstractMessage implements RequestMessage { + /** Used to link an RPC request with its response. */ + public final long requestId; + public final ManagedBuffer meta; + public final long bodyByteCount; + + public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) { +super(body, false); // body is *not* included in the frame +this.requestId = requestId; +this.meta = meta; +bodyByteCount = body.size(); + } + + // this version is called when decoding the bytes on the receiving end. 
The body is handled + // separately. + private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) { +super(null, false); +this.requestId = requestId; +this.meta = meta; +this.bodyByteCount = bodyByteCount; + } + + @Override + public Type type() { return Type.UploadStream; } + + @Override + public int encodedLength() { +// the requestId, meta size, meta and bodyByteCount (body is not included) +return 8 + 4 + ((int) meta.size()) + 8; + } + + @Override + public void encode(ByteBuf buf) { +buf.writeLong(requestId); +try { + ByteBuffer metaBuf = meta.nioByteBuffer(); + buf.writeInt(metaBuf.remaining()); + buf.writeBytes(metaBuf); +} catch (IOException io) { + throw new RuntimeException(io); +} +buf.writeLong(bodyByteCount); + } + + public static UploadStream decode(ByteBuf buf) { +long requestId = buf.readLong(); +int metaSize = buf.readInt(); +ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize)); +long bodyByteCount = buf.readLong(); +// This is called by the frame decoder, so the data is still null. We need a StreamInterceptor +// to read the data. +return new UploadStream(requestId, meta, bodyByteCount); + } + + @Override + public int hashCode() { +return Objects.hashCode(requestId, body()); --- End diff -- The `equals()` and `hashCode()` implementations of this `UploadStream` class appear to differ slightly: the `equals()` method only checks equality of the `requestIds`, whereas this hashCode is checking both the `requestId` and the `body()`. I'm not sure what a `ManagedBuffer`'s `hashCode()` is: the `hashCode()` might not depend on the buffer contents, in which case this could lead to false hashCode mismatches for equal requests. Should we use just `requestId` here instead? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
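The contract at stake in the comment above is that objects which compare equal must produce the same hash. The following is a Python analogue written for illustration only; it is not the Java class under review, and `Buffer`, `InconsistentRequest`, and `ConsistentRequest` are hypothetical names. `Buffer` keeps Python's default identity-based hash, standing in for a buffer whose hash does not depend on its contents.

```python
class Buffer:
    """Hypothetical buffer stand-in; it keeps Python's default identity-based hash."""

    def __init__(self, data):
        self.data = data


class InconsistentRequest:
    """Equality looks at request_id only, but the hash also mixes in the body."""

    def __init__(self, request_id, body):
        self.request_id = request_id
        self.body = body

    def __eq__(self, other):
        return (isinstance(other, InconsistentRequest)
                and self.request_id == other.request_id)

    def __hash__(self):
        return hash((self.request_id, self.body))


class ConsistentRequest:
    """Same equality rule, with the hash derived only from the field equality uses."""

    def __init__(self, request_id, body):
        self.request_id = request_id
        self.body = body

    def __eq__(self, other):
        return (isinstance(other, ConsistentRequest)
                and self.request_id == other.request_id)

    def __hash__(self):
        return hash(self.request_id)


a = ConsistentRequest(7, Buffer(b"payload"))
b = ConsistentRequest(7, Buffer(b"payload"))
unique = {a, b}  # equal objects with equal hashes collapse to one set entry

bad_a = InconsistentRequest(7, Buffer(b"payload"))
bad_b = InconsistentRequest(7, Buffer(b"payload"))
# bad_a == bad_b holds, yet their hashes depend on two distinct Buffer objects,
# so hash-based collections may treat them as different keys.
```

Hashing only the fields that equality inspects, as `ConsistentRequest` does, is the simplest way to keep the two methods in agreement.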
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r191938203

--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -141,26 +141,14 @@ public void fetchChunk(
     StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
     handler.addFetchRequest(streamChunkId, callback);
-    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
--- End diff --

Are the changes to these `.addListener()` calls primarily cleanup / refactoring? Is the intent to reduce the amount of _new_ duplicate code which would otherwise be added to `uploadStream` in this file?
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r191935821

--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java ---
@@ -50,16 +52,22 @@
   @Override
   public void exceptionCaught(Throwable cause) throws Exception {
-    handler.deactivateStream();
+    deactivateStream();
     callback.onFailure(streamId, cause);
   }
   @Override
   public void channelInactive() throws Exception {
-    handler.deactivateStream();
+    deactivateStream();
     callback.onFailure(streamId, new ClosedChannelException());
   }
+  private void deactivateStream() {
+    if (handler instanceof TransportResponseHandler) {
--- End diff --

Why don't we need to do this for `TransportRequestHandler`?
[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21346 I'm going to be starting a more detailed review pass on this now and will be getting caught back up with the discussion that's happened so far. One high-level point I'd like to keep in mind: what are the major risks of this change in terms of introducing performance or correctness issues? If we identify risks (e.g. "this is a historically tricky area of code?"), can we mitigate those risks through correctness testing / load testing?
[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21437#discussion_r191881226

--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
         TaskAttemptID.
         """
         return self._taskAttemptId
+
+    def getLocalProperty(self, key):
+        """
+        Get a local property set upstream in the driver, or None if it is missing.
--- End diff --

FWIW we can always evolve what you have here towards the `getLocalProperty(key, default=None)` case; that evolution would maintain behavior and source compatibility. Therefore maybe it's fine to defer that until later if we're not sure whether that variant is useful.
[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21437#discussion_r191877900

--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
         TaskAttemptID.
         """
         return self._taskAttemptId
+
+    def getLocalProperty(self, key):
+        """
+        Get a local property set upstream in the driver, or None if it is missing.
--- End diff --

The Java / Scala equivalents of this API return `null` for missing keys, so on the one hand returning `None` is kinda consistent with that.

On the other hand, consider a case where you want to specify an alternative in case a key is not set. With this API, you might think of doing something like `tc.getLocalProperty('key') or 'defaultValue'`, which could be a problem if a key that is set has a falsy value. I suppose we're only dealing with strings here, though, so that'd only happen for empty strings. If we allowed non-strings to be returned here, then we'd have problems with values like `0`. For that case, having a `getLocalProperty('key', 'defaultValue')` is a bit more useful.
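The difference between the two idioms is easy to demonstrate. The sketch below uses a hypothetical `FakeTaskContext` backed by a plain dict; it is not the PySpark `TaskContext` class, and the `default` parameter is the variant discussed in the comment, not something the diff implements. The property names and values are made up.

```python
class FakeTaskContext:
    """Hypothetical stand-in for a task context; not the PySpark class."""

    def __init__(self, props):
        self._props = dict(props)

    def get_local_property(self, key, default=None):
        """Return the value for key, or `default` only when the key is absent."""
        return self._props.get(key, default)


tc = FakeTaskContext({"pool": "", "retries": 0})

# `or` replaces every falsy value, including ones that were set on purpose.
pool_via_or = tc.get_local_property("pool") or "defaultPool"
retries_via_or = tc.get_local_property("retries") or 3

# An explicit default is used only when the key is missing.
pool_via_default = tc.get_local_property("pool", "defaultPool")
retries_via_default = tc.get_local_property("retries", 3)
missing_via_default = tc.get_local_property("absent", "defaultPool")
```

With `or`, the empty string and `0` are both overwritten by the fallback, while the explicit default preserves them and still covers the missing key.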
[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21390 Yeah, this is only concerned with non-shuffle files which are located in the block manager temp directories (e.g. large sorter spill files). There is a related issue where shuffle files can be leaked indefinitely following executor death because the external shuffle service is never directly told that shuffles are safe to remove (the context cleaner sends RPCs to executors and executors clean up their own shuffle files). That issue is substantially harder to fix, though, since it likely requires protocol changes to the shuffle service or an inversion-of-control where the shuffle service can periodically ask the driver "do any of these shuffle IDs correspond to cleaned shuffles?". As a result, I think the strategy here is to decompose that disk leak into two separate sets of fixes, where this patch is concerned with the simpler case of non-shuffle files (we'll defer the more complex case to a separate PR because it requires a lot more design).
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r190705466

--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -97,6 +99,10 @@ private[deploy] class Worker(
   private val APP_DATA_RETENTION_SECONDS =
     conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+  // Whether or not cleanup the non-shuffle files on executor death.
+  private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
+    conf.getBoolean("spark.storage.cleanupFilesAfterExecutorDeath", true)
--- End diff --

@jerryshao, that's a good point; I prefer your suggested `*Exit` name. If executors exit in a healthy / controlled fashion then we don't expect non-shuffle files to be leaked, but the cleanup logic still will be run.
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r190438434 --- Diff: python/pyspark/shuffle.py --- @@ -67,6 +67,19 @@ def get_used_memory(): return 0 +def safe_iter(f): +""" wraps f to make it safe (= does not lead to data loss) to use inside a for loop +make StopIteration's raised inside f explicit +""" +def wrapper(*args, **kwargs): +try: +return f(*args, **kwargs) +except StopIteration as exc: +raise RuntimeError('StopIteration in client code', exc) --- End diff -- What about `Caught StopIteration thrown from user's code; failing the task` in order to make it clear that it's expected that the exception bubbles and fails the task? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21342 Updated changes LGTM. Thanks for working on this!
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r190105740 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -97,6 +97,10 @@ private[deploy] class Worker( private val APP_DATA_RETENTION_SECONDS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600) + // Whether or not cleanup the non-shuffle files on executor finishes. + private val CLEANUP_NON_SHUFFLE_FILES_ENABLED = +conf.getBoolean("spark.worker.cleanup.nonShuffleFiles.enabled", true) --- End diff -- I think these configurations might actually be independent:
- `spark.worker.cleanup.enabled` controls whether directories from finished applications (i.e. terminated drivers) are cleaned up, a decision which can impact users' ability to debug failed applications. In environments where clusters / underlying physical workers are ephemeral this configuration tends to matter a little less, but in environments with long-lived machines which host many Spark drivers over a long period of time (e.g. on prem) this configuration can matter. For example, I think that setting `spark.worker.cleanup.enabled = true` could mean that completed applications' executor logs get purged from the underlying host machine after a time period.
- The new configuration being proposed here generally matters only _within a long-running Spark application_ and is kind of orthogonal to the choice to keep or discard failed applications' executor directories. I think that we're only removing files within block managers' nested folders, right? So there should be no user-controlled / user-generated files here, at least in theory. Therefore the impact of this configuration on ordinary users' ability to debug is likely to be small (I think you'd only ever really want access to the files that we're deleting if you're going to hex-dump files to debug corruption issues which trigger segfaults, an unlikely scenario).
Given the somewhat different motivations and lifecycles of these cleanup mechanisms, and the different reasons behind their defaults, I'm thinking that it might make sense to rename the new configuration so that we don't confuse users via similar names. What do you think about something like `spark.storage.removeNonShuffleBlockManagerFilesAfterExecutorDeath`? It's really verbose, but it is a precise description of what's going on. I'm open to better name suggestions (since I tend to pick overly-complicated ones); this is just a conversation-starting strawman.
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r190104118 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -732,6 +736,9 @@ private[deploy] class Worker( trimFinishedExecutorsIfNecessary() coresUsed -= executor.cores memoryUsed -= executor.memory + if (CLEANUP_NON_SHUFFLE_FILES_ENABLED) { + shuffleService.executorRemoved(executorStateChanged.execId.toString, appId) --- End diff -- Nah, no need to rename since this matches existing convention. I was just thinking aloud here / explaining the reasoning for other future readers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r189967162 --- Diff: python/pyspark/shuffle.py --- @@ -67,6 +67,19 @@ def get_used_memory(): return 0 +def safe_iter(f): +""" wraps f to make it safe (= does not lead to data loss) to use inside a for loop +make StopIteration's raised inside f explicit +""" +def wrapper(*args, **kwargs): +try: +return f(*args, **kwargs) +except StopIteration as exc: +raise RuntimeError('StopIteration in client code', exc) --- End diff -- What about `user code`? That's a bit clearer because it shows that the exception originates in code that was not part of Spark itself but was instead written by a user of the Spark framework. Words like `client` and `consumer` could be confusing because both are used in other, more precise technical senses within our codebase (e.g. client deploy mode, streaming consumer).
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21311 @cxzl25, regarding:
> Some data is lost and the data read out is dirty
To clarify, is this a potential cause of a wrong-answer correctness bug? If so, we should be sure to backport the resulting fix to maintenance branches. /cc @cloud-fan @gatorsmile
[GitHub] spark issue #21383: [SPARK-23754][Python] Re-raising StopIteration in client...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21383 jenkins this is ok to test
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r189818845 --- Diff: python/pyspark/shuffle.py --- @@ -67,6 +67,19 @@ def get_used_memory(): return 0 +def safe_iter(f): +""" wraps f to make it safe (= does not lead to data loss) to use inside a for loop --- End diff -- It sounds like this is a potential correctness issue, so the eventual fix for this should be backported to maintenance releases (at least the most recent ones and the next 2.3.x). I saw the examples provided on the linked JIRAs, but do you have an example of a realistic user workload where this problem can occur (i.e. a case where the problem is more subtle than explicitly throwing `StopIteration()`)? Would that be something like calling `next()` past the end of an iterator (which I suppose could occur deep in library code)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
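A minimal sketch of the more subtle case asked about above, assuming a hypothetical user function `first_field` that calls `next()` on an empty iterator. It is plain Python rather than PySpark, and `records` is made-up data; the wrapper follows the `safe_iter` shape from the diff, with the error message suggested in review.

```python
def safe_iter(f):
    """Wrap f so that a StopIteration raised inside it surfaces as a RuntimeError."""
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError(
                "Caught StopIteration thrown from user's code; failing the task"
            ) from exc
    return wrapper


def first_field(record):
    # Hypothetical user function: returns the first element of a record.
    # next() on an empty iterator raises StopIteration.
    return next(iter(record))


records = [[1, 2], [], [3]]

# The StopIteration raised for the empty record escapes map() and is read by
# list() as "end of input", so the trailing record [3] is silently dropped.
truncated = list(map(first_field, records))

# With the wrapper, the same input fails loudly instead of losing data.
try:
    list(map(safe_iter(first_field), records))
    failed = False
except RuntimeError:
    failed = True
```

The point of the sketch is that no code explicitly raises `StopIteration`; it leaks out of an ordinary `next()` call and is misread by the consuming loop.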
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r189816085 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/NonShuffleFilesCleanupSuite.java --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Random; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.spark.network.util.MapConfigProvider; +import org.apache.spark.network.util.TransportConf; + +public class NonShuffleFilesCleanupSuite { --- End diff -- Do we need a test to check that we preserve the old behavior in case the new configuration is set to `false`? An end-to-end test will likely be prone to flakiness, so instead maybe we could somehow test that `shuffleService.executorRemoved()` is _not_ called if the configuration is `false`. One way to do that would be to move the construction of `new ExternalShuffleService` from the default constructor of `Worker` into its public constructor and then inject it in the `new Worker` call. This, in turn, would let you inject either a mock or spy in order to verify call counts. Do you know if we have this style of test for other `Worker` functionality? Is this a ton of work or is it relatively simple to do?
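The injection idea in this comment can be sketched as follows. This is a Python illustration only: `Worker`, `handle_executor_finished`, and `executor_removed` here are hypothetical stand-ins for the Scala classes, and the configuration key is the name used in the diff at this point in the review.

```python
from unittest import mock

CLEANUP_KEY = "spark.worker.cleanup.nonShuffleFiles.enabled"


class Worker:
    """Hypothetical stand-in for Worker; only the cleanup branch is modelled."""

    def __init__(self, conf, shuffle_service):
        # The shuffle service is passed in rather than constructed here,
        # which is what lets a test substitute a mock or spy.
        self.cleanup_enabled = conf.get(CLEANUP_KEY, True)
        self.shuffle_service = shuffle_service

    def handle_executor_finished(self, app_id, exec_id):
        if self.cleanup_enabled:
            self.shuffle_service.executor_removed(str(exec_id), app_id)


def calls_when(flag):
    """Return how often executor_removed was invoked for a given flag value."""
    service = mock.Mock()
    worker = Worker({CLEANUP_KEY: flag}, service)
    worker.handle_executor_finished("app-1", 0)
    return service.executor_removed.call_count


calls_disabled = calls_when(False)
calls_enabled = calls_when(True)
```

Checking call counts on the injected object avoids touching the file system, which is why this style tends to be less flaky than an end-to-end test.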
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r189813626 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -97,6 +97,10 @@ private[deploy] class Worker( private val APP_DATA_RETENTION_SECONDS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600) + // Whether or not cleanup the non-shuffle files on executor finishes. + private val CLEANUP_NON_SHUFFLE_FILES_ENABLED = +conf.getBoolean("spark.worker.cleanup.nonShuffleFiles.enabled", true) --- End diff -- Is there potential confusion from the fact that `spark.worker.cleanup.nonShuffleFiles.enabled`'s effects are not controlled by `spark.worker.cleanup.enabled`? Should they be? The `spark.worker.cleanup.enabled` configuration only seems to be dealing with the cleanup of completed applications' directories, whereas this is dealing with cleanup from completed executors whose application continues running (likely with new executors). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r189813156 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -732,6 +736,9 @@ private[deploy] class Worker( trimFinishedExecutorsIfNecessary() coresUsed -= executor.cores memoryUsed -= executor.memory + if (CLEANUP_NON_SHUFFLE_FILES_ENABLED) { + shuffleService.executorRemoved(executorStateChanged.execId.toString, appId) --- End diff -- Re-reading this code with fresh eyes made me realize that maybe the method naming here is a bit confusing: from the name alone, it sounds like `executorRemoved` is a notification-sending method, the kind of thing where you'd always want to call it when the corresponding event (executor removal) occurs and then leave it up to the method implementation to decide whether to actually carry out specific cleanup logic. On the other hand, we call `shuffleService.applicationRemoved()` in `maybeCleanupApplication()` and there's a flag in that method controlling whether we actually clean up, so we have a similar pattern there. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r189809797 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -97,6 +97,10 @@ private[deploy] class Worker( private val APP_DATA_RETENTION_SECONDS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600) + // Whether or not cleanup the non-shuffle files on executor finishes. + private val CLEANUP_NON_SHUFFLE_FILES_ENABLED = +conf.getBoolean("spark.worker.cleanup.nonShuffleFiles.enabled", true) --- End diff -- Should we document this configuration at https://github.com/apache/spark/blob/master/docs/spark-standalone.md ? I can't imagine a great reason for users wanting to disable this and believe that this is functioning primarily as a feature-flag to give us an escape-hatch in case of regressions, so maybe it's not a huge deal to leave it undocumented? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r189809055 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java --- @@ -226,6 +246,29 @@ private void deleteExecutorDirs(String[] dirs) { } } + private FilenameFilter filter = new FilenameFilter() { --- End diff -- Is it feasible to define this as a local variable inside of `deleteNonShuffleFiles` in order to reduce its scope to be as small as possible and make it clear that this is only used within `deleteNonShuffleFiles`?
[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21390#discussion_r189808705 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java --- @@ -211,6 +211,26 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { } } + /** + * Removes all the non-shuffle files in any local directories associated with the finished + * executor. + */ + public void executorRemoved(String executorId, String appId) { +logger.info("Clean up non-shuffle files associated with the finished executor {}", executorId); +AppExecId fullId = new AppExecId(appId, executorId); +final ExecutorShuffleInfo executor = executors.get(fullId); +if (executor == null) { + // Executor not registered, skip clean up of the local directories. + logger.info("Executor is not registered (appId={}, execId={})", appId, executorId); +} else { + logger.info("Cleaning up non-shuffle files in executor {}'s {} local dirs", fullId, + executor.localDirs.length); + + // Execute the actual deletion in a different thread, as it may take some time. --- End diff -- Thanks for doing this in a separate thread. This indeed could take a _very_ long time under certain rare conditions.
[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21390 Context for other reviewers: the issue addressed by this patch is actually a real issue in practice, especially for long-lived Spark clusters; I have seen this specific problem play a large contributing role to certain production out-of-disk-space failures. One thing I'd like to note: as implemented here, this patch only addresses this problem for Spark's built-in "Standalone" cluster manager. @jiangxb1987, could you mention that limitation in the PR title and description? My personal preference is to proceed incrementally by merging this Standalone-only PR and deferring support for other cluster managers to future PRs (perhaps from experts familiar with those other cluster managers). I'll take a more detailed look tomorrow, but just wanted to provide motivation for other reviewers who might leave comments before then.
[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21342 Thanks for the updates. The net change / scope of changes have been significantly reduced here, so I feel that this change is a lot less risky now. I left only one nitpicky comment at https://github.com/apache/spark/pull/21342/files#r189753488 worrying about potential future risks from people coming along and writing new code throwing `SparkFatalException` in a context where it can bubble up to the uncaught exception handler. If we want to be super defensive we could add some logic at https://github.com/apache/spark/blob/32447079e9d0fa9f7e180b94ecac19091b6af1ab/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala#L42 to also catch a `SparkFatalException` on top of an OOM and treat that as an OOM. Debatable if we want to do that, but it's a great way of addressing @gatorsmile's comment at https://github.com/apache/spark/pull/21342#discussion_r189627101 and avoids future breakage. Otherwise, LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21342#discussion_r189754203 --- Diff: core/src/main/scala/org/apache/spark/util/SparkFatalException.scala --- @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util + +/** + * SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we catch + * fatal throwable in {@link scala.concurrent.Future}'s body, and re-throw + * SparkFatalException, which wraps the fatal throwable inside. + */ +private[spark] final class SparkFatalException(val throwable: Throwable) extends Exception --- End diff -- OTOH I guess we're actually only using this in one place right now, so I think things are correct as written, but I was just kind of abstractly worrying about potential future pitfalls in case people start using this pattern in new code without also noticing the `ThreadUtils.awaitResult` requirement.
[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21342#discussion_r189754010 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -111,12 +112,18 @@ case class BroadcastExchangeExec( SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq) broadcasted } catch { + // SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw + // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult + // will catch this exception and re-throw the wrapped fatal throwable. case oe: OutOfMemoryError => -throw new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " + +throw new SparkFatalException( + new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " + --- End diff -- I agree that we're likely to have reclaimable space at this point, so the chance of a second OOM / failure here seems small. I'm pretty sure that the OutOfMemoryError being caught here often originates from Spark itself where we explicitly throw another `OutOfMemoryError` at a lower layer of the system, in which case we still actually have heap to allocate strings. We should investigate and clean up that practice, but let's do that in a separate PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21342#discussion_r189753564 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -111,12 +112,18 @@ case class BroadcastExchangeExec( SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq) broadcasted } catch { + // SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw + // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult + // will catch this exception and re-throw the wrapped fatal throwable. case oe: OutOfMemoryError => --- End diff -- Agreed; let's fix that separately. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21342#discussion_r189753488 --- Diff: core/src/main/scala/org/apache/spark/util/SparkFatalException.scala --- @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util + +/** + * SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we catch + * fatal throwable in {@link scala.concurrent.Future}'s body, and re-throw + * SparkFatalException, which wraps the fatal throwable inside. + */ +private[spark] final class SparkFatalException(val throwable: Throwable) extends Exception --- End diff -- Are there places where we fetch results from Futures without going through the `ThreadUtils.awaitResult`? In other words, is that a narrow waist? Would it make sense to add a second redundant layer of unwrapping at the top of `SparkUncaughtExceptionHandler` to handle that case? Not sure yet, but just thinking aloud here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
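A rough Python analogue of the two unwrapping layers discussed in this comment, offered only as a sketch. `FatalWrapper`, `await_result`, and `classify_uncaught` are hypothetical names that play the roles of `SparkFatalException`, `ThreadUtils.awaitResult`, and the uncaught exception handler; Python futures do not have the Scala bug referenced in the diff, so only the wrapping pattern carries over.

```python
from concurrent.futures import ThreadPoolExecutor


class FatalWrapper(Exception):
    """Hypothetical analogue of SparkFatalException: carries a fatal error."""

    def __init__(self, throwable):
        super().__init__(repr(throwable))
        self.throwable = throwable


def unwrap(exc):
    # Return the wrapped error if there is one, otherwise the error itself.
    return exc.throwable if isinstance(exc, FatalWrapper) else exc


def await_result(future):
    # First layer: the single place where callers are expected to fetch results.
    try:
        return future.result()
    except FatalWrapper as wrapper:
        raise wrapper.throwable


def classify_uncaught(exc):
    # Second, redundant layer: still recognises a wrapped out-of-memory error
    # when some caller fetched the result without going through await_result.
    return "oom" if isinstance(unwrap(exc), MemoryError) else "other"


def build_broadcast():
    try:
        raise MemoryError("not enough memory to build the table")
    except MemoryError as oom:
        raise FatalWrapper(oom)


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(build_broadcast)
    try:
        await_result(future)
        via_await = "no error"
    except MemoryError:
        via_await = "oom"
    # Simulates a caller that bypassed await_result and leaked the wrapper.
    bypassed = classify_uncaught(future.exception())
```

Whether the second layer is worth having depends on how narrow the waist really is; the sketch only shows that it is cheap to add.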
[GitHub] spark issue #21329: [SPARK-24277][SQL] Code clean up in SQL module: HadoopMa...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21329 I'd also like to note that commit protocols have historically been a very high risk area of the code, so I think we should have a much higher bar for explaining changes to that component.
[GitHub] spark issue #21329: [SPARK-24277][SQL] Code clean up in SQL module: HadoopMa...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21329 In general, I'm very wary of cleanup changes like this: unless we have a _need_ to do this (i.e. it causes negative side effects, breaks workloads, prevents specific concrete improvements, etc.) then the risk of changing longstanding old code outweighs any benefits of "cleanliness". In this specific case, I'm most concerned about the removal of those `jobContext` `Configuration` changes: they might appear superfluous but I'd bet that they were originally added for a reason which made sense at the time the original code was authored. If we're going to remove such old code, I'd like to see a written explanation of why the change is safe which explains the original context for adding that code. I chased through the git blame for the removed lines and the oldest version I found was https://github.com/apache/spark/commit/0595b6de8f1da04baceda082553c2aa1aa2cb006#diff-5c7d283b2f533b6f491dd1845dd86797R299, which is #5526: "[SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sources API" by @liancheng. It looks like that PR simply copied even older code from either `hiveWriterContainers.scala` or `HadoopRDD`. It looks like these properties actually _did_ have some effect for something at one point in time because https://github.com/apache/spark/pull/101 was fixing some sort of corner-case or bug in that code in the `HadoopRDD` case.
If you go even further back, similar lines are present in a commit which predated our merge script (so it's an intermediate commit from @mateiz as part of some larger changeset): https://github.com/apache/spark/commit/0ccfe20755665aa4c347b82e18297c5b3a2284ee#diff-8382b1a276e6cbfae1efb3ee49c7e06eR144 Given this long history, I'd like to flag that change as potentially high-risk: it's not obvious to me that this code is unneeded and if we don't have a strong reason to change it then I'd prefer to leave it as it was before simply to help us manage / reduce risk. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21342 I'm also in favor of delaying for a couple of days for more detailed review because historically I think these types of changes have been high risk. The risk calculus might be a bit different if this was fixing a critical "wrong result" correctness bug, but it seems like this is a longstanding annoyance which causes either poor performance or visible job failures, so I don't see an extreme urgency to get this in immediately. Therefore let's take a bit of time to ponder things and sleep on it to be sure that we've thought through corner-cases (to be clear, I do think this patch is generally in a good direction). Some specifics: 1. The old code had some places which purposely caught `OutOfMemoryError` thrown from layers of the spilling code. I do not know whether the expected sources of OOMs were only the throw sites modified here or whether the intent was also to catch OOMs from allocating too big arrays, etc. The latter would have been a dodgy pattern and bad idea in the first place, but I just wanted to note this as a potential risk for unintended / implicit behavior changes. If we want to be super conservative about that we could update throw sites but keep catch sites and extend them to catch _both_ OOM cases. 2. Should we maybe throw an `OutOfMemoryError` subclass and then pattern-match on our subclass in a couple of specific places? That might help reduce change-surface. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
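Point 2 above can be illustrated with a small Python sketch, using `MemoryError` in place of the JVM's `OutOfMemoryError`. `SpillOutOfMemoryError`, `acquire`, and `allocate_with_spill` are hypothetical names invented for the illustration; they are not Spark APIs.

```python
class SpillOutOfMemoryError(MemoryError):
    """Hypothetical subclass raised only by our own memory accounting."""


def acquire(requested, available):
    # Our own throw site: signals "not enough managed memory" explicitly.
    if requested > available:
        raise SpillOutOfMemoryError(
            f"requested {requested} bytes but only {available} are available"
        )
    return requested


def allocate_with_spill(requested, available, spillable):
    try:
        return acquire(requested, available)
    except SpillOutOfMemoryError:
        # Match only on the subclass: a generic MemoryError from, say, an
        # oversized array allocation is not handled here and still propagates.
        return acquire(requested, available + spillable)


granted = allocate_with_spill(10, 4, 8)
```

Because the subclass is still a `MemoryError`, existing catch sites that match the parent type keep working, which is the sense in which this would reduce the change surface.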
[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21346 It's been a little while since I've thought about this issue, so I have a few clarifying questions to help me understand the high-level changes: 1. I recall that the problem with large shuffle blocks was that the OneForOneBlockFetcher strategy basically read the entire block as a single chunk, which becomes a problem for large blocks. I understand that we have now removed this limitation for shuffles by using a streaming transfer strategy only for large blocks (above some threshold). Is this patch conceptually doing the same thing for push-based communication where the action is initiated by a sender (e.g. to push a block for replication)? Does it also affect pull-based remote cache block reads or will that be handled separately? 2. Given that we already seem to have pull-based `openStream()` calls which can be initiated from the receive side, could we simplify things here by pushing a "this value is big, pull it" message and then have the remote end initiate a streaming read, similar to how DirectTaskResult and IndirectTaskResult work? 3. For remote reads of large cached blocks: is it true that this works today _only if_ the block is on disk but fails if the block is in memory? If certain size limit problems only occur when things are cached in memory, can we simplify anything if we add a requirement that blocks above 2GB can _only_ be cached on disk (regardless of storage level)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21327: [SPARK-24107][CORE][followup] ChunkedByteBuffer.writeFul...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21327 LGTM. Thanks for these changes; they really help to clarify this tricky piece of code for readers.
[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21175

No, I mean that the code here can simply follow the write call as straight-line code. We don't need to guard against exceptions here because the duplicate of the buffer is used only by a single thread, so you can omit the try block and just place the finally contents directly after the try contents. This is a minor point, but I wanted to comment because I was initially confused about when errors could occur and about thread safety / sharing, until I realized that the modified state does not escape this method.

On Mon, May 14, 2018 at 9:03 PM Wenchen Fan <notificati...@github.com> wrote:
> *@cloud-fan* commented on this pull request.
>
> In core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala <https://github.com/apache/spark/pull/21175#discussion_r188160044>:
>
> > @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
> >     */
> >    def writeFully(channel: WritableByteChannel): Unit = {
> >      for (bytes <- getChunks()) {
> > -      while (bytes.remaining() > 0) {
> > -        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
> > -        bytes.limit(bytes.position() + ioSize)
> > -        channel.write(bytes)
> > +      val curChunkLimit = bytes.limit()
> > +      while (bytes.hasRemaining) {
> > +        try {
> > +          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
> > +          bytes.limit(bytes.position() + ioSize)
> > +          channel.write(bytes)
> > +        } finally {
>
> Do you mean this is not a real bug that can cause real workload to fail?
[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21175#discussion_r188154900
--- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    */
   def writeFully(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
-      while (bytes.remaining() > 0) {
-        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-        bytes.limit(bytes.position() + ioSize)
-        channel.write(bytes)
+      val curChunkLimit = bytes.limit()
+      while (bytes.hasRemaining) {
+        try {
+          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+          bytes.limit(bytes.position() + ioSize)
--- End diff --
The rationale for the `limit()` call isn't super-clear, but that was a problem in the original PR which introduced the bug (#18730). I'm commenting here only as a cross-reference for folks who come across this patch in the future. I believe that the original motivation was http://www.evanjones.ca/java-bytebuffer-leak.html
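A minimal Java sketch of the loop under review, assuming the purpose of `limit()` is to cap how many bytes each `write` call sees. `ChunkedWrite`, `maxIoSize`, and `demo` are hypothetical names for illustration; the sketch restores the limit after every write, which is the step whose absence would end the loop after the first window.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

class ChunkedWrite {
    // Writes all remaining bytes of src, exposing at most maxIoSize bytes per write call.
    static void writeFully(ByteBuffer src, WritableByteChannel channel, int maxIoSize)
            throws IOException {
        ByteBuffer bytes = src.duplicate(); // private position/limit, shared content
        int originalLimit = bytes.limit();
        while (bytes.hasRemaining()) {
            int ioSize = Math.min(bytes.remaining(), maxIoSize);
            bytes.limit(bytes.position() + ioSize); // expose only a bounded window
            channel.write(bytes);
            // Restore the limit; otherwise position == limit after the first
            // window and hasRemaining() would report false too early.
            bytes.limit(originalLimit);
        }
    }

    // Round-trips `size` bytes through writeFully and returns what was written.
    static byte[] demo(int size, int maxIoSize) {
        byte[] data = new byte[size];
        for (int i = 0; i < size; i++) {
            data[i] = (byte) i;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            writeFully(ByteBuffer.wrap(data), Channels.newChannel(out), maxIoSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```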
[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21175#discussion_r188154716 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { */ def writeFully(channel: WritableByteChannel): Unit = { for (bytes <- getChunks()) { - while (bytes.remaining() > 0) { -val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) -bytes.limit(bytes.position() + ioSize) -channel.write(bytes) + val curChunkLimit = bytes.limit() + while (bytes.hasRemaining) { +try { + val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) + bytes.limit(bytes.position() + ioSize) + channel.write(bytes) +} finally { --- End diff -- I don't think we need the `try` and `finally` here because `getChunks()` returns duplicated ByteBuffers which have their own position and limit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
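The property this comment relies on is standard `java.nio` behaviour: `ByteBuffer.duplicate()` shares the underlying bytes but gives the duplicate its own position and limit. A small Java sketch (the class name `DuplicateDemo` is just for illustration):

```java
import java.nio.ByteBuffer;

class DuplicateDemo {
    // Returns {original.position, original.limit, dup.position, dup.limit}
    // after mutating only the duplicate.
    static int[] demo() {
        ByteBuffer original = ByteBuffer.allocate(16);
        ByteBuffer dup = original.duplicate(); // shares content, not position/limit
        dup.position(4);
        dup.limit(8);
        return new int[] {
            original.position(), original.limit(), dup.position(), dup.limit()
        };
    }
}
```

Because the original's position and limit are untouched, an exception thrown halfway through a write on the duplicate leaves no state that needs restoring in a `finally` block.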
[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21212#discussion_r186315362 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator( // at most maxBytesInFlight in order to limit the amount of data in flight. val remoteRequests = new ArrayBuffer[FetchRequest] -// Tracks total number of blocks (including zero sized blocks) -var totalBlocks = 0 for ((address, blockInfos) <- blocksByAddress) { - totalBlocks += blockInfos.size if (address.executorId == blockManager.blockManagerId.executorId) { -// Filter out zero-sized blocks -localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1) +blockInfos.find(_._2 <= 0) match { + case Some((blockId, size)) if size < 0 => +throw new BlockException(blockId, "Negative block size " + size) + case Some((blockId, size)) if size == 0 => +throw new BlockException(blockId, "Zero-sized blocks should be excluded.") --- End diff -- I think that failing with an exception here is a great idea, so thanks for adding these checks. In general, I'm in favor of adding explicit fail-fast checks for invariants like this because it can help to defend against silent corruption bugs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
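The fail-fast pattern endorsed here can be sketched in Java as follows. This is an illustrative stand-in, not the Spark code: `BlockSizeCheck`, `validate`, and the use of `IllegalStateException` in place of `BlockException` are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BlockSizeCheck {
    // Returns the block ids to fetch; throws as soon as a non-positive size is seen.
    static List<String> validate(Map<String, Long> blockSizes) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, Long> e : blockSizes.entrySet()) {
            long size = e.getValue();
            if (size < 0) {
                throw new IllegalStateException(
                    "Negative block size " + size + " for " + e.getKey());
            }
            if (size == 0) {
                throw new IllegalStateException(
                    "Zero-sized blocks should be excluded: " + e.getKey());
            }
            ids.add(e.getKey());
        }
        return ids;
    }

    // Helper for the demo: does a block of the given size get rejected?
    static boolean rejects(long size) {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("shuffle_0_0_0", 10L);
        sizes.put("shuffle_0_0_1", size);
        try {
            validate(sizes);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```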
[GitHub] spark pull request #18801: SPARK-10878 Fix race condition when multiple clie...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18801#discussion_r186160092 --- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala --- @@ -255,4 +256,20 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { assert(jarPath.indexOf("mydep") >= 0, "should find dependency") } } + + test("test resolution files cleaned after resolving artifact") { --- End diff -- Actually triggering the original race in a test could be pretty hard, so I'm not sure that we necessarily should block this fix on having an end-to-end integration test which could reproduce the original race. I think that the use of the UUID should be sufficient and therefore the only important thing to test is that we're still cleaning up the files properly (as is being done here). Therefore I would welcome a re-submitted and cleaned-up version of this PR which addresses the other review comments (which I hope to merge soon if it's in good shape). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
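As a rough Java illustration of the approach described (a unique, UUID-based file name per invocation plus unconditional cleanup), under the assumption that name collisions between concurrent clients were the source of the race. `UniqueResolutionFile` and the `resolution-*.xml` naming are hypothetical, not the names used in the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

class UniqueResolutionFile {
    // Creates a per-invocation file, "uses" it, and always deletes it.
    // Returns true if the file is still present afterwards (it should not be).
    static boolean resolveOnce(Path dir) {
        // A fresh UUID per call means two concurrent callers never share a path.
        Path file = dir.resolve("resolution-" + UUID.randomUUID() + ".xml");
        try {
            try {
                Files.createFile(file);
                // ... dependency resolution would read and write the file here ...
            } finally {
                Files.deleteIfExists(file);
            }
            return Files.exists(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs two resolutions in a scratch directory and reports any leftover file.
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("resolve-demo");
            boolean leftover = resolveOnce(dir) || resolveOnce(dir);
            Files.delete(dir); // succeeds only if the directory is empty
            return leftover;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

This mirrors the testing argument in the comment: the uniqueness comes from the UUID, so the property worth asserting is that cleanup still happens.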
[GitHub] spark pull request #21219: [SPARK-24160] ShuffleBlockFetcherIterator should ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21219#discussion_r185949405 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -407,6 +407,25 @@ final class ShuffleBlockFetcherIterator( logDebug("Number of requests in flight " + reqsInFlight) } + if (buf.size == 0) { +// We will never legitimately receive a zero-size block. All blocks with zero records +// have zero size and all zero-size blocks have no records (and hence should never +// have been requested in the first place). This statement relies on behaviors of the +// shuffle writers, which are guaranteed by the following test cases: +// +// - BypassMergeSortShuffleWriterSuite: "write with some empty partitions" +// - UnsafeShuffleWriterSuite: "writeEmptyIterator" +// - DiskBlockObjectWriterSuite: "commit() and close() without ever opening or writing" +// +// There is not an explicit test for SortShuffleWriter but the underlying APIs that +// uses are shared by the UnsafeShuffleWriter (both writers use DiskBlockObjectWriter +// which returns a zero-size from commitAndGet() in case the no records were written --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21219: [SPARK-24160] ShuffleBlockFetcherIterator should fail if...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21219 jenkins retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21219: [SPARK-24160] ShuffleBlockFetcherIterator should fail if...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21219 jenkins retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21219: [SPARK-24160] ShuffleBlockFetcherIterator should ...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/21219

[SPARK-24160] ShuffleBlockFetcherIterator should fail if it receives zero-size blocks

## What changes were proposed in this pull request?

This patch modifies `ShuffleBlockFetcherIterator` so that the receipt of zero-size blocks is treated as an error. This is done as a preventative measure to guard against a potential source of data loss bugs.

In the shuffle layer, we guarantee that zero-size blocks will never be requested (a block containing zero records is always 0 bytes in size and is marked as empty such that it will never be legitimately requested by executors). However, the existing code does not fully take advantage of this invariant in the shuffle-read path: it does not explicitly check whether blocks are non-zero-size.

Additionally, our decompression and deserialization streams treat zero-size inputs as empty streams rather than errors: EOF might be treated as "end-of-stream" in certain layers (a longstanding behavior dating to the earliest versions of Spark), and decompressors like Snappy may tolerate zero-size inputs. As a result, if some other bug causes legitimate buffers to be replaced with zero-sized buffers (due to corruption on either the send or receive sides) then this would translate into silent data loss rather than an explicit fail-fast error.

This patch addresses this problem by adding a `buf.size != 0` check. See code comments for pointers to tests which guarantee the invariants relied on here.

## How was this patch tested?

Existing tests (which required modifications, since some were creating empty buffers in mocks). I also added a test to make sure we fail on zero-size blocks.

To test that the zero-size blocks are indeed a potential corruption source, I manually ran a workload in `spark-shell` with a modified build which replaces all buffers with zero-size buffers in the receive path.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/JoshRosen/spark SPARK-24160 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21219.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21219 commit 41d06e13d0f95f1dd146b6b512a0becc88eb2caa Author: Josh Rosen <joshrosen@...> Date: 2018-05-02T21:59:26Z ShuffleBlockFetcherIterator should fail if it receives zero-size blocks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
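The failure mode described in this pull request (a zero-size buffer decoding as "no records" instead of failing) can be illustrated with a toy record format in Java. This is a sketch under assumptions, not Spark's serialization stack: `EmptyBufferDemo` and its fixed-width `int` records are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

class EmptyBufferDemo {
    // Encodes each record as a 4-byte int.
    static byte[] encode(int... records) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int r : records) {
                out.writeInt(r);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Lenient reader: EOF simply ends the stream, so an empty buffer yields
    // zero records and no error.
    static int countRecords(byte[] buf) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf))) {
            int n = 0;
            while (true) {
                try {
                    in.readInt();
                    n++;
                } catch (EOFException endOfStream) {
                    return n;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Guarded reader: relies on the invariant that a legitimately requested
    // block is never empty, so a zero-size buffer is reported as an error.
    static int countRecordsChecked(byte[] buf) {
        if (buf.length == 0) {
            throw new IllegalStateException("Received a zero-size buffer");
        }
        return countRecords(buf);
    }
}
```

With the lenient reader, replacing a three-record buffer by an empty one silently turns three records into none; the guarded reader turns the same corruption into an exception.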
[GitHub] spark pull request #21101: [SPARK-23989][SQL] exchange should copy data befo...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/21101#discussion_r182609675
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
@@ -167,22 +164,24 @@ object ShuffleExchangeExec {
     val shuffleManager = SparkEnv.get.shuffleManager
     val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    val numParts = partitioner.numPartitions
     if (sortBasedShuffleOn) {
-      val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-      if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
+      if (numParts <= bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of output partitions is
         // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
         // doesn't buffer deserialized records.
         // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
         false
-      } else if (serializer.supportsRelocationOfSerializedObjects) {
+      } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
--- End diff --
I was almost going to suggest that we should check for both conditions with an `&&` here, just as future-proofing in case `serializer` was changed, but I can now see why that isn't necessary: we always use an `UnsafeRowSerializer` here now. It was only in the pre-Tungsten era that we could use either `UnsafeRowSerializer` or `SparkSqlSerializer` here.
[GitHub] spark pull request #20310: revert [SPARK-10030] Use tags to control which te...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20310#discussion_r162268244 --- Diff: common/tags/src/test/java/org/apache/spark/tags/DockerTest.java --- @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.tags; - -import java.lang.annotation.*; -import org.scalatest.TagAnnotation; - -@TagAnnotation -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.METHOD, ElementType.TYPE}) -public @interface DockerTest { } --- End diff -- If you search through the commit history, I'm pretty sure that we originally tried running those DockerTests on RISELAB Jenkins but ran into problems with the Docker Daemon becoming unstable under heavy build load. This should be fixed in the newer-generation Ubuntu build workers, but we haven't quite finished migrating the PRBs onto those. Given this, my hunch is that those tests aren't running anywhere right now, which isn't great. I think they're primarily used for testing JDBC data source SQL dialect mappings. It's been a year or more since I last looked into this, though, so I might be misremembering. 
[GitHub] spark issue #20310: revert [SPARK-10030] Use tags to control which tests to ...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/20310 Are you sure that we want to blanket revert this entire patch? Is there a more surgical short-term fix we can make in `dev/sparktestsupport/modules.py` to just always unconditionally enable the tag for now? Also, is this the first time recently that we've failed the YARN integration tests? How much time do they add? The trade off here seems to be between slightly slower after-the-fact detection of a test failure / build break due to YARN vs. faster tests for the majority of PRs that don't touch YARN code. I think we've had one or two such breaks in the 2+ years that we've been using these test tags, so I'd also be fine with postponing this change if you agree that it's unlikely that we're going to have many such failures here. If the motivation is that it's hard to test the fix for such build breaks (because the failing test wouldn't be exercised in the PR builder) then I think we might already have a solution via special tags placed into the PR title (I think `test-yarn` or something similar; see `run-tests-jenkins.py`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20264: [SPARK-23070] Bump previousSparkVersion in MimaBu...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20264#discussion_r161418395 --- Diff: project/MimaBuild.scala --- @@ -88,7 +88,7 @@ object MimaBuild { def mimaSettings(sparkHome: File, projectRef: ProjectRef) = { val organization = "org.apache.spark" -val previousSparkVersion = "2.0.0" +val previousSparkVersion = "2.2.0" --- End diff -- Yeah, 2.2.0 is right. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20222: [SPARK-23028] Bump master branch version to 2.4.0...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20222#discussion_r160741170 --- Diff: project/MimaExcludes.scala --- @@ -34,6 +34,10 @@ import com.typesafe.tools.mima.core.ProblemFilters._ */ object MimaExcludes { + // Exclude rules for 2.4.x --- End diff -- We might want to make my proposed MiMa changes in a separate patch so the same set of reviewed changes can go to both master and branch-2.3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20222: [SPARK-23028] Bump master branch version to 2.4.0...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20222#discussion_r160740904 --- Diff: project/MimaExcludes.scala --- @@ -34,6 +34,10 @@ import com.typesafe.tools.mima.core.ProblemFilters._ */ object MimaExcludes { + // Exclude rules for 2.4.x --- End diff -- This reminds me that we should probably bump `previousSparkVersion` in `MimaBuild.scala` to be 2.2.0: https://github.com/apache/spark/blame/f340b6b3066033d40b7e163fd5fb68e9820adfb1/project/MimaBuild.scala#L91. I think this should happen for both master and branch-2.3. See #15061 for an example of a similar change when 2.1.x was being prepared (it looks like we missed this step for 2.2.0). We may also need to un-exclude any new subprojects / artifacts that were added in 2.2.0 since they now need to be backwards-compatibility-tested for 2.3.x. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20222: [SPARK-23028] Bump master branch version to 2.4.0-SNAPSH...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/20222 We should already be set up for 2.3.x builds in AMPLab Jenkins. For example: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test/job/spark-branch-2.3-test-maven-hadoop-2.6/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20191: [SPARK-22997] Add additional defenses against use of fre...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/20191 I'm merging this into master and branch-2.3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20191: [SPARK-22997] Add additional defenses against use of fre...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/20191 jenkins retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160547545 --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala --- @@ -376,18 +374,13 @@ private[netty] class NettyRpcEnv( def setError(e: Throwable): Unit = { error = e - source.close() } override def read(dst: ByteBuffer): Int = { Try(source.read(dst)) match { +case _ if error != null => throw error --- End diff -- I added a pair of comments to explain the flow of calls involving `setError()` and pipe closes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
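The flow in this diff (the callback thread only records the error; the reading thread surfaces it on its next `read` and remains the only party that closes the channel) might look roughly like this in Java. It is a hedged sketch: `ErrorPropagatingChannel` is a hypothetical stand-in for `FileDownloadChannel`, and wrapping the recorded error in an `IOException` is a choice made here for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

class ErrorPropagatingChannel implements ReadableByteChannel {
    private final ReadableByteChannel source;
    private volatile Throwable error; // written by a callback thread

    ErrorPropagatingChannel(ReadableByteChannel source) {
        this.source = source;
    }

    // Records the failure only. It does NOT close the source from this thread.
    void setError(Throwable e) {
        error = e;
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
        int n;
        try {
            n = source.read(dst);
        } catch (IOException | RuntimeException readFailure) {
            // Prefer the recorded root cause over the secondary read failure.
            if (error != null) {
                throw new IOException(error);
            }
            throw readFailure;
        }
        if (error != null) {
            throw new IOException(error);
        }
        return n;
    }

    @Override
    public boolean isOpen() {
        return source.isOpen();
    }

    @Override
    public void close() throws IOException {
        source.close();
    }

    // True if a recorded error surfaces on the next read while the channel stays open.
    static boolean demo() {
        ErrorPropagatingChannel ch = new ErrorPropagatingChannel(
            Channels.newChannel(new ByteArrayInputStream(new byte[] {1, 2, 3, 4})));
        ByteBuffer dst = ByteBuffer.allocate(2);
        try {
            if (ch.read(dst) != 2) {
                return false;
            }
        } catch (IOException unexpected) {
            return false;
        }
        ch.setError(new IllegalStateException("download failed"));
        dst.clear();
        try {
            ch.read(dst);
            return false;
        } catch (IOException expected) {
            return ch.isOpen();
        }
    }
}
```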
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160489460
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

-    val in = new DataInputStream(new FileInputStream(indexFile))
+    // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
+    // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
+    // (which may cause a reducer to be sent a different reducer's data). The explicit position
+    // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
+    // class of issue from re-occurring in the future which is why they are left here even though
+    // SPARK-22982 is fixed.
+    val channel = Files.newByteChannel(indexFile.toPath)
+    channel.position(blockId.reduceId * 8)
--- End diff --
For some more background: the asynchronous `close()` bug can cause reads from a closed-and-subsequently-reassigned file descriptor number and in principle this can affect almost any IO operation _anywhere_ in the application. For example, if the closed file descriptor number is immediately recycled by opening a socket then the invalid read can cause that socket read to miss data (since the data would have been consumed by the invalid reader and won't be delivered to the legitimate new user of the file descriptor).

Given this, I see how it might be puzzling that this patch is adding a check only here. There are two reasons for this:

1. Many other IO operations have implicit checksumming such that dropping data due to an invalid read will be detected and cause an exception. For example, many compression codecs have block-level checksumming (and magic numbers at the beginning of the stream), so dropping data (especially at the start of a read) will be detected. This particular shuffle index file, however, does _not_ have mechanisms to detect corruption: skipping forward in the read by a multiple of 8 bytes will still read structurally-valid data (but it will be the _wrong_ data, causing the wrong output to be read from the shuffle data file).
2. In the investigation which uncovered this bug, the invalid reads were predominantly impacting shuffle index lookups for reading local blocks.

In a nutshell, there's a subtle race condition where Janino codegen compilation triggers attempted remote classloading of classes which don't exist, triggering the error-handling / error-propagation paths in `FileDownloadChannel` and causing the invalid asynchronous `close()` call to be performed. At the same time that this `close()` call was being performed, another task from the same stage attempts to read the shuffle index files of local blocks and experiences an invalid read due to the falsely-shared file descriptor.

This is a very hard-to-trigger bug: we were only able to reproduce it on large clusters with very fast machines and shuffles that contain large numbers of map and reduce tasks (more shuffle blocks means more index file reads and more chances for the race to occur; faster machines increase the likelihood of the race occurring; larger clusters give us more chances for the error to occur). In our reproduction, this race occurred on a microsecond timescale (measured via kernel syscall tracing) and occurred relatively rarely, requiring many iterations until we could trigger a reproduction.

While investigating, I added these checks so that the index read fails fast when this issue occurs, which made it significantly easier to reproduce and diagnose the root cause (fixed by the other changes in this patch).

There are a number of interesting details in the story of how we worked from the original high-level data corruption symptom to this low-level IO bug. I'll see about writing up the complete story in a blog post at some point.
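To make the first reason above concrete, here is a small Java sketch of why an index of bare 8-byte offsets cannot detect a skewed read. It assumes the index stores consecutive cumulative offsets as longs (consistent with the `reduceId * 8` addressing in the diff, but still an assumption); `IndexSkewDemo` and its methods are hypothetical.

```java
import java.nio.ByteBuffer;

class IndexSkewDemo {
    // Builds an index of (n + 1) cumulative offsets, 8 bytes each, from block lengths.
    static ByteBuffer buildIndex(long... blockLengths) {
        ByteBuffer index = ByteBuffer.allocate((blockLengths.length + 1) * 8);
        long offset = 0;
        index.putLong(offset);
        for (long len : blockLengths) {
            offset += len;
            index.putLong(offset);
        }
        index.flip();
        return index;
    }

    // Reads {offset, nextOffset} starting at the given byte position.
    static long[] readRange(ByteBuffer index, int bytePosition) {
        return new long[] {
            index.getLong(bytePosition), index.getLong(bytePosition + 8)
        };
    }
}
```

For block lengths 10, 20, and 30 the index holds 0, 10, 30, 60. Reading at byte position 8 gives the correct range for reducer 1, (10, 30); a read that has been pushed forward by 8 bytes gives (30, 60), which is a perfectly well-formed range that belongs to reducer 2.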
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160481843 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) -val in = new DataInputStream(new FileInputStream(indexFile)) +// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code +// which is incorrectly using our file descriptor then this code will fetch the wrong offsets +// (which may cause a reducer to be sent a different reducer's data). The explicit position +// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this +// class of issue from re-occurring in the future which is why they are left here even though +// SPARK-22982 is fixed. +val channel = Files.newByteChannel(indexFile.toPath) +channel.position(blockId.reduceId * 8) +val in = new DataInputStream(Channels.newInputStream(channel)) try { - ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() val nextOffset = in.readLong() + val actualPosition = channel.position() + val expectedPosition = blockId.reduceId * 8 + 16 + if (actualPosition != expectedPosition) { +throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " + --- End diff -- Any suggestions for a better exception subtype? I don't expect this to be a recoverable error and wanted to avoid the possibility that downstream code catches and handles this error. Maybe I should go further and make it a RuntimeException to make it even more fatal? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
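A self-contained Java sketch of the position check shown in this diff, for readers who want to see it outside the Spark code base. `IndexLookup`, `lookup`, and `demo` are hypothetical names, and `IllegalStateException` is used here only as a placeholder for whichever exception type the review settles on. In a single-threaded demo the check never fires; it exists to catch another party moving the shared file position.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

class IndexLookup {
    // Reads {offset, nextOffset} for reduceId and verifies the channel advanced
    // by exactly the 16 bytes that were read.
    static long[] lookup(Path indexFile, int reduceId) throws IOException {
        try (SeekableByteChannel channel = Files.newByteChannel(indexFile)) {
            channel.position(reduceId * 8L);
            DataInputStream in = new DataInputStream(Channels.newInputStream(channel));
            long offset = in.readLong();
            long nextOffset = in.readLong();
            long actualPosition = channel.position();
            long expectedPosition = reduceId * 8L + 16;
            if (actualPosition != expectedPosition) {
                throw new IllegalStateException(
                    "Incorrect channel position after index file reads: expected "
                        + expectedPosition + " but actual position was " + actualPosition);
            }
            return new long[] {offset, nextOffset};
        }
    }

    // Writes a four-entry index to a temp file and looks up reducer 1.
    static long[] demo() {
        try {
            Path file = Files.createTempFile("shuffle", ".index");
            try {
                try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file))) {
                    for (long v : new long[] {0L, 10L, 30L, 60L}) {
                        out.writeLong(v);
                    }
                }
                return lookup(file, 1);
            } finally {
                Files.delete(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```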
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160314055 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) -val in = new DataInputStream(new FileInputStream(indexFile)) +// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code +// which is incorrectly using our file descriptor then this code will fetch the wrong offsets +// (which may cause a reducer to be sent a different reducer's data). The explicit position +// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this +// class of issue from re-occurring in the future which is why they are left here even though +// SPARK-22982 is fixed. +val channel = Files.newByteChannel(indexFile.toPath) +channel.position(blockId.reduceId * 8) --- End diff -- I made sure to incorporate @zsxwing's changes here. The problem originally related to calling `skip()`, but this change is from his fix to explicitly use `position` on a `FileChannel` instead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160313840
--- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
@@ -376,18 +374,13 @@ private[netty] class NettyRpcEnv(
     def setError(e: Throwable): Unit = {
       error = e
-      source.close()
     }

     override def read(dst: ByteBuffer): Int = {
--- End diff --
Yes. This currently happens in two places:
- In Utils.doFetchFile(): https://github.com/apache/spark/blob/28315714ddef3ddcc192375e98dd5207cf4ecc98/core/src/main/scala/org/apache/spark/util/Utils.scala#L661: the stream gets passed a couple of layers down to a `Utils.copyStream(closeStreams = true)` call which is guaranteed to clean up the stream.
- In ExecutorClassLoader, where we construct the stream in `fetchFn` from `getclassFileInputStreamFromRpc` and then close it in a `finally` block in `findClassLocally`: https://github.com/apache/spark/blob/e08d06b37bc96cc48fec1c5e40f73e0bca09c616/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala#L167
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160312699 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) -val in = new DataInputStream(new FileInputStream(indexFile)) +// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code +// which is incorrectly using our file descriptor then this code will fetch the wrong offsets +// (which may cause a reducer to be sent a different reducer's data). The explicit position +// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this +// class of issue from re-occurring in the future which is why they are left here even though +// SPARK-22982 is fixed. +val channel = Files.newByteChannel(indexFile.toPath) +channel.position(blockId.reduceId * 8) +val in = new DataInputStream(Channels.newInputStream(channel)) try { - ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() val nextOffset = in.readLong() + val actualPosition = channel.position() + val expectedPosition = blockId.reduceId * 8 + 16 + if (actualPosition != expectedPosition) { --- End diff -- I considered this, but I don't think there's ever a case where we want to elide this particular check: if we read an incorrect offset here then there's (potentially) no other mechanism to detect this error, leading to silent wrong answers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160312383

--- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
@@ -332,15 +332,17 @@ private[netty] class NettyRpcEnv(
     val pipe = Pipe.open()
     val source = new FileDownloadChannel(pipe.source())
+    var exceptionThrown = true
     try {
       val client = downloadClient(parsedUri.getHost(), parsedUri.getPort())
       val callback = new FileDownloadCallback(pipe.sink(), source, client)
       client.stream(parsedUri.getPath(), callback)
-    } catch {
-      case e: Exception =>
+      exceptionThrown = false
+    } finally {
+      if (exceptionThrown) {
         pipe.sink().close()
--- End diff --

To preserve the same behavior, I think we'd need to do

```scala
try {
  // code
} catch {
  case t: Throwable =>
    pipe.sink().close()
    source.close()
    throw t
}
```

to ensure that we propagate the original `Throwable`. It could be clearer (and safer, in the case of exceptions thrown from `close()` calls) to use [`Utils.tryWithSafeFinallyAndFailureCallbacks`](https://github.com/apache/spark/blob/849043ce1d28a976659278d29368da0799329db8/core/src/main/scala/org/apache/spark/util/Utils.scala#L1407) here, so let me make that change.
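The concern in the comment above (close resources only on failure, and still propagate the original `Throwable` even if `close()` itself throws) can be sketched in Java. This is a rough illustration under stated assumptions: `CloseOnFailure`, `Body`, and `runOrClose` are hypothetical names, and the sketch is not the implementation of `Utils.tryWithSafeFinallyAndFailureCallbacks`.

```java
// Hypothetical helper for illustration only; not part of Spark.
final class CloseOnFailure {
  /** A body that may throw; a stand-in for the code inside the try block. */
  public interface Body<T> {
    T run() throws Exception;
  }

  /** Runs body; if it throws, closes the resources and rethrows the original failure. */
  public static <T> T runOrClose(Body<T> body, AutoCloseable... resources) throws Exception {
    try {
      return body.run();
    } catch (Throwable original) {
      for (AutoCloseable resource : resources) {
        try {
          resource.close();
        } catch (Throwable closeFailure) {
          // Keep the original failure primary; attach the close() failure to it.
          if (closeFailure != original) {
            original.addSuppressed(closeFailure);
          }
        }
      }
      throw original;
    }
  }
}
```

On the success path nothing is closed, which matches the diff's intent: the caller keeps using the returned channel and is responsible for closing it later.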
[GitHub] spark issue #20191: [SPARK-22997] Add additional defenses against use of fre...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/20191 jenkins retest this please
[GitHub] spark pull request #20191: [SPARK-22997] Add additional defenses against use...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20191#discussion_r160297024 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java --- @@ -38,9 +38,20 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError { public void free(MemoryBlock memory) { assert (memory.obj == null) : "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?"; +assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : + "page has already been freed"; +assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER) +|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) : + "TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()"; + if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE); } Platform.freeMemory(memory.offset); +// As an additional layer of defense against use-after-free bugs, we mutate the +// MemoryBlock to reset its pointer. +memory.offset = 0; --- End diff -- Yep, this will guarantee SIGSEGV instead of corruption. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20191: [SPARK-22997] Add additional defenses against use...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/20191#discussion_r160296722 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java --- @@ -38,9 +38,20 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError { public void free(MemoryBlock memory) { assert (memory.obj == null) : "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?"; +assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : + "page has already been freed"; +assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER) +|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) : + "TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()"; + if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE); } Platform.freeMemory(memory.offset); +// As an additional layer of defense against use-after-free bugs, we mutate the +// MemoryBlock to reset its pointer. +memory.offset = 0; --- End diff -- I think that it depends on whether the direct memory address stored in `memory.offset` corresponds to the address of memory allocated to the JVM. If we've freed the underlying Unsafe / Direct memory then I would expect a SIGSEGV, but if the address has been re-used by a subsequent allocation (or if we've done some buffer pooling and have recycled the underlying direct memory without freeing it) then this address could point to valid memory allocated by the JVM but which is currently in use by another Spark task, so reads or writes would trigger data corruption. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20191: [SPARK-22997] Add additional defenses against use...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/20191 [SPARK-22997] Add additional defenses against use of freed MemoryBlocks ## What changes were proposed in this pull request? This patch modifies Spark's `MemoryAllocator` implementations so that `free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap case) or null out references to backing `long[]` arrays (in the on-heap case). The goal of this change is to add an extra layer of defense against use-after-free bugs because currently it's hard to detect corruption caused by blind writes to freed memory blocks. ## How was this patch tested? New unit tests in `PlatformSuite`, including new tests for existing functionality because we did not have sufficient mutation coverage of the on-heap memory allocator's pooling logic. You can merge this pull request into a Git repository by running: $ git pull https://github.com/JoshRosen/spark SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20191.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20191 commit a7f8c07fb5158f39bbb6cc1f23cfb13a0d473536 Author: Josh Rosen <joshrosen@...> Date: 2018-01-08T23:50:18Z Add additional defenses against use of freed MemoryBlocks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
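The on-heap half of the idea described above (null out the reference to the backing `long[]` when a block is freed) can be sketched as follows. This is a simplified Java illustration, not Spark's `MemoryAllocator`: the class `PoisoningHeapAllocator`, its `Block` type, and the constant values are assumptions made for the example.

```java
// Hypothetical allocator for illustration only; not Spark's HeapMemoryAllocator.
final class PoisoningHeapAllocator {
  public static final int NO_PAGE_NUMBER = -1;     // assumed marker value
  public static final int FREED_PAGE_NUMBER = -3;  // assumed marker value

  public static final class Block {
    long[] array;
    int pageNumber = NO_PAGE_NUMBER;

    Block(long[] array) {
      this.array = array;
    }

    public long get(int index) {
      return array[index];
    }

    public void put(int index, long value) {
      array[index] = value;
    }
  }

  public static Block allocate(int words) {
    return new Block(new long[words]);
  }

  public static void free(Block block) {
    if (block.pageNumber == FREED_PAGE_NUMBER) {
      throw new IllegalStateException("block has already been freed");
    }
    // Drop the reference so a later access fails fast instead of silently
    // writing into memory that may have been handed to another owner.
    block.array = null;
    block.pageNumber = FREED_PAGE_NUMBER;
  }
}
```

With this in place, a write through a stale `Block` raises a `NullPointerException` at the point of misuse, which is much easier to diagnose than corruption discovered later.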
[GitHub] spark pull request #20182: [SPARK-22985] Fix argument escaping bug in from_u...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/20182

[SPARK-22985] Fix argument escaping bug in from_utc_timestamp / to_utc_timestamp codegen

## What changes were proposed in this pull request?

This patch adds additional escaping in `from_utc_timestamp` / `to_utc_timestamp` expression codegen in order to fix a bug where invalid timezones which contain special characters could cause generated code to fail to compile.

## How was this patch tested?

New regression tests in `DateExpressionsSuite`.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/JoshRosen/spark SPARK-22985-fix-utc-timezone-function-escaping-bugs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20182.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20182

commit 2d10131892e54182517185102a362dd17de2af9b Author: Josh Rosen <joshrosen@...> Date: 2018-01-08T00:25:30Z regression test
commit d94feed8e1f10c1610cae7cb3f7a3ad7014123e9 Author: Josh Rosen <joshrosen@...> Date: 2018-01-08T00:26:05Z Fix
[GitHub] spark pull request #20181: [SPARK-22984] Fix incorrect bitmap copying and of...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/20181

[SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner

## What changes were proposed in this pull request?

This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. This class was introduced in https://github.com/apache/spark/pull/7821 (July 2015 / Spark 1.5.0+) and is used to combine pairs of UnsafeRows in TungstenAggregationIterator, CartesianProductExec, and AppendColumns.

### Bugs fixed by this patch

1. **Incorrect combining of null-tracking bitmaps**: when concatenating two UnsafeRows, the implementation must "Concatenate the two bitsets together into a single one, taking padding into account". If one row has no columns then it has a bitset size of 0, but the code was incorrectly assuming that if the left row had a non-zero number of fields then the right row would also have at least one field, so it was copying invalid bytes and treating them as part of the bitset. I'm not sure whether this bug was also present in the original implementation or whether it was introduced in https://github.com/apache/spark/pull/7892 (which fixed another bug in this code).
2. **Incorrect updating of data offsets for null variable-length fields**: after updating the bitsets and copying fixed-length and variable-length data, we need to adjust the offsets pointing to the start of each variable-length field's data. The existing code was _conditionally_ adding a fixed offset to correct for the new length of the combined row, but it is unsafe to do this if the variable-length field has a null value: we always represent nulls by storing `0` in the fixed-length slot, but this code was incorrectly incrementing those values. This bug was present since the original version of `GenerateUnsafeRowJoiner`.
### Why this bug remained latent for so long

The PR which introduced `GenerateUnsafeRowJoiner` features several randomized tests, including tests of the cases where one side of the join has no fields and where string-valued fields are null. However, the existing assertions were too weak to uncover this bug:

- If a null field has a non-zero value in its fixed-length data slot then this will not cause problems for field accesses because the null-tracking bitmap should still be correct and we will not try to use the incorrect offset for anything.
- If the null-tracking bitmap is corrupted by joining against a row with no fields then the corruption occurs in field numbers past the actual field numbers contained in the row. Thus valid `isNullAt()` calls will not read the incorrectly-set bits.

The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` and `isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit equality, preventing these bugs from failing assertions. It turns out that there was even a [GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala) but it looks like it also didn't catch this problem because it only tested the bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` interface instead of actually comparing the bitsets' bytes.

### Impact of these bugs

- This bug will cause `equals()` and `hashCode()` to be incorrect for these rows, which will be problematic in case `GenerateUnsafeRowJoiner`'s results are used as join or grouping keys.
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in reads from invalid null bitmap positions causing fields to incorrectly become NULL (see the end-to-end example below).
- It looks like this generally only happens in `CartesianProductExec`, which our query optimizer often avoids executing (usually we try to plan a `BroadcastNestedLoopJoin` instead).

### End-to-end test case demonstrating the problem

The following query demonstrates how this bug may result in incorrect query results:

```sql
set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger CartesianProductExec
create table a as select * from values 1;
create table b as select * from values 2;
SELECT t3.col1, t1.col1 FROM a t1 CROSS JOIN b t2 CROSS JOIN b t3
```

This should return `(2, 1)` but instead was returning `(null, 1)`. Column pruning ends up trimming off all columns from `t2`, so when `t2` joins with another table this triggers the bitmap-copying bug. This incorrect bitmap is subsequently copied again when performing the final join, causing the final output to have an incorrectly-set null bit for the first field.

## How was this pa
[GitHub] spark pull request #20180: [SPARK-22983] Don't push filters beneath aggregat...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/20180

[SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions

## What changes were proposed in this pull request?

The following SQL query should return zero rows, but in Spark it actually returns one row:

```
SELECT 1 from (
  SELECT 1 AS z, MIN(a.x)
  FROM (select 1 as x) a
  WHERE false
) b
where b.z != b.z
```

The problem stems from the `PushDownPredicate` rule: when this rule encounters a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes the original filter and adds a new filter onto Aggregate's child, e.g. `Agg(Filter(...))`. This is sometimes okay, but the case above is a counterexample: because there is no explicit `GROUP BY`, we are implicitly computing a global aggregate over the entire table, so the original filter was not acting like a `HAVING` clause filtering the number of groups: if we push this filter then it fails to actually reduce the cardinality of the Aggregate output, leading to the wrong answer.

In 2016 I fixed a similar problem involving invalid pushdowns of data-independent filters (filters which reference no columns of the filtered relation). There was additional discussion after my fix was merged which pointed out that my patch was an incomplete fix (see #15289), but it looks like I must have either misunderstood the comment or forgotten to follow up on the additional points raised there.

This patch fixes the problem by choosing to never push down filters in cases where there are no grouping expressions. Since there are no grouping keys, the only columns are aggregate columns and we can't push filters defined over aggregate results, so this change won't cause us to miss out on any legitimate pushdown opportunities.

## How was this patch tested?

New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/JoshRosen/spark SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20180.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20180 commit 5568d55255cbdd5313f7391755c7c39d34390c30 Author: Josh Rosen <joshrosen@...> Date: 2018-01-07T05:14:56Z Don't push filters beneath aggregates with empty grouping expressions --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
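The cardinality argument in the PR description above can be checked with a toy model. The Java sketch below is only an illustration and makes several assumptions: rows are plain integers, the global aggregate is `MIN`, and the names `GlobalAggregatePushdown`, `globalMin`, and `filter` are hypothetical. It does not model Catalyst's `PushDownPredicate` rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Toy model for illustration only; not Spark's optimizer.
final class GlobalAggregatePushdown {
  /** A global aggregate (no GROUP BY) always emits exactly one row, even for empty input. */
  public static List<Optional<Integer>> globalMin(List<Integer> rows) {
    Optional<Integer> min = rows.stream().min(Integer::compare);
    return Collections.singletonList(min);
  }

  public static <T> List<T> filter(List<T> rows, Predicate<? super T> predicate) {
    List<T> out = new ArrayList<>();
    for (T row : rows) {
      if (predicate.test(row)) {
        out.add(row);
      }
    }
    return out;
  }

  /** Filter(Agg(input)) with a predicate that is always false: the intended plan. */
  public static int filterAboveAggregate(List<Integer> input) {
    return filter(globalMin(input), row -> false).size();
  }

  /** Agg(Filter(input)) with the same predicate: the plan after the invalid pushdown. */
  public static int filterBelowAggregate(List<Integer> input) {
    return globalMin(filter(input, row -> false)).size();
  }
}
```

For any input, `filterAboveAggregate` yields zero rows while `filterBelowAggregate` yields one, because the aggregate still emits its single row over the now-empty child. That is the same zero-versus-one discrepancy the query in the description exhibits.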
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/20179

[SPARK-22982] Remove unsafe asynchronous close() call from FileDownloadChannel

## What changes were proposed in this pull request?

This patch fixes a severe asynchronous IO bug in Spark's Netty-based file transfer code. At a high level, the problem is that an unsafe asynchronous `close()` of a pipe's source channel creates a race condition where file transfer code closes a file descriptor then attempts to read from it. If the closed file descriptor's number has been reused by an `open()` call then this invalid read may cause unrelated file operations to return incorrect results. **One manifestation of this problem is incorrect query results.**

For a high-level overview of how file download works, take a look at the control flow in `NettyRpcEnv.openChannel()`: this code creates a pipe to buffer results, then submits an asynchronous stream request to a lower-level TransportClient. The callback passes received data to the sink end of the pipe. The source end of the pipe is passed back to the caller of `openChannel()`. Thus `openChannel()` returns immediately and callers interact with the returned pipe source channel.

Because the underlying stream request is asynchronous, errors may occur after `openChannel()` has returned and after that method's caller has started to `read()` from the returned channel. For example, if a client requests an invalid stream from a remote server then the "stream does not exist" error may not be received from the remote server until after `openChannel()` has returned. In order to be able to propagate the "stream does not exist" error to the file-fetching application thread, this code wraps the pipe's source channel in a special `FileDownloadChannel` which adds a `setError(t: Throwable)` method, then calls this `setError()` method in the FileDownloadCallback's `onFailure` method.
It is possible for `FileDownloadChannel`'s `read()` and `setError()` methods to be called concurrently from different threads: the `setError()` method is called from within the Netty RPC system's stream callback handlers, while the `read()` methods are called from higher-level application code performing remote stream reads.

The problem lies in `setError()`: the existing code closed the wrapped pipe source channel. Because `read()` and `setError()` occur in different threads, this means it is possible for one thread to be calling `source.read()` while another asynchronously calls `source.close()`. Java's IO libraries do not guarantee that this will be safe and, in fact, it's possible for these operations to interleave in such a way that a lower-level `read()` system call occurs right after a `close()` call. In the best case, this fails as a read of a closed file descriptor; in the worst case, the file descriptor number has been re-used by an intervening `open()` operation and the read corrupts the result of an unrelated file IO operation being performed by a different thread.

The solution here is to remove the `source.close()` call in `setError()`: the thread that is performing the `read()` calls is responsible for closing the stream in a `finally` block, so there's no need to close it here. If that thread is blocked in a `read()` then it will become unblocked when the sink end of the pipe is closed in `FileDownloadCallback.onFailure()`.

After making this change, we also need to refine the `read()` method to always check for a `setError()` result, even if the underlying channel `read()` call has succeeded.

This patch also makes a slight cleanup to a dodgy-looking `catch e: Exception` block to use a safer `try-finally` error handling idiom.

This bug was introduced in SPARK-11956 / #9941 and is present in Spark 1.6.0+.

## How was this patch tested?

This fix was tested manually against a workload which non-deterministically hit this bug.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/JoshRosen/spark SPARK-22982-fix-unsafe-async-io-in-file-download-channel Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20179.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20179 commit 2138aa6d8225fb7c343c4a61dadc8c44e902e35b Author: Josh Rosen <joshrosen@...> Date: 2018-01-07T23:21:33Z Add position checks to IndexShuffleBlockResolver. commit 8e5ffa451f66dc64203dede68b9c0ac5fdc952cf Author: Josh Rosen <joshrosen@...> Date: 2018-01-07T23:22:44Z Remove unsafe asynchronous close() call from FileDownloadChannel --- - To unsub
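The fix described in this pull request (record the error without closing the channel, and have `read()` check for a recorded error after every underlying read) can be sketched in Java. This is a simplified stand-in rather than the actual `FileDownloadChannel`: the class name `ErrorAwareChannel` is hypothetical, and wrapping the recorded error in an `IOException` is an assumption made to keep the example small.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical wrapper for illustration only; not Spark's FileDownloadChannel.
final class ErrorAwareChannel implements ReadableByteChannel {
  private final ReadableByteChannel source;
  // Written by the callback thread, read by the reading thread.
  private volatile Throwable error;

  public ErrorAwareChannel(ReadableByteChannel source) {
    this.source = source;
  }

  /** Called from the callback thread. Only records the error; never closes the source. */
  public void setError(Throwable e) {
    error = e;
  }

  @Override
  public int read(ByteBuffer dst) throws IOException {
    int bytesRead = source.read(dst);
    // Check after the read, even if it succeeded or returned end-of-stream.
    Throwable recorded = error;
    if (recorded != null) {
      throw new IOException("remote stream failed", recorded);
    }
    return bytesRead;
  }

  @Override
  public boolean isOpen() {
    return source.isOpen();
  }

  /** Called only by the reading thread, typically from a finally block. */
  @Override
  public void close() throws IOException {
    source.close();
  }
}
```

A reader blocked in `read()` is woken when the producer closes the sink end of the pipe; the read then returns end-of-stream and the recorded error is surfaced, so no thread ever closes a channel that another thread may be reading.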
[GitHub] spark issue #19971: [SPARK-22774][SQL][Test] Add compilation check into TPCD...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/19971 Wait, nevermind: I see that this isn't actually running the queries, so setting `spark.sql.codegen.fallback=false` wouldn't be sufficient. Separate from this PR, we might want to set `spark.sql.codegen.fallback=false` in more tests, though: it looks like it's on for Hive tests but might be disabled in some SQL suites.
[GitHub] spark issue #19971: [SPARK-22774][SQL][Test] Add compilation check into TPCD...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/19971 Could we address this more broadly by ensuring that whole stage codegen fallback is disabled in tests?
[GitHub] spark issue #19959: [SPARK-22766] Install R linter package in spark lib dire...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/19959 Because `LOCAL_LIB_LOC` seems to default to `$SPARK_HOME/R/lib`, will installing to this folder create the potential for `lintr` to be installed into a `lib` folder which gets packaged up in binary builds? Just want to double-check to make sure this won't mess up any packaging scripts which might be interacting with that directory. (I'm not super familiar with how R packaging works, so just wanted to double-check and be safe).
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/19788 Is there an implicit assumption here that contiguous partitions' data can be decompressed / deserialized in a single stream? If the shuffled data is written with a non-relocatable serializer (Java serialization) or non-concatenatable compression format then I'm not sure that you'll actually be able to successfully deserialize a multi-reducer range of the map output using a single decompression / deserialization stream.
[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/19433 jenkins retest this please
[GitHub] spark pull request #19077: [SPARK-21860][core]Improve memory reuse for heap ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/19077#discussion_r136487281

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java ---
@@ -47,23 +47,29 @@ private boolean shouldPool(long size) {
   @Override
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
-    if (shouldPool(size)) {
+    int arraySize = (int)((size + 7) / 8);
--- End diff --

You might be able to use `ByteArrayMethods.roundNumberOfBytesToNearestWord` for this, which we've done for similar rounding elsewhere. Makes it a bit easier to spot what's happening.
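The rounding in the diff above, `(size + 7) / 8`, computes how many 8-byte words are needed to hold `size` bytes. A standalone Java sketch of the same arithmetic is shown below; `WordRounding`, `roundUpToWord`, and `wordsNeeded` are hypothetical names, and this is a re-derivation for illustration rather than a copy of Spark's `ByteArrayMethods`.

```java
// Hypothetical helper for illustration only; not Spark's ByteArrayMethods.
final class WordRounding {
  /** Rounds a byte count up to the next multiple of 8 (one 64-bit word). */
  public static long roundUpToWord(long numBytes) {
    long remainder = numBytes & 0x07; // numBytes % 8 for non-negative values
    return remainder == 0 ? numBytes : numBytes + (8 - remainder);
  }

  /** Number of long[] elements needed to hold numBytes; equals (numBytes + 7) / 8. */
  public static int wordsNeeded(long numBytes) {
    return (int) (roundUpToWord(numBytes) / 8);
  }
}
```

For example, 9 bytes round up to 16 bytes, which is 2 words, the same value that `(9 + 7) / 8` gives. Naming the operation makes the intent visible at the call site, which is the point of the review suggestion.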
[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/19077 Just curious: do you know where we are allocating these close-in-size chunks of memory? I understand the motivation, but I'm curious to know what's causing this pattern. I think the original idea here was that most allocations would come from a small set of sizes (usually the page size, or a configurable buffer size) and would not generally be arbitrarily sized allocations.
[GitHub] spark issue #19056: [SPARK-21765] Check that optimization doesn't affect isS...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/19056 (Dummy comment to test JIRA linkage)
[GitHub] spark issue #18752: [SPARK-21551][Python] Increase timeout for PythonRDD.ser...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/18752 Jenkins, this is ok to test.
[GitHub] spark issue #18752: [SPARK-21551][Python] Increase timeout for PythonRDD.ser...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/18752 This seems fine to me, especially since it's plausible that you might have a few-second GC pause in some situations. Let me go ahead and have Jenkins test this, then I'll merge it if tests pass (which I assume they will).
[GitHub] spark issue #18814: [SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/18814 (test comment to test PR dashboard linking)
[GitHub] spark issue #18814: [SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/18814 (test comment to test PR dashboard linking)
[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/17180 This seems fine to me. That said, the updated test case is a bit confusing, but I don't think the original test was too clear to begin with. The original test was using the `iterator()` method to make an assertion about the internal state of the map, then was checking whether the pattern of getting a buffer from the map, updating it, then getting it again from the map would reflect the update. After your update to the test, the comment doesn't quite align with the test anymore. The right way to fix this involves splitting up the affected test into two separate tests. We can do that in a followup, though, since I think that we still have sufficient code coverage via other tests and this PR has already been under review for months now so it'd be better to get it merged and move on.

On Sat, Jul 29, 2017 at 6:19 PM Xiao Li <notificati...@github.com> wrote:
> LGTM Wait for @JoshRosen <https://github.com/joshrosen> for final sign off.
[GitHub] spark issue #18645: [SPARK-14280][BUILD][WIP] Update change-version.sh and p...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/18645

Looking at the source compatibility changes made here, I was a little confused about why we didn't need to make many more changes. In principle, it seemed like the ambiguous overload issue affecting that `foreachPartition` call should have also impacted calls like `.filter(_ > 0)`, so I was originally expecting that we'd have to update hundreds of call sites.

It turns out that the final Scala 2.12 release has a nice feature which resolves this ambiguity in many cases. In the release notes at https://www.scala-lang.org/news/2.12.0/#lambda-syntax-for-sam-types, there's an example where even though there's a potentially ambiguous overload, "overloading resolution selects the one with the `Function1` argument type". This is explained in more detail at https://www.scala-lang.org/news/2.12.0/#sam-conversion-in-overloading-resolution. Quoting:

> In Scala 2.12, both methods are applicable, therefore [overloading resolution](http://www.scala-lang.org/files/archive/spec/2.12/06-expressions.html#overloading-resolution) needs to pick the most specific alternative. The specification for [type compatibility](http://www.scala-lang.org/files/archive/spec/2.12/03-types.html#compatibility) has been updated to consider SAM conversion, so that the first alternative is more specific.

This explains why we didn't have to update all call sites. However, why did the `.foreachPartition` example break? I played around with this in the Scastie Scala paste bin and I think that I've spotted the problem: all of the cases where we had ambiguity seem to be cases where we're operating on Datasets in a generic way and have a type parameter representing the Dataset's type. So, this means that

```
def f(ds: Dataset[Int]): Dataset[Int] = ds.filter(_ != null)
```

is unambiguous but

```
def f[T](ds: Dataset[T]): Dataset[T] = ds.filter(_ != null)
```

fails with a compiler error in Scala 2.12.
You can try this out at https://scastie.scala-lang.org/JoshRosen/eBcxUGdORsiOQNULMUnAsw. I took a look at the sections of the language spec linked in the quote above but it's not immediately clear to me whether this is a compiler bug or expected behavior (I'll have to take some more time to try to understand the spec). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
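The overloading behaviour quoted from the release notes can be sketched without Spark. This is a minimal illustration modeled on the release-notes example, not Spark code: `SamOverloadSketch`, `MyFun`, and `T` are hypothetical names, and the sketch covers only the non-generic case that resolves cleanly. It does not attempt to reproduce the generic `Dataset[T]` failure described above.

```scala
object SamOverloadSketch {
  // Hypothetical SAM type standing in for a Java-style functional interface.
  trait MyFun { def apply(x: Int): String }

  object T {
    // Overload that accepts a plain Scala function.
    def m(f: Int => String): Int = 0
    // Overload that accepts the SAM type; since Scala 2.12 a lambda is
    // also applicable here through SAM conversion.
    def m(f: MyFun): Int = 1
  }

  def main(args: Array[String]): Unit = {
    // Both overloads are applicable to the lambda. Per the 2.12 release
    // notes, overloading resolution selects the Function1 alternative.
    val chosen = T.m(x => x.toString)
    println(s"selected overload: $chosen")
  }
}
```

Because both overloads agree that the lambda's parameter is an `Int`, the compiler can type `x` without an annotation, which is presumably why calls such as `.filter(_ > 0)` on a concretely typed Dataset kept compiling.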
[GitHub] spark pull request #18645: [SPARK-14280][BUILD][WIP] Update change-version.s...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18645#discussion_r128892346

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -353,7 +353,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 test("foreachPartition") {
 val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
 val acc = sparkContext.longAccumulator
-ds.foreachPartition(_.foreach(v => acc.add(v._2)))
+ds.foreachPartition((it: Iterator[(String, Int)]) => it.foreach(v => acc.add(v._2)))
--- End diff --

I think this is primarily going to be an issue for end users who want to use an existing source tree to cross-compile for Scala 2.10, 2.11, and 2.12. Thus the pain of the source incompatibility would mostly be felt by library/package maintainers, but it can be worked around as long as there's at least some common subset which is source compatible across all of those versions.

A similar ambiguity issue exists for people who write Spark code in Java 8 and use lambda syntax. The number of such users who must also upgrade to Scala 2.12 is probably fairly small, and the number of those users who have a requirement to cross-build their Java 8 lambda syntax Spark code against 2.11 and 2.12 is probably even smaller, so maybe it isn't a huge deal to accept the ambiguity and require those users to do a bit of work when upgrading.
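The workaround in the diff above, annotating the lambda's parameter type, can be sketched in isolation. Everything here is a hypothetical stand-in: `MiniDataset` is not Spark's `Dataset`, and the `ForeachPartitionFunction` trait taking a `java.util.Iterator` is an assumption made for the sketch. The point is only that an explicitly typed function literal pins the call to the Scala-function overload.

```scala
object ExplicitParamTypeSketch {
  // Hypothetical Java-friendly functional interface (assumed shape).
  trait ForeachPartitionFunction[A] {
    def call(it: java.util.Iterator[A]): Unit
  }

  // Hypothetical stand-in for a Dataset exposing both overloads.
  final class MiniDataset[A](partitions: Seq[Seq[A]]) {
    def foreachPartition(f: Iterator[A] => Unit): Unit =
      partitions.foreach(p => f(p.iterator))

    def foreachPartition(func: ForeachPartitionFunction[A]): Unit =
      partitions.foreach { p =>
        // Copy the partition into a Java list to obtain a java.util.Iterator.
        val list = new java.util.ArrayList[A]()
        p.foreach(x => list.add(x))
        func.call(list.iterator())
      }
  }

  // Sums the second tuple element across all partitions.
  def sumSecond(ds: MiniDataset[(String, Int)]): Long = {
    var acc = 0L
    // The annotation on `it` means only the Scala-function overload fits.
    ds.foreachPartition((it: Iterator[(String, Int)]) => it.foreach(v => acc += v._2))
    acc
  }

  def main(args: Array[String]): Unit = {
    val ds = new MiniDataset(Seq(Seq(("a", 1), ("b", 2)), Seq(("c", 3))))
    println(sumSecond(ds))
  }
}
```

The annotated form is more verbose, but it is source compatible across compiler versions, which is what matters for a cross-built source tree.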
[GitHub] spark pull request #18645: [SPARK-14280][BUILD][WIP] Update change-version.s...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18645#discussion_r128891202

--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala ---
@@ -54,7 +54,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
 val rdd = new RDD[String](sc, List()) {
 override def getPartitions = Array[Partition](StubPartition(0))
 override def compute(split: Partition, context: TaskContext) = {
-context.addTaskCompletionListener(context => TaskContextSuite.completed = true)
+context.addTaskCompletionListener(new TaskCompletionListener {
--- End diff --

I think that you can avoid source incompatibilities for Scala users by removing the overloads which accept Scala functions and then adding in a package-level implicit conversion to convert from Scala functions back into our own custom trait / interface.

The trickiness here is that you need to preserve binary compatibility on Scala 2.10/2.11, so the removal of the overload needs to be done conditionally so that it only occurs when building with Scala 2.12. Rather than having a separate source tree for 2.12, I'd propose defining the removed overload in a mixin trait which comes from a separate source file and then configuring the build to use different versions of that file for 2.10/2.11 and for 2.12.

For details on this proposal, see https://docs.google.com/document/d/1P_wmH3U356f079AYgSsN53HKixuNdxSEvo8nw_tgLgM/edit, a document that I wrote in March 2016 which explores these source incompatibility difficulties.

Applying that idea here, we would remove the method

```
def addTaskCompletionListener(f: (TaskContext) => Unit)
```

and add a package-level implicit conversion from `TaskContext => Unit` to `TaskCompletionListener`, but do this only in the 2.12 source tree / shim.

This approach has some caveats and could potentially impact Java users who are doing weird things (violating the goal that Java Spark code is source and binary compatible with all Scala versions). See the linked doc for a full discussion of this problem.
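The implicit-conversion idea can be sketched roughly as follows. This is an illustration under stated assumptions, not the proposed patch: the `TaskContext` and `TaskCompletionListener` types below are simplified hypothetical stand-ins, `functionToListener` and `markTaskCompleted` are made-up names, and the conversion lives in an enclosing object here rather than in a package object as the proposal suggests.

```scala
import scala.language.implicitConversions

object ListenerShimSketch {
  // Simplified stand-in for the listener interface.
  trait TaskCompletionListener {
    def onTaskCompletion(context: TaskContext): Unit
  }

  // Simplified stand-in with only the listener-accepting method, i.e. the
  // Scala-function overload has been removed.
  final class TaskContext {
    private var listeners: List[TaskCompletionListener] = Nil

    def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext = {
      listeners = listener :: listeners
      this
    }

    // Hypothetical helper that fires the registered listeners.
    def markTaskCompleted(): Unit = listeners.foreach(_.onTaskCompletion(this))
  }

  // Conversion that lets existing callers keep passing Scala functions.
  implicit def functionToListener(f: TaskContext => Unit): TaskCompletionListener =
    new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = f(context)
    }

  // Registers a function value as a listener and reports whether it ran.
  def runOnce(): Boolean = {
    var completed = false
    val callback: TaskContext => Unit = (_: TaskContext) => { completed = true }
    val context = new TaskContext
    context.addTaskCompletionListener(callback) // converted implicitly
    context.markTaskCompleted()
    completed
  }

  def main(args: Array[String]): Unit = println(runOnce())
}
```

The sketch passes a function value rather than a lambda literal so that the implicit conversion, not SAM conversion, is what adapts the argument.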
[GitHub] spark issue #18662: [SPARK-21444] Be more defensive when removing broadcasts...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/18662 Merged to master. Thanks for the quick reviews.
[GitHub] spark pull request #18662: [SPARK-21444] Be more defensive when removing bro...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/18662

[SPARK-21444] Be more defensive when removing broadcasts in MapOutputTracker

## What changes were proposed in this pull request?

In SPARK-21444, @sitalkedia reported an issue where the `Broadcast.destroy()` call in `MapOutputTracker`'s `ShuffleStatus.invalidateSerializedMapOutputStatusCache()` was failing with an `IOException`, causing the DAGScheduler to crash and bring down the entire driver.

This is a bug introduced by #17955. In the old code, we removed a broadcast variable by calling `BroadcastManager.unbroadcast` with `blocking=false`, but the new code simply calls `Broadcast.destroy()` which is capable of failing with an IOException in case certain blocking RPCs time out.

The fix implemented here is to replace this with a call to `destroy(blocking = false)` and to wrap the entire operation in `Utils.tryLogNonFatalError`.

## How was this patch tested?

I haven't written regression tests for this because it's really hard to inject mocks to simulate RPC failures here. Instead, this class of issue is probably best uncovered with more generalized error injection / network unreliability / fuzz testing tools.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-21444

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18662.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #18662

commit a5ebcac4ceb14eb8342ce085965b370186b4aba9
Author: Josh Rosen <joshro...@databricks.com>
Date: 2017-07-17T23:56:45Z

Use blocking=false and add try logging.
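The shape of the fix described in this pull request can be sketched as follows. This is not the actual patch: `tryLogNonFatalError` is re-implemented here as a stand-in for the Spark-internal helper of the same name, and `FlakyBroadcast` and `invalidateCache` are hypothetical names used to simulate a failing cleanup call.

```scala
import java.io.IOException
import scala.util.control.NonFatal

object DefensiveDestroySketch {
  // Stand-in helper: runs the block and logs, rather than rethrows, any
  // non-fatal exception.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) => System.err.println(s"Uncaught exception: $t")
    }
  }

  // Hypothetical broadcast whose cleanup always fails, simulating an RPC timeout.
  final class FlakyBroadcast {
    def destroy(blocking: Boolean): Unit =
      throw new IOException("simulated RPC timeout")
  }

  // Clears a cached broadcast without letting cleanup failures escape.
  // Returns true once the caller's own state has been reset.
  def invalidateCache(cached: FlakyBroadcast): Boolean = {
    tryLogNonFatalError {
      cached.destroy(blocking = false)
    }
    true
  }

  def main(args: Array[String]): Unit =
    println(invalidateCache(new FlakyBroadcast))
}
```

The design choice is that a failed cleanup of a cache entry is logged and tolerated, since letting the exception propagate is what took down the scheduler in the reported issue.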
[GitHub] spark issue #17150: [SPARK-19810][BUILD][CORE] Remove support for Scala 2.10
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/17150 @srowen, I'll disable those master branch 2.10 jobs and will update scripts to prevent them from being redeployed by accident.
[GitHub] spark pull request #18476: [SPARK-20858][DOC][MINOR] Document ListenerBus ev...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18476#discussion_r125149417

--- Diff: docs/configuration.md ---
@@ -1398,6 +1398,15 @@ Apart from these, the following properties are also available, and may be useful
+ spark.scheduler.listenerbus.eventqueue.size
--- End diff --

If you're only documenting this in master then please use `spark.scheduler.listenerbus.eventqueue.capacity` instead (see definition in code and git blame for explanation).
[GitHub] spark pull request #18467: [SPARK-21253][Core]Disable spark.reducer.maxReqSi...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18467#discussion_r124945147

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -326,7 +326,7 @@ package object config {
 .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
 "above this threshold. This is to avoid a giant request takes too much memory.")
 .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("200m")
--- End diff --

Might want to put a `.internal()` here.