[GitHub] spark issue #22320: [SPARK-25313][SQL]Fix regression in FileFormatWriter out...

2018-09-03 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/22320
  
(This is a test comment to test a GitHub Integration; please ignore)


---




[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

2018-06-13 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21559
  
jenkins add to whitelist


---




[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-10 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21481
  
Let's merge this as-is and do the build improvements in a separate PR. 
That's important because we may want to backport the overflow fix to 
maintenance branches and may want to do so independent of the build changes.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-02 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192566116
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 ---
@@ -141,26 +141,14 @@ public void fetchChunk(
 StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
 handler.addFetchRequest(streamChunkId, callback);
 
-channel.writeAndFlush(new 
ChunkFetchRequest(streamChunkId)).addListener(future -> {
--- End diff --

Thanks for explaining. I guess the re-ordering of `channel.close()` and the `handler` operations is safe because the handler doesn't hold references to the channel or otherwise interact with it, and doesn't hold references to objects tied to the channel's lifecycle (like buffers)?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-02 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192565980
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData 
(eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any 
errors while handling the
+   * streamData will lead to failing this entire connection -- all other 
in-flight rpcs will fail.
--- End diff --

I'm trying to think through whether we'll risk introducing any weird new 
failure modes (or increasing the occurrence of existing-but-improbable failure 
modes). For example, causing in-flight RPCs to fail could surface latent RPC timeout issues: if we have a timeout which is way too long, and we drop in-flight responses on the floor without sending back negative ACKs, then we could see (finite but potentially long) hangs.

On the other hand, this pathway is used for executor <-> executor transfers 
and generally not executor <-> driver transfers, so my understanding is that 
failures in this channel generally won't impact control RPCs.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-02 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192565530
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
+  /** Used to link an RPC request with its response. */
+  public final long requestId;
+  public final ManagedBuffer meta;
+  public final long bodyByteCount;
+
+  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer 
body) {
+super(body, false); // body is *not* included in the frame
+this.requestId = requestId;
+this.meta = meta;
+bodyByteCount = body.size();
+  }
+
+  // this version is called when decoding the bytes on the receiving end.  
The body is handled
+  // separately.
+  private UploadStream(long requestId, ManagedBuffer meta, long 
bodyByteCount) {
+super(null, false);
+this.requestId = requestId;
+this.meta = meta;
+this.bodyByteCount = bodyByteCount;
+  }
+
+  @Override
+  public Type type() { return Type.UploadStream; }
+
+  @Override
+  public int encodedLength() {
+// the requestId, meta size, meta and bodyByteCount (body is not 
included)
+return 8 + 4 + ((int) meta.size()) + 8;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+buf.writeLong(requestId);
+try {
+  ByteBuffer metaBuf = meta.nioByteBuffer();
+  buf.writeInt(metaBuf.remaining());
+  buf.writeBytes(metaBuf);
+} catch (IOException io) {
+  throw new RuntimeException(io);
+}
+buf.writeLong(bodyByteCount);
+  }
+
+  public static UploadStream decode(ByteBuf buf) {
+long requestId = buf.readLong();
+int metaSize = buf.readInt();
+ManagedBuffer meta = new 
NettyManagedBuffer(buf.readRetainedSlice(metaSize));
+long bodyByteCount = buf.readLong();
+// This is called by the frame decoder, so the data is still null.  We 
need a StreamInterceptor
+// to read the data.
+return new UploadStream(requestId, meta, bodyByteCount);
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hashCode(requestId, body());
+  }
+
+  @Override
+  public boolean equals(Object other) {
+if (other instanceof UploadStream) {
+  UploadStream o = (UploadStream) other;
+  return requestId == o.requestId && super.equals(o);
+}
+return false;
+  }
+
+  @Override
+  public String toString() {
+return Objects.toStringHelper(this)
+  .add("requestId", requestId)
+  .add("body", body())
--- End diff --

I'm not actually sure. I wonder if this is a latent problem in the old code 
waiting to happen in case we turn on trace logging. We can probably investigate 
that separately, but just wanted to note it since it seemed a little dodgy.


---




[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-02 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21481
  
Hey @kiszk, thanks for tracking this down. This change looks good to me.

I have a few questions, mostly aimed at figuring out how we can categorically solve this problem:

1. What's the impact of this issue? Have you observed actual crashes or 
silent data corruption caused by this problem? I ask because it looks like this 
could be a plausible cause of a data corruption bug that I've been 
investigating.
2. Are these five files the only occurrences of this problem, or are there potentially others? I've noticed that all of the files modified here are `.java` files: is that a coincidence, or is it the result of running some Java linting tool? If the latter, is it possible that we have similar issues in other files which we haven't found yet?
3. Do you have any ideas for how we can categorically prevent this class of 
problem in the future? Are there compiler plugins or linters which can warn on 
these cases and turn them into compile-time errors? Or code-review checklists 
that we can employ to more easily spot these potential overflows? This is a 
hard problem, but I think it's worth brainstorming on categorical solutions 
since this isn't the first time we've hit this class of problem (but hopefully 
it can be the last!).

I think we should definitely backport this fix, at least to 2.3 and 
possibly earlier.
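
For concreteness, the class of bug I'm referring to looks roughly like this (a hypothetical sketch with made-up sizes, not code from this PR):

```java
public class IntOverflowSketch {
  public static void main(String[] args) {
    int numRecords = 300_000;    // hypothetical values
    int bytesPerRecord = 8_192;

    // Bug pattern: the multiplication is performed in 32-bit int arithmetic
    // and silently wraps around *before* the result is widened to long.
    long wrong = numRecords * bytesPerRecord;            // -1837367296

    // Fix: widen an operand first so the multiply happens in 64-bit arithmetic...
    long right = (long) numRecords * bytesPerRecord;     // 2457600000

    // ...or use Math.multiplyExact to fail fast instead of silently wrapping.
    long checked = Math.multiplyExact((long) numRecords, (long) bytesPerRecord);

    System.out.println(wrong + " vs " + right + " vs " + checked);
  }
}
```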


---




[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...

2018-06-02 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21390
  
Feel free to do the TTL in a followup. My feeling is that it won't be super 
useful in practice, though:

1. Cleanup of non-shuffle disk block manager files following executor exit 
only really matters for super-long-running applications. For short-running 
applications, you can just remove the entire application directory via the 
existing TTL cleaner mechanism.
2. If production jobs would fail with this change due to user code relying 
on undocumented internal behavior then I think the right solution is to disable 
this cleanup completely vs. putting it on a TTL. We've tried TTL-based cleanup 
before in the predecessor to the ContextCleaner and it was a huge source of 
user issues / JIRA tickets in cases where the cleanup was happening too soon 
(but not immediately, e.g. a 20 minute delay).
3. If you want this feature only for debugging (e.g. manual inspection of the contents of spill files) then I again imagine that you probably want an infinite timeout. Let's say I have a hard-to-reproduce production failure and I'd like to debug a production repro by looking at spill files. In that case, the problem could occur at any hour, possibly when I'm asleep, so if I want the files to stick around long enough for a human to look at them, that could be several hours (possibly days in case we're running something over a weekend), and I feel like at a certain point a large timeout might as well become infinite.

Feel free to push back if you have a concrete use case where TTL-based cleanup of this specific file category is preferable to the binary on/off option implemented here. I'm just worried that it will be a lot of additional work to implement and will be harder to reason about (while offering relatively little marginal benefit compared to the simple "right after executor exit" approach).


---




[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21346
  
Summary of key changes (WIP; notes to self):


> Summary of changes:
> 
> - Introduce a new `UploadStream` RPC which is sent to push a large 
payload as a stream (in contrast, the pre-existing `StreamRequest` and 
`StreamResponse` RPCs are used for pull-based  streaming).
> - Generalize `RpcHandler.receive()` to support requests which contain 
streams. 
> - Generalize `StreamInterceptor` to handle both request and response 
messages (previously it only handled responses).
> - Introduce `StdChannelListener` to abstract away common logging logic in 
`ChannelFuture` listeners.

Question: is this effectively dead code at this point? In other words, this PR just adds the lower-level pieces, but there's nothing currently using the new API? So this patch, as of now, has no behavior change, and the actual functional changes impacting queries / actual usage will come later when we wire this up to the block replicator?
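
(Separately, a note to self on the `StdChannelListener` bullet above: as I understand it, the idea is roughly the following pattern; this is a rough sketch for my own notes, not the PR's actual class.)

```java
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

// Sketch of a shared listener that centralizes the failure-logging
// boilerplate previously repeated at each writeAndFlush(...).addListener(...)
// call site. Names here are illustrative only.
class LoggingChannelListener implements ChannelFutureListener {
  private final String description;

  LoggingChannelListener(String description) {
    this.description = description;
  }

  @Override
  public void operationComplete(ChannelFuture future) {
    if (!future.isSuccess()) {
      System.err.println("Failed to send " + description + " to "
          + future.channel().remoteAddress() + ": " + future.cause());
    }
  }
}
```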


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191964329
  
--- Diff: project/MimaExcludes.scala ---
@@ -36,6 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+// [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
--- End diff --

I suspect that it's because we might want to access these across Java 
package boundaries and Java doesn't have the equivalent of Scala's nested 
package scoped `private[package]`.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191941962
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/StreamData.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.StreamCallback;
+import org.apache.spark.network.client.StreamInterceptor;
+import org.apache.spark.network.util.TransportFrameDecoder;
+
+/**
+ * A holder for streamed data sent along with an RPC message.
+ */
+public class StreamData {
+
+  private final TransportRequestHandler handler;
+  private final TransportFrameDecoder frameDecoder;
+  private final RpcResponseCallback rpcCallback;
+  private final ByteBuffer meta;
--- End diff --

It looks like this field is not actually used in the current 
implementation. Is that intentional?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191941503
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData 
(eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any 
errors while handling the
+   * streamData will lead to failing this entire connection -- all other 
in-flight rpcs will fail.
--- End diff --

Perhaps a naive question: what are the implications of this? Is this
referring to a scenario where we've multiplexed multiple asynchronous requests 
/ responses over a single network connection? I think I understand _why_ the 
failure mode is as stated (we're worried about leaving non-consumed leftover 
data in the channel) but I just wanted to ask about the implications of failing 
other in-flight RPCs.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191940304
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
+  /** Used to link an RPC request with its response. */
+  public final long requestId;
+  public final ManagedBuffer meta;
+  public final long bodyByteCount;
+
+  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer 
body) {
+super(body, false); // body is *not* included in the frame
+this.requestId = requestId;
+this.meta = meta;
+bodyByteCount = body.size();
+  }
+
+  // this version is called when decoding the bytes on the receiving end.  
The body is handled
+  // separately.
+  private UploadStream(long requestId, ManagedBuffer meta, long 
bodyByteCount) {
+super(null, false);
+this.requestId = requestId;
+this.meta = meta;
+this.bodyByteCount = bodyByteCount;
+  }
+
+  @Override
+  public Type type() { return Type.UploadStream; }
+
+  @Override
+  public int encodedLength() {
+// the requestId, meta size, meta and bodyByteCount (body is not 
included)
+return 8 + 4 + ((int) meta.size()) + 8;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+buf.writeLong(requestId);
+try {
+  ByteBuffer metaBuf = meta.nioByteBuffer();
+  buf.writeInt(metaBuf.remaining());
+  buf.writeBytes(metaBuf);
+} catch (IOException io) {
+  throw new RuntimeException(io);
+}
+buf.writeLong(bodyByteCount);
+  }
+
+  public static UploadStream decode(ByteBuf buf) {
+long requestId = buf.readLong();
+int metaSize = buf.readInt();
+ManagedBuffer meta = new 
NettyManagedBuffer(buf.readRetainedSlice(metaSize));
+long bodyByteCount = buf.readLong();
+// This is called by the frame decoder, so the data is still null.  We 
need a StreamInterceptor
+// to read the data.
+return new UploadStream(requestId, meta, bodyByteCount);
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hashCode(requestId, body());
+  }
+
+  @Override
+  public boolean equals(Object other) {
+if (other instanceof UploadStream) {
+  UploadStream o = (UploadStream) other;
+  return requestId == o.requestId && super.equals(o);
+}
+return false;
+  }
+
+  @Override
+  public String toString() {
+return Objects.toStringHelper(this)
+  .add("requestId", requestId)
+  .add("body", body())
--- End diff --

Similar question here about whether `body()` is useful in this context: 
will this actually end up printing buffer contents, which are potentially huge? 
Or will it do something reasonable and print only the buffer type?
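
If it's the former, one conservative option (just a sketch reusing this class's existing fields, not a concrete ask) would be to log cheap metadata instead of the body itself:

```java
@Override
public String toString() {
  // Sketch: avoid touching the buffer contents; log only its size.
  return Objects.toStringHelper(this)
    .add("requestId", requestId)
    .add("bodyByteCount", bodyByteCount)
    .toString();
}
```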


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191939431
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
+  /** Used to link an RPC request with its response. */
+  public final long requestId;
+  public final ManagedBuffer meta;
+  public final long bodyByteCount;
+
+  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer 
body) {
+super(body, false); // body is *not* included in the frame
+this.requestId = requestId;
+this.meta = meta;
+bodyByteCount = body.size();
+  }
+
+  // this version is called when decoding the bytes on the receiving end.  
The body is handled
+  // separately.
+  private UploadStream(long requestId, ManagedBuffer meta, long 
bodyByteCount) {
+super(null, false);
+this.requestId = requestId;
+this.meta = meta;
+this.bodyByteCount = bodyByteCount;
+  }
+
+  @Override
+  public Type type() { return Type.UploadStream; }
+
+  @Override
+  public int encodedLength() {
+// the requestId, meta size, meta and bodyByteCount (body is not 
included)
+return 8 + 4 + ((int) meta.size()) + 8;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+buf.writeLong(requestId);
+try {
+  ByteBuffer metaBuf = meta.nioByteBuffer();
+  buf.writeInt(metaBuf.remaining());
+  buf.writeBytes(metaBuf);
+} catch (IOException io) {
+  throw new RuntimeException(io);
+}
+buf.writeLong(bodyByteCount);
+  }
+
+  public static UploadStream decode(ByteBuf buf) {
+long requestId = buf.readLong();
+int metaSize = buf.readInt();
+ManagedBuffer meta = new 
NettyManagedBuffer(buf.readRetainedSlice(metaSize));
+long bodyByteCount = buf.readLong();
+// This is called by the frame decoder, so the data is still null.  We 
need a StreamInterceptor
+// to read the data.
+return new UploadStream(requestId, meta, bodyByteCount);
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hashCode(requestId, body());
--- End diff --

The `equals()` and `hashCode()` implementations of this `UploadStream` class appear to be inconsistent: the `equals()` method only checks equality of the `requestId`s, whereas this `hashCode()` covers both the `requestId` and the `body()`. I'm not sure how `ManagedBuffer` implements `hashCode()`: it might not depend on the buffer contents, in which case this could lead to hashCode mismatches between requests that compare as equal. Should we use just `requestId` here instead?
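
To illustrate what I mean by consistency, here's a sketch keyed purely on `requestId` (hypothetical, not necessarily what we want here):

```java
// Sketch: key both methods on requestId only, so two messages that compare
// as equal can never produce different hash codes.
@Override
public int hashCode() {
  return Long.hashCode(requestId);
}

@Override
public boolean equals(Object other) {
  if (other instanceof UploadStream) {
    return requestId == ((UploadStream) other).requestId;
  }
  return false;
}
```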


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191938203
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 ---
@@ -141,26 +141,14 @@ public void fetchChunk(
 StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
 handler.addFetchRequest(streamChunkId, callback);
 
-channel.writeAndFlush(new 
ChunkFetchRequest(streamChunkId)).addListener(future -> {
--- End diff --

Are the changes to these `.addListener()` calls primarily cleanup / 
refactoring? Is the intent to reduce the amount of _new_ duplicate code which 
would otherwise be added to `uploadStream` in this file?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191935821
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
 ---
@@ -50,16 +52,22 @@
 
   @Override
   public void exceptionCaught(Throwable cause) throws Exception {
-handler.deactivateStream();
+deactivateStream();
 callback.onFailure(streamId, cause);
   }
 
   @Override
   public void channelInactive() throws Exception {
-handler.deactivateStream();
+deactivateStream();
 callback.onFailure(streamId, new ClosedChannelException());
   }
 
+  private void deactivateStream() {
+if (handler instanceof TransportResponseHandler) {
--- End diff --

Why don't we need to do this for `TransportRequestHandler`?


---




[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21346
  
I'm going to be starting a more detailed review pass on this now and will 
be getting caught back up with the discussion that's happened so far.

One high-level point I'd like to keep in mind: what are the major risks of this change in terms of introducing performance or correctness issues? If we identify risks (e.g. "this is a historically tricky area of code"), can we mitigate those risks through correctness testing / load testing?


---




[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21437#discussion_r191881226
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
 TaskAttemptID.
 """
 return self._taskAttemptId
+
+def getLocalProperty(self, key):
+"""
+Get a local property set upstream in the driver, or None if it is 
missing.
--- End diff --

FWIW we can always evolve what you have here towards the 
`getLocalProperty(key, default=None)` case; that evolution would maintain 
behavior and source compatibility. Therefore maybe it's fine to defer that 
until later if we're not sure whether that variant is useful.


---




[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21437#discussion_r191877900
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
 TaskAttemptID.
 """
 return self._taskAttemptId
+
+def getLocalProperty(self, key):
+"""
+Get a local property set upstream in the driver, or None if it is 
missing.
--- End diff --

The Java / Scala equivalents of this API return `null` for missing keys, so 
on the one hand returning `None` is kinda consistent with that.

On the other hand, consider a case where you want to specify an alternative 
in case a key is not set:

With this API, you might think of doing something like `tc.getLocalProperty('key') or 'defaultValue'`, which could be a problem if the property is set but its value is falsy. I suppose we're only dealing with strings here, though, and that'd only happen for empty strings. If we allowed non-strings to be returned here, though, then we'd have problems with values like `0`. For that case, having a `getLocalProperty('key', 'defaultValue')` is a bit more useful.


---




[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...

2018-05-25 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21390
  
Yeah, this is only concerned with non-shuffle files which are located in 
the block manager temp directories (e.g. large sorter spill files).

There is a related issue where shuffle files can be leaked indefinitely 
following executor death because the external shuffle service is never directly 
told that shuffles are safe to remove (the context cleaner sends RPCs to 
executors and executors clean up their own shuffle files). That issue is 
substantially harder to fix, though, since it likely requires protocol changes 
to the shuffle service or an inversion-of-control where the shuffle service can 
periodically ask the driver "do any of these shuffle IDs correspond to cleaned 
shuffles?". As a result, I think the strategy here is to decompose that disk 
leak into two separate sets of fixes, where this patch is concerned with the 
simpler case of non-shuffle files (we'll defer the more complex case to a 
separate PR because it requires a lot more design).


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-24 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r190705466
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -97,6 +99,10 @@ private[deploy] class Worker(
   private val APP_DATA_RETENTION_SECONDS =
 conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
+  // Whether or not cleanup the non-shuffle files on executor death.
+  private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
+conf.getBoolean("spark.storage.cleanupFilesAfterExecutorDeath", true)
--- End diff --

@jerryshao, that's a good point; I prefer your suggested `*Exit` name.

If executors exit in a healthy / controlled fashion then we don't expect non-shuffle files to be leaked, but the cleanup logic will still be run.


---




[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-23 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r190438434
  
--- Diff: python/pyspark/shuffle.py ---
@@ -67,6 +67,19 @@ def get_used_memory():
 return 0
 
 
+def safe_iter(f):
+""" wraps f to make it safe (= does not lead to data loss) to use 
inside a for loop
+make StopIteration's raised inside f explicit
+"""
+def wrapper(*args, **kwargs):
+try:
+return f(*args, **kwargs)
+except StopIteration as exc:
+raise RuntimeError('StopIteration in client code', exc)
--- End diff --

What about `Caught StopIteration thrown from user's code; failing the task` 
in order to make it clear that it's expected that the exception bubbles and 
fails the task?


---




[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...

2018-05-23 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21342
  
Updated changes LGTM. Thanks for working on this!


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r190105740
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -97,6 +97,10 @@ private[deploy] class Worker(
   private val APP_DATA_RETENTION_SECONDS =
 conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
+  // Whether or not cleanup the non-shuffle files on executor finishes.
+  private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
+conf.getBoolean("spark.worker.cleanup.nonShuffleFiles.enabled", true)
--- End diff --

I think these configurations might actually be independent:

- `spark.worker.cleanup.enabled` controls whether directories from finished applications (i.e. terminated drivers) are cleaned up, a decision which can impact users' ability to debug failed applications. In environments where clusters / underlying physical workers are ephemeral this configuration tends to matter a little less, but in environments with long-lived machines which host many Spark drivers over a long period of time (e.g. on prem) it can matter more. For example, I think that setting `spark.worker.cleanup.enabled = true` could mean that completed applications' executor logs get purged from the underlying host machine after a time period.
- The new configuration being proposed here generally matters only _within 
a long-running Spark application_ and is kind of orthogonal to the choice to 
keep or discard failed applications' executor directories. I think that we're 
only removing files within block managers' nested folders, right? So there 
should be no user-controlled / user-generated files here, at least in theory. 
Therefore the impact of this configuration on ordinary users' ability to debug 
is likely to be small (I think you'd only ever really want access to the files 
that we're deleting if you're going to hex-dump files to debug corruption 
issues which trigger segfaults, an unlikely scenario).

Given the different motivations and lifecycles of these cleanup mechanisms, and the different choices of defaults they suggest, I'm thinking that it might make sense to rename the new configuration so that we don't confuse users with similar names.

What do you think about something like 
`spark.storage.removeNonShuffleBlockManagerFilesAfterExecutorDeath`? Really 
verbose, but a precise description of what's going on. I'm open to better name 
suggestions (since I tend to pick overly-complicated ones); this is just a 
conversation-starting strawman.


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r190104118
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -732,6 +736,9 @@ private[deploy] class Worker(
   trimFinishedExecutorsIfNecessary()
   coresUsed -= executor.cores
   memoryUsed -= executor.memory
+  if (CLEANUP_NON_SHUFFLE_FILES_ENABLED) {
+
shuffleService.executorRemoved(executorStateChanged.execId.toString, appId)
--- End diff --

Nah, no need to rename since this matches existing convention. I was just 
thinking aloud here / explaining the reasoning for other future readers.


---




[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r189967162
  
--- Diff: python/pyspark/shuffle.py ---
@@ -67,6 +67,19 @@ def get_used_memory():
 return 0
 
 
+def safe_iter(f):
+""" wraps f to make it safe (= does not lead to data loss) to use 
inside a for loop
+make StopIteration's raised inside f explicit
+"""
+def wrapper(*args, **kwargs):
+try:
+return f(*args, **kwargs)
+except StopIteration as exc:
+raise RuntimeError('StopIteration in client code', exc)
--- End diff --

What about `user code`? That seems a bit clearer: it makes explicit that the exception originates in code that was not part of Spark itself but, instead, was written by a user of the Spark framework. Words like `client` and `consumer` could be confusing because both are used in other, more precise technical contexts within our codebase (e.g. client deploy mode, streaming consumer).


---




[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21311
  
@cxzl25, to clarify:

> Some data is lost and the data read out is dirty

Is this a potential cause of a wrong-answer correctness bug? If so, we should be sure to backport the resulting fix to maintenance branches. /cc @cloud-fan @gatorsmile


---




[GitHub] spark issue #21383: [SPARK-23754][Python] Re-raising StopIteration in client...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21383
  
jenkins this is ok to test


---




[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r189818845
  
--- Diff: python/pyspark/shuffle.py ---
@@ -67,6 +67,19 @@ def get_used_memory():
 return 0
 
 
+def safe_iter(f):
+""" wraps f to make it safe (= does not lead to data loss) to use 
inside a for loop
--- End diff --

It sounds like this is a potential correctness issue, so the eventual fix 
for this should be backported to maintenance releases (at least the most recent 
ones and the next 2.3.x).

I saw the examples provided on the linked JIRAs, but do you have an example 
of a realistic user workload where this problem can occur (i.e. a case where 
the problem is more subtle than explicitly throwing `StopIteration()`)? Would 
that be something like calling `next()` past the end of an iterator (which I 
suppose could occur deep in library code)?


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r189816085
  
--- Diff: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/NonShuffleFilesCleanupSuite.java
 ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.spark.network.util.MapConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class NonShuffleFilesCleanupSuite {
--- End diff --

Do we need a test to check that we preserve the old behavior in case the 
new configuration is set to `false`? An end-to-end test will likely be prone to 
flakiness, so instead maybe we could somehow test that `shuffleService.executorRemoved()` is _not_ called if the configuration is `false`. One way to
do that would be to move the construction of `new ExternalShuffleService` from 
the default constructor of `Worker` into its public constructor and then inject 
it in the `new Worker` call. This, in turn, would let you inject either a mock 
or spy in order to verify call counts. Do you know if we have this style of 
test for other `Worker` functionality? Is this a ton of work or is it 
relatively simple to do?


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r189813626
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -97,6 +97,10 @@ private[deploy] class Worker(
   private val APP_DATA_RETENTION_SECONDS =
 conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
+  // Whether or not cleanup the non-shuffle files on executor finishes.
+  private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
+conf.getBoolean("spark.worker.cleanup.nonShuffleFiles.enabled", true)
--- End diff --

Is there potential confusion from the fact that 
`spark.worker.cleanup.nonShuffleFiles.enabled`'s effects are not controlled by 
`spark.worker.cleanup.enabled`? Should they be? The 
`spark.worker.cleanup.enabled` configuration only seems to be dealing with the 
cleanup of completed applications' directories, whereas this is dealing with 
cleanup from completed executors whose application continues running (likely 
with new executors).


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r189813156
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -732,6 +736,9 @@ private[deploy] class Worker(
   trimFinishedExecutorsIfNecessary()
   coresUsed -= executor.cores
   memoryUsed -= executor.memory
+  if (CLEANUP_NON_SHUFFLE_FILES_ENABLED) {
+
shuffleService.executorRemoved(executorStateChanged.execId.toString, appId)
--- End diff --

Re-reading this code with fresh eyes made me realize that maybe the method 
naming here is a bit confusing: from the name alone, it sounds like 
`executorRemoved` is a notification-sending method, the kind of thing where 
you'd always want to call it when the corresponding event (executor removal) 
occurs and then leave it up to the method implementation to decide whether to 
actually carry out specific cleanup logic.

On the other hand, we call `shuffleService.applicationRemoved()` in 
`maybeCleanupApplication()` and there's a flag in that method controlling 
whether we actually clean up, so we have a similar pattern there.


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r189809797
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -97,6 +97,10 @@ private[deploy] class Worker(
   private val APP_DATA_RETENTION_SECONDS =
 conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
+  // Whether or not cleanup the non-shuffle files on executor finishes.
+  private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
+conf.getBoolean("spark.worker.cleanup.nonShuffleFiles.enabled", true)
--- End diff --

Should we document this configuration at 
https://github.com/apache/spark/blob/master/docs/spark-standalone.md ?

I can't imagine a great reason for users wanting to disable this and 
believe that this is functioning primarily as a feature-flag to give us an 
escape-hatch in case of regressions, so maybe it's not a huge deal to leave it 
undocumented?


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r189809055
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 ---
@@ -226,6 +246,29 @@ private void deleteExecutorDirs(String[] dirs) {
 }
   }
 
+  private FilenameFilter filter = new FilenameFilter() {
--- End diff --

Is it feasible to define this as a local variable inside `deleteNonShuffleFiles`, in order to reduce its scope to be as small as possible and make it clear that it is only used within `deleteNonShuffleFiles`?
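
Roughly the shape I'm imagining (a sketch only; the predicate below is illustrative and should match whatever the existing `filter` field actually checks):

```java
import java.io.File;
import java.io.FilenameFilter;

class NonShuffleCleanupSketch {
  void deleteNonShuffleFiles(String[] dirs) {
    // Defining the filter locally keeps its scope as narrow as its single use.
    FilenameFilter nonShuffleFilter =
        (dir, name) -> !(name.startsWith("shuffle_")
            && (name.endsWith(".index") || name.endsWith(".data")));
    for (String localDir : dirs) {
      File[] files = new File(localDir).listFiles(nonShuffleFilter);
      if (files != null) {
        for (File f : files) {
          f.delete();  // real code would likely recurse and log failures
        }
      }
    }
  }
}
```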


---




[GitHub] spark pull request #21390: [SPARK-24340][Core] Clean up non-shuffle disk blo...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21390#discussion_r189808705
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 ---
@@ -211,6 +211,26 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
 }
   }
 
+  /**
+   * Removes all the non-shuffle files in any local directories associated 
with the finished
+   * executor.
+   */
+  public void executorRemoved(String executorId, String appId) {
+logger.info("Clean up non-shuffle files associated with the finished 
executor {}", executorId);
+AppExecId fullId = new AppExecId(appId, executorId);
+final ExecutorShuffleInfo executor = executors.get(fullId);
+if (executor == null) {
+  // Executor not registered, skip clean up of the local directories.
+  logger.info("Executor is not registered (appId={}, execId={})", 
appId, executorId);
+} else {
+  logger.info("Cleaning up non-shuffle files in executor {}'s {} local 
dirs", fullId,
+  executor.localDirs.length);
+
+  // Execute the actual deletion in a different thread, as it may take 
some time.
--- End diff --

👍 , thanks for doing this in a separate thread. This indeed could take a 
_very_ long time under certain rare conditions.


---




[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...

2018-05-22 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21390
  
Context for other reviewers: the issue addressed by this patch is actually 
a real issue in practice, especially for long-lived Spark clusters; I have seen 
this specific problem play a large contributing role to certain production 
out-of-disk-space failures.

One thing I'd like to note: as implemented here, this patch only addresses 
this problem for Spark's built-in "Standalone" cluster manager. @jiangxb1987, 
could you mention that limitation in the PR title and description? My personal 
preference is to proceed incrementally by merging this Standalone-only PR and deferring support for other cluster managers to future PRs (perhaps from experts familiar with those other cluster managers).

I'll take a more detailed look tomorrow, but just wanted to provide 
motivation for other reviewers who might leave comments before then.


---




[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...

2018-05-21 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21342
  
Thanks for the updates. The net change / scope of changes has been significantly reduced here, so I feel that this change is a lot less risky now.

I left only one nitpicky comment at 
https://github.com/apache/spark/pull/21342/files#r189753488 worrying about 
potential future risks from people coming along and writing new code throwing 
`SparkFatalException` in a context where it can bubble up to the uncaught 
exception handler. If we want to be super defensive we could add some logic at 
https://github.com/apache/spark/blob/32447079e9d0fa9f7e180b94ecac19091b6af1ab/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala#L42
 to also catch a `SparkFatalException` on top of an OOM and treat that as an 
OOM. Debatable if we want to do that, but it's a great way of addressing 
@gatorsmile's comment at 
https://github.com/apache/spark/pull/21342#discussion_r189627101 and avoids 
future breakage.

Otherwise, LGTM.


---




[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...

2018-05-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21342#discussion_r189754203
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/SparkFatalException.scala ---
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+/**
+ * SPARK-24294: To bypass scala bug: 
https://github.com/scala/bug/issues/9554, we catch
+ * fatal throwable in {@link scala.concurrent.Future}'s body, and re-throw
+ * SparkFatalException, which wraps the fatal throwable inside.
+ */
+private[spark] final class SparkFatalException(val throwable: Throwable) 
extends Exception
--- End diff --

OTOH I guess we're actually only using this in one place right now, so I 
think things are correct as written, but I was just kind of abstractly worrying 
about potential future pitfalls in case people start using this pattern in new 
code without also noticing the `ThreadUtils.awaitResult` requirement.


---




[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...

2018-05-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21342#discussion_r189754010
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -111,12 +112,18 @@ case class BroadcastExchangeExec(
   SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
metrics.values.toSeq)
   broadcasted
 } catch {
+  // SPARK-24294: To bypass scala bug: 
https://github.com/scala/bug/issues/9554, we throw
+  // SparkFatalException, which is a subclass of Exception. 
ThreadUtils.awaitResult
+  // will catch this exception and re-throw the wrapped fatal 
throwable.
   case oe: OutOfMemoryError =>
-throw new OutOfMemoryError(s"Not enough memory to build and 
broadcast the table to " +
+throw new SparkFatalException(
+  new OutOfMemoryError(s"Not enough memory to build and 
broadcast the table to " +
--- End diff --

I agree that we're likely to have reclaimable space at this point, so the 
chance of a second OOM / failure here seems small. I'm pretty sure that the 
OutOfMemoryError being caught here often originates from Spark itself where we 
explicitly throw another `OutOfMemoryError` at a lower layer of the system, in 
which case we still actually have heap to allocate strings. We should 
investigate and clean up that practice, but let's do that in a separate PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...

2018-05-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21342#discussion_r189753564
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -111,12 +112,18 @@ case class BroadcastExchangeExec(
   SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
metrics.values.toSeq)
   broadcasted
 } catch {
+  // SPARK-24294: To bypass scala bug: 
https://github.com/scala/bug/issues/9554, we throw
+  // SparkFatalException, which is a subclass of Exception. 
ThreadUtils.awaitResult
+  // will catch this exception and re-throw the wrapped fatal 
throwable.
   case oe: OutOfMemoryError =>
--- End diff --

Agreed; let's fix that separately.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...

2018-05-21 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21342#discussion_r189753488
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/SparkFatalException.scala ---
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+/**
+ * SPARK-24294: To bypass scala bug: 
https://github.com/scala/bug/issues/9554, we catch
+ * fatal throwable in {@link scala.concurrent.Future}'s body, and re-throw
+ * SparkFatalException, which wraps the fatal throwable inside.
+ */
+private[spark] final class SparkFatalException(val throwable: Throwable) 
extends Exception
--- End diff --

Are there places where we fetch results from Futures without going through 
the `ThreadUtils.awaitResult`? In other words, is that a narrow waist? Would it 
make sense to add a second redundant layer of unwrapping at the top of 
`SparkUncaughtExceptionHandler` to handle that case? Not sure yet, but just 
thinking aloud here.
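
For concreteness, the wrap/unwrap pattern in question looks roughly like this (a 
minimal sketch, assuming the `SparkFatalException` from this diff; `doBuildRelation` 
is a hypothetical stand-in for the Future body's real work, not actual Spark code):

```scala
import scala.concurrent.{Await, Awaitable, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Wrap: inside the Future body, fatal throwables are re-thrown wrapped in a plain
// Exception subclass so scala.concurrent.Future doesn't swallow them (scala/bug#9554).
def relationFuture(doBuildRelation: () => AnyRef)
                  (implicit ec: ExecutionContext): Future[AnyRef] = Future {
  try {
    doBuildRelation()
  } catch {
    case oe: OutOfMemoryError => throw new SparkFatalException(oe)
  }
}

// Unwrap: an awaitResult-style helper (the "narrow waist") re-throws the original
// fatal throwable so callers never see the wrapper.
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
  try {
    Await.result(awaitable, atMost)
  } catch {
    case fatal: SparkFatalException => throw fatal.throwable
  }
}
```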


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21329: [SPARK-24277][SQL] Code clean up in SQL module: HadoopMa...

2018-05-18 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21329
  
I'd also like to note that commit protocols have historically been a very 
high risk area of the code, so I think we should have a much higher bar for 
explaining changes to that component.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21329: [SPARK-24277][SQL] Code clean up in SQL module: HadoopMa...

2018-05-18 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21329
  
In general, I'm very wary of cleanup changes like this: unless we have a 
_need_ to do this (i.e. it causes negative side effects, breaks workloads, 
prevents specific concrete improvements, etc.) then the risk of changing 
longstanding old code outweighs any benefits of "cleanliness".

In this specific case, I'm most concerned about the removal of those 
`jobContext` `Configuration` changes: they might appear superfluous but I'd bet 
that they were originally added for a reason which made sense at the time the 
original code was authored. If we're going to remove such old code, I'd like to 
see a written explanation of why the change is safe which explains the original 
context for adding that code. I chased through the git blame for the removed 
lines and the oldest version I found was 
https://github.com/apache/spark/commit/0595b6de8f1da04baceda082553c2aa1aa2cb006#diff-5c7d283b2f533b6f491dd1845dd86797R299,
 which is #5526: "[SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the 
data sources API" by @liancheng. It looks that PR simply copied even older code 
from either `hiveWriterContainers.scala` or `HadoopRDD`. It looks like these 
properties actually _did_ have some effect for something at one point in time 
because https://github.com/apache/spark/pull/101 was 
 fixing some sort of corner-case or bug in that code in the `HadoopRDD` case. 
If you go even further back, similar lines are present in a commit which 
predated our merge script (so it's an intermediate commit from @mateiz as part 
of some larger changeset): 
https://github.com/apache/spark/commit/0ccfe20755665aa4c347b82e18297c5b3a2284ee#diff-8382b1a276e6cbfae1efb3ee49c7e06eR144

Given this long history, I'd like to flag that change as potentially 
high-risk: it's not obvious to me that this code is unneeded and if we don't 
have a strong reason to change it then I'd prefer to leave it as it was before 
simply to help us manage / reduce risk.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...

2018-05-16 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21342
  
I'm also in favor of delaying for a couple of days for more detailed review 
because historically I think these types of changes have been high risk. The 
risk calculus might be a bit different if this was fixing a critical "wrong 
result" correctness bug, but it seems like this is a longstanding annoyance 
which causes either poor performance or visible job failures, so I don't see an 
extreme urgency to get this in immediately. Therefore let's take a bit of time 
to ponder things and sleep on it to be sure that we've thought through 
corner-cases (to be clear, I do think this patch is generally in a good 
direction).

Some specifics:

1. The old code had some places which purposely caught `OutOfMemoryError` 
thrown from layers of the spilling code. I do not know whether the expected 
sources of OOMs were only the throw sites modified here or whether the intent 
was also to catch OOMs from allocating too big arrays, etc. The latter would 
have been a dodgy pattern and bad idea in the first place, but I just wanted to 
note this as a potential risk for unintended / implicit behavior changes. If we 
want to be super conservative about that we could update throw sites but keep 
catch sites and extend them to catch _both_ OOM cases.
2. Should we maybe throw an `OutOfMemoryError` subclass and then 
pattern-match on our subclass in a couple of specific places? That might help 
reduce change-surface.
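
To make point 2 concrete, here is roughly the shape I have in mind (a sketch; 
`SparkOutOfMemoryError` and the helpers below are illustrative names, not existing 
Spark classes):

```scala
// Sketch only: names are illustrative, not existing classes.
class SparkOutOfMemoryError(message: String) extends OutOfMemoryError(message)

// Throw sites inside Spark's memory/spilling code would use the subclass:
def acquireOrFail(required: Long, got: Long): Unit = {
  if (got < required) {
    throw new SparkOutOfMemoryError(s"Unable to acquire $required bytes of memory, got $got")
  }
}

// Catch sites could then match only on Spark-originated OOMs, letting genuine JVM-level
// OOMs (e.g. from huge array allocations) propagate unchanged:
def withSpillOnOom[T](attempt: => T)(spillAndRetry: => T): T = {
  try attempt catch {
    case _: SparkOutOfMemoryError => spillAndRetry
  }
}
```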


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-16 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21346
  
It's been a little while since I've thought about this issue, so I have a 
few clarifying questions to help me understand the high-level changes:

1. I recall that the problem with large shuffle blocks was that the 
OneForOneBlockFetcher strategy basically read the entire block as a single 
chunk, which becomes a problem for large blocks. I understand that we have now 
removed this limitation for shuffles by using a streaming transfer strategy 
only for large blocks (above some threshold). Is this patch conceptually doing 
the same thing for push-based communication where the action is initiated by a 
sender (e.g. to push a block for replication)? Does it also affect pull-based 
remote cache block reads or will that be handled separately?
2. Given that we already seem to have pull-based `openStream()` calls which 
can be initiated from the receive side, could we simplify things here by 
pushing a "this value is big, pull it" message and then have the remote end 
initiate a streaming read, similar to how DirectTaskResult and 
IndirectTaskResult work?
3. For remote reads of large cached blocks: is it true that this works 
today _only if_ the block is on disk but fails if the block is in memory? If 
certain size limit problems only occur when things are cached in memory, can we 
simplify anything if we add a requirement that blocks above 2GB can _only_ be 
cached on disk (regardless of storage level)?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21327: [SPARK-24107][CORE][followup] ChunkedByteBuffer.writeFul...

2018-05-16 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21327
  
LGTM. Thanks for these changes; they really help to clarify this tricky 
piece of code for readers.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

2018-05-14 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21175
  
No, I mean that the code here can simply follow the write call as straight-through
code. We don't need to guard against exceptions here because the duplicate of the
buffer is used only by a single thread, so you can omit the try block and just
concatenate the try contents with the finally contents. This is a minor point, but
I wanted to comment because I was initially confused about when errors could occur
and about thread safety / sharing, until I realized that the modified state does
not escape this method.
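
Concretely, something like this is what I had in mind (a sketch of the
straight-through version of `ChunkedByteBuffer.writeFully`, assuming `getChunks()`
returns fresh duplicates whose position/limit never escape this method; `getChunks()`
and `bufferWriteChunkSize` come from that class):

```scala
def writeFully(channel: WritableByteChannel): Unit = {
  for (bytes <- getChunks()) {  // duplicated buffers: mutating position/limit is private to this call
    val curChunkLimit = bytes.limit()
    while (bytes.hasRemaining) {
      // No try/finally needed: if write() throws, the mutated duplicate is simply discarded.
      val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      bytes.limit(curChunkLimit)
    }
  }
}
```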
On Mon, May 14, 2018 at 9:03 PM Wenchen Fan <notificati...@github.com>
wrote:

> *@cloud-fan* commented on this pull request.
> --
>
> In core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
> <https://github.com/apache/spark/pull/21175#discussion_r188160044>:
>
> > @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
> */
>def writeFully(channel: WritableByteChannel): Unit = {
>  for (bytes <- getChunks()) {
> -  while (bytes.remaining() > 0) {
> -val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
> -bytes.limit(bytes.position() + ioSize)
> -channel.write(bytes)
> +  val curChunkLimit = bytes.limit()
> +  while (bytes.hasRemaining) {
> +try {
> +  val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
> +  bytes.limit(bytes.position() + ioSize)
> +  channel.write(bytes)
> +} finally {
>
> Do you mean this is not a real bug that can cause real workload to fail?
>
> —
> You are receiving this because you commented.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/21175#discussion_r188160044>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AADGPJvZNC5LYjHl2WZ44YEIBVGLrehEks5tylODgaJpZM4TptO_>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

2018-05-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21175#discussion_r188154900
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
*/
   def writeFully(channel: WritableByteChannel): Unit = {
 for (bytes <- getChunks()) {
-  while (bytes.remaining() > 0) {
-val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-bytes.limit(bytes.position() + ioSize)
-channel.write(bytes)
+  val curChunkLimit = bytes.limit()
+  while (bytes.hasRemaining) {
+try {
+  val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+  bytes.limit(bytes.position() + ioSize)
--- End diff --

The rationale for the `limit()` isn't super-clear, but that was a problem 
in the original PR which introduced the bug (#18730). I'm commenting here only 
for cross-reference purposes for folks who come across this patch in the 
future. I believe that the original motivation was 
http://www.evanjones.ca/java-bytebuffer-leak.html


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

2018-05-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21175#discussion_r188154716
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
*/
   def writeFully(channel: WritableByteChannel): Unit = {
 for (bytes <- getChunks()) {
-  while (bytes.remaining() > 0) {
-val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-bytes.limit(bytes.position() + ioSize)
-channel.write(bytes)
+  val curChunkLimit = bytes.limit()
+  while (bytes.hasRemaining) {
+try {
+  val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+  bytes.limit(bytes.position() + ioSize)
+  channel.write(bytes)
+} finally {
--- End diff --

I don't think we need the `try` and `finally` here because `getChunks()` 
returns duplicated ByteBuffers which have their own position and limit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21212#discussion_r186315362
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
 // at most maxBytesInFlight in order to limit the amount of data in 
flight.
 val remoteRequests = new ArrayBuffer[FetchRequest]
 
-// Tracks total number of blocks (including zero sized blocks)
-var totalBlocks = 0
 for ((address, blockInfos) <- blocksByAddress) {
-  totalBlocks += blockInfos.size
   if (address.executorId == blockManager.blockManagerId.executorId) {
-// Filter out zero-sized blocks
-localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+blockInfos.find(_._2 <= 0) match {
+  case Some((blockId, size)) if size < 0 =>
+throw new BlockException(blockId, "Negative block size " + 
size)
+  case Some((blockId, size)) if size == 0 =>
+throw new BlockException(blockId, "Zero-sized blocks should be 
excluded.")
--- End diff --

I think that failing with an exception here is a great idea, so thanks for 
adding these checks. In general, I'm in favor of adding explicit fail-fast 
checks for invariants like this because it can help to defend against silent 
corruption bugs. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18801: SPARK-10878 Fix race condition when multiple clie...

2018-05-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18801#discussion_r186160092
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala ---
@@ -255,4 +256,20 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with 
BeforeAndAfterAll {
   assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
 }
   }
+
+  test("test resolution files cleaned after resolving artifact") {
--- End diff --

Actually triggering the original race in a test could be pretty hard, so 
I'm not sure that we necessarily should block this fix on having an end-to-end 
integration test which could reproduce the original race. I think that the use 
of the UUID should be sufficient and therefore the only important thing to test 
is that we're still cleaning up the files properly (as is being done here).

Therefore I would welcome a re-submitted and cleaned-up version of this PR 
which addresses the other review comments (which I hope to merge soon if it's 
in good shape).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21219: [SPARK-24160] ShuffleBlockFetcherIterator should ...

2018-05-03 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21219#discussion_r185949405
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -407,6 +407,25 @@ final class ShuffleBlockFetcherIterator(
 logDebug("Number of requests in flight " + reqsInFlight)
   }
 
+  if (buf.size == 0) {
+// We will never legitimately receive a zero-size block. All 
blocks with zero records
+// have zero size and all zero-size blocks have no records 
(and hence should never
+// have been requested in the first place). This statement 
relies on behaviors of the
+// shuffle writers, which are guaranteed by the following test 
cases:
+//
+// - BypassMergeSortShuffleWriterSuite: "write with some empty 
partitions"
+// - UnsafeShuffleWriterSuite: "writeEmptyIterator"
+// - DiskBlockObjectWriterSuite: "commit() and close() without 
ever opening or writing"
+//
+// There is not an explicit test for SortShuffleWriter but the 
underlying APIs that
+// uses are shared by the UnsafeShuffleWriter (both writers 
use DiskBlockObjectWriter
+// which returns a zero-size from commitAndGet() in case the 
no records were written
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21219: [SPARK-24160] ShuffleBlockFetcherIterator should fail if...

2018-05-03 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21219
  
jenkins retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21219: [SPARK-24160] ShuffleBlockFetcherIterator should fail if...

2018-05-02 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21219
  
jenkins retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21219: [SPARK-24160] ShuffleBlockFetcherIterator should ...

2018-05-02 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/21219

[SPARK-24160] ShuffleBlockFetcherIterator should fail if it receives 
zero-size blocks

## What changes were proposed in this pull request?

This patch modifies `ShuffleBlockFetcherIterator` so that the receipt of 
zero-size blocks is treated as an error. This is done as a preventative measure 
to guard against a potential source of data loss bugs.

In the shuffle layer, we guarantee that zero-size blocks will never be 
requested (a block containing zero records is always 0 bytes in size and is 
marked as empty such that it will never be legitimately requested by 
executors). However, the shuffle-read path did not fully take advantage of this 
invariant: it never explicitly checked that received blocks were non-zero-size.

Additionally, our decompression and deserialization streams treat zero-size 
inputs as empty streams rather than as errors: EOF may simply be treated as 
"end-of-stream" in certain layers (longstanding behavior dating back to the 
earliest versions of Spark), and decompressors like Snappy may tolerate 
zero-size inputs.

As a result, if some other bug causes legitimate buffers to be replaced 
with zero-sized buffers (due to corruption on either the send or receive sides) 
then this would translate into silent data loss rather than an explicit 
fail-fast error. 

This patch addresses this problem by adding a `buf.size != 0` check. See 
code comments for pointers to tests which guarantee the invariants relied on 
here.
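
For illustration, the shape of the new check is roughly the following (a simplified 
sketch meant to live inside `ShuffleBlockFetcherIterator`, whose existing 
`throwFetchFailedException` helper is assumed here; this is not the literal patch):

```scala
import java.io.IOException
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, BlockManagerId}

private def assertNonZeroBlockSize(
    blockId: BlockId,
    address: BlockManagerId,
    buf: ManagedBuffer): Unit = {
  if (buf.size == 0) {
    // Zero-size blocks are never legitimately requested, so a zero-size buffer indicates
    // corruption on the send or receive path; fail fast instead of feeding an empty stream
    // to the decompressor / deserializer (which would be silent data loss).
    throwFetchFailedException(blockId, address,
      new IOException(s"Received a zero-size buffer for block $blockId from $address"))
  }
}
```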

## How was this patch tested?

Existing tests (which required modifications, since some were creating 
empty buffers in mocks). I also added a test to make sure we fail on zero-size 
blocks.

To test that the zero-size blocks are indeed a potential corruption source, 
I manually ran a workload in `spark-shell` with a modified build which replaces 
all buffers with zero-size buffers in the receive path.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-24160

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21219


commit 41d06e13d0f95f1dd146b6b512a0becc88eb2caa
Author: Josh Rosen <joshrosen@...>
Date:   2018-05-02T21:59:26Z

ShuffleBlockFetcherIterator should fail if it receives zero-size blocks




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21101: [SPARK-23989][SQL] exchange should copy data befo...

2018-04-18 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21101#discussion_r182609675
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -167,22 +164,24 @@ object ShuffleExchangeExec {
 val shuffleManager = SparkEnv.get.shuffleManager
 val sortBasedShuffleOn = 
shuffleManager.isInstanceOf[SortShuffleManager]
 val bypassMergeThreshold = 
conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+val numParts = partitioner.numPartitions
 if (sortBasedShuffleOn) {
-  val bypassIsSupported = 
SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-  if (bypassIsSupported && partitioner.numPartitions <= 
bypassMergeThreshold) {
+  if (numParts <= bypassMergeThreshold) {
 // If we're using the original SortShuffleManager and the number 
of output partitions is
 // sufficiently small, then Spark will fall back to the hash-based 
shuffle write path, which
 // doesn't buffer deserialized records.
 // Note that we'll have to remove this case if we fix SPARK-6026 
and remove this bypass.
 false
-  } else if (serializer.supportsRelocationOfSerializedObjects) {
+  } else if (numParts <= 
SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
--- End diff --

I was almost going to suggest that we should check for both conditions 
with an `&&` here just as future-proofing in case `serializer` was changed, but 
I can now see why that isn't necessary: we always use an `UnsafeRowSerializer` 
here now. It was only in the pre-Tungsten era that we could use either 
`UnsafeRowSerializer` or `SparkSqlSerializer` here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20310: revert [SPARK-10030] Use tags to control which te...

2018-01-17 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20310#discussion_r162268244
  
--- Diff: common/tags/src/test/java/org/apache/spark/tags/DockerTest.java 
---
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.tags;
-
-import java.lang.annotation.*;
-import org.scalatest.TagAnnotation;
-
-@TagAnnotation
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.METHOD, ElementType.TYPE})
-public @interface DockerTest { }
--- End diff --

If you search through the commit history, I'm pretty sure that we 
originally tried running those DockerTests on RISELAB Jenkins but ran into 
problems with the Docker Daemon becoming unstable under heavy build load. This 
should be fixed in the newer-generation Ubuntu build workers, but we haven't 
quite finished migrating the PRBs onto those.

Given this, my hunch is that those tests aren't running anywhere right now, 
which isn't great. I think they're primarily used for testing JDBC data source 
SQL dialect mappings. It's been a year or more since I last looked into this, 
though, so I might be misremembering.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20310: revert [SPARK-10030] Use tags to control which tests to ...

2018-01-17 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/20310
  
Are you sure that we want to blanket revert this entire patch? Is there a 
more surgical short-term fix we can make in `dev/sparktestsupport/modules.py` 
to just always unconditionally enable the tag for now?

Also, is this the first time recently that we've failed the YARN 
integration tests? How much time do they add?

The trade-off here seems to be between slightly slower after-the-fact 
detection of a test failure / build break due to YARN vs. faster tests for the 
majority of PRs that don't touch YARN code. I think we've had one or two such 
breaks in the 2+ years that we've been using these test tags, so I'd also be 
fine with postponing this change if you agree that it's unlikely that we're 
going to have many such failures here.

If the motivation is that it's hard to test the fix for such build breaks 
(because the failing test wouldn't be exercised in the PR builder) then I think 
we might already have a solution via special tags placed into the PR title (I 
think `test-yarn` or something similar; see `run-tests-jenkins.py`).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20264: [SPARK-23070] Bump previousSparkVersion in MimaBu...

2018-01-14 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20264#discussion_r161418395
  
--- Diff: project/MimaBuild.scala ---
@@ -88,7 +88,7 @@ object MimaBuild {
 
   def mimaSettings(sparkHome: File, projectRef: ProjectRef) = {
 val organization = "org.apache.spark"
-val previousSparkVersion = "2.0.0"
+val previousSparkVersion = "2.2.0"
--- End diff --

Yeah, 2.2.0 is right.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20222: [SPARK-23028] Bump master branch version to 2.4.0...

2018-01-10 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20222#discussion_r160741170
  
--- Diff: project/MimaExcludes.scala ---
@@ -34,6 +34,10 @@ import com.typesafe.tools.mima.core.ProblemFilters._
  */
 object MimaExcludes {
 
+  // Exclude rules for 2.4.x
--- End diff --

We might want to make my proposed MiMa changes in a separate patch so the 
same set of reviewed changes can go to both master and branch-2.3.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20222: [SPARK-23028] Bump master branch version to 2.4.0...

2018-01-10 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20222#discussion_r160740904
  
--- Diff: project/MimaExcludes.scala ---
@@ -34,6 +34,10 @@ import com.typesafe.tools.mima.core.ProblemFilters._
  */
 object MimaExcludes {
 
+  // Exclude rules for 2.4.x
--- End diff --

This reminds me that we should probably bump `previousSparkVersion` in 
`MimaBuild.scala` to be 2.2.0: 
https://github.com/apache/spark/blame/f340b6b3066033d40b7e163fd5fb68e9820adfb1/project/MimaBuild.scala#L91.
 I think this should happen for both master and branch-2.3.

See #15061 for an example of a similar change when 2.1.x was being prepared 
(it looks like we missed this step for 2.2.0). We may also need to un-exclude 
any new subprojects / artifacts that were added in 2.2.0 since they now need to 
be backwards-compatibility-tested for 2.3.x.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20222: [SPARK-23028] Bump master branch version to 2.4.0-SNAPSH...

2018-01-10 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/20222
  
We should already be set up for 2.3.x builds in AMPLab Jenkins. For 
example: 
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test/job/spark-branch-2.3-test-maven-hadoop-2.6/


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20191: [SPARK-22997] Add additional defenses against use of fre...

2018-01-10 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/20191
  
I'm merging this into master and branch-2.3.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20191: [SPARK-22997] Add additional defenses against use of fre...

2018-01-09 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/20191
  
jenkins retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-09 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160547545
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
---
@@ -376,18 +374,13 @@ private[netty] class NettyRpcEnv(
 
 def setError(e: Throwable): Unit = {
   error = e
-  source.close()
 }
 
 override def read(dst: ByteBuffer): Int = {
   Try(source.read(dst)) match {
+case _ if error != null => throw error
--- End diff --

I added a pair of comments to explain the flow of calls involving 
`setError()` and pipe closes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-09 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160489460
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(new FileInputStream(indexFile))
+// SPARK-22982: if this FileInputStream's position is seeked forward 
by another piece of code
+// which is incorrectly using our file descriptor then this code will 
fetch the wrong offsets
+// (which may cause a reducer to be sent a different reducer's data). 
The explicit position
+// checks added here were a useful debugging aid during SPARK-22982 
and may help prevent this
+// class of issue from re-occurring in the future which is why they 
are left here even though
+// SPARK-22982 is fixed.
+val channel = Files.newByteChannel(indexFile.toPath)
+channel.position(blockId.reduceId * 8)
--- End diff --

For some more background: the asynchronous `close()` bug can cause reads 
from a closed-and-subsequently-reassigned file descriptor number and in 
principle this can affect almost any IO operation _anywhere_ in the 
application. For example, if the closed file descriptor number is immediately 
recycled by opening a socket then the invalid read can cause that socket read 
to miss data (since the data would have been consumed by the invalid reader and 
won't be delivered to the legitimate new user of the file descriptor).

Given this, I see how it might be puzzling that this patch is adding a 
check only here. There are two reasons for this:

1. Many other IO operations have implicit checksumming such that dropping 
data due to an invalid read would be detected and cause an exception. For example, 
many compression codecs have block-level checksumming (and magic numbers at the 
beginning of the stream), so dropping data (especially at the start of a read) 
will be detected. This particular shuffle index file, however, does _not_ have 
mechanisms to detect corruption: skipping forward in the read by a multiple of 
8 bytes will still read structurally-valid data (but it will be the _wrong_ 
data, causing the wrong output to be read from the shuffle data file).
2. In the investigation which uncovered this bug, the invalid reads were 
predominantly impacting shuffle index lookups for reading local blocks. In a 
nutshell, there's a subtle race condition where Janino codegen compilation 
triggers attempted remote classloading of classes which don't exist, triggering 
the error-handling / error-propagation paths in `FileDownloadChannel` and 
causing the invalid asynchronous `close()` call to be performed. At the same 
time that this `close()` call was being performed, another task from the same 
stage attempts to read the shuffle index files of local blocks and experiences 
an invalid read due to the falsely-shared file descriptor.

   This is a very hard-to-trigger bug: we were only able to reproduce it on 
large clusters with very fast machines and shuffles that contain large numbers 
of map and reduce tasks (more shuffle blocks means more index file reads and 
more chances for the race to occur; faster machines increase the likelihood of 
the race occurring; larger clusters give us more chances for the error to 
occur). In our reproduction, this race occurred on a microsecond timescale 
(measured via kernel syscall tracing) and occurred relatively rarely, requiring 
many iterations until we could trigger a reproduction.

While investigating, I added these checks so that the index read fails-fast 
when this issue occurs, which made it significantly easier to reproduce and 
diagnose the root cause (fixed by the other changes in this patch).

There are a number of interesting details in the story of how we worked 
from the original high-level data corruption symptom to this low-level IO bug. 
I'll see about writing up the complete story in a blog post at some point.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-09 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160481843
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(new FileInputStream(indexFile))
+// SPARK-22982: if this FileInputStream's position is seeked forward 
by another piece of code
+// which is incorrectly using our file descriptor then this code will 
fetch the wrong offsets
+// (which may cause a reducer to be sent a different reducer's data). 
The explicit position
+// checks added here were a useful debugging aid during SPARK-22982 
and may help prevent this
+// class of issue from re-occurring in the future which is why they 
are left here even though
+// SPARK-22982 is fixed.
+val channel = Files.newByteChannel(indexFile.toPath)
+channel.position(blockId.reduceId * 8)
+val in = new DataInputStream(Channels.newInputStream(channel))
 try {
-  ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
   val nextOffset = in.readLong()
+  val actualPosition = channel.position()
+  val expectedPosition = blockId.reduceId * 8 + 16
+  if (actualPosition != expectedPosition) {
+throw new Exception(s"SPARK-22982: Incorrect channel position 
after index file reads: " +
--- End diff --

Any suggestions for a better exception subtype? I don't expect this to be a 
recoverable error and wanted to avoid the possibility that downstream code 
catches and handles this error. Maybe I should go further and make it a 
RuntimeException to make it even more fatal?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160314055
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(new FileInputStream(indexFile))
+// SPARK-22982: if this FileInputStream's position is seeked forward 
by another piece of code
+// which is incorrectly using our file descriptor then this code will 
fetch the wrong offsets
+// (which may cause a reducer to be sent a different reducer's data). 
The explicit position
+// checks added here were a useful debugging aid during SPARK-22982 
and may help prevent this
+// class of issue from re-occurring in the future which is why they 
are left here even though
+// SPARK-22982 is fixed.
+val channel = Files.newByteChannel(indexFile.toPath)
+channel.position(blockId.reduceId * 8)
--- End diff --

I made sure to incorporate @zsxwing's changes here. The problem originally 
related to calling `skip()`, but this change is from his fix to explicitly use 
`position` on a `FileChannel` instead.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160313840
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
---
@@ -376,18 +374,13 @@ private[netty] class NettyRpcEnv(
 
 def setError(e: Throwable): Unit = {
   error = e
-  source.close()
 }
 
 override def read(dst: ByteBuffer): Int = {
--- End diff --

Yes. This currently happens in two places:

- In Utils.doFetchFile(): 
https://github.com/apache/spark/blob/28315714ddef3ddcc192375e98dd5207cf4ecc98/core/src/main/scala/org/apache/spark/util/Utils.scala#L661:
 the stream gets passed a couple of layers down to a 
`Utils.copyStream(closeStreams = true)` call which is guaranteed to clean up 
the stream.
- In ExecutorClassLoader, where we construct the stream in `fetchFn` from 
`getclassFileInputStreamFromRpc` and then close it in a `finally` block in 
`findClassLocally`: 
https://github.com/apache/spark/blob/e08d06b37bc96cc48fec1c5e40f73e0bca09c616/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala#L167


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160312699
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(new FileInputStream(indexFile))
+// SPARK-22982: if this FileInputStream's position is seeked forward 
by another piece of code
+// which is incorrectly using our file descriptor then this code will 
fetch the wrong offsets
+// (which may cause a reducer to be sent a different reducer's data). 
The explicit position
+// checks added here were a useful debugging aid during SPARK-22982 
and may help prevent this
+// class of issue from re-occurring in the future which is why they 
are left here even though
+// SPARK-22982 is fixed.
+val channel = Files.newByteChannel(indexFile.toPath)
+channel.position(blockId.reduceId * 8)
+val in = new DataInputStream(Channels.newInputStream(channel))
 try {
-  ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
   val nextOffset = in.readLong()
+  val actualPosition = channel.position()
+  val expectedPosition = blockId.reduceId * 8 + 16
+  if (actualPosition != expectedPosition) {
--- End diff --

I considered this, but I don't think there's ever a case where we want to 
elide this particular check: if we read an incorrect offset here then there's 
(potentially) no other mechanism to detect this error, leading to silent wrong 
answers.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160312383
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
---
@@ -332,15 +332,17 @@ private[netty] class NettyRpcEnv(
 
 val pipe = Pipe.open()
 val source = new FileDownloadChannel(pipe.source())
+var exceptionThrown = true
 try {
   val client = downloadClient(parsedUri.getHost(), parsedUri.getPort())
   val callback = new FileDownloadCallback(pipe.sink(), source, client)
   client.stream(parsedUri.getPath(), callback)
-} catch {
-  case e: Exception =>
+  exceptionThrown = false
+} finally {
+  if (exceptionThrown) {
 pipe.sink().close()
--- End diff --

To preserve the same behavior, I think we'd need to do

```scala
try {
 // code
} catch {
  case t: Throwable =>
pipe.sink().close()
source.close()
throw t
}
```

to ensure that we propagate the original `Throwable`.

It could be clearer (and safer, in the case of exceptions thrown from 
`close()` calls) to use 
[`Utils.tryWithSafeFinallyAndFailureCallbacks`](https://github.com/apache/spark/blob/849043ce1d28a976659278d29368da0799329db8/core/src/main/scala/org/apache/spark/util/Utils.scala#L1407)
 here, so let me make that change.
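
Concretely, I'm thinking of something along these lines (a sketch; it assumes 
`Utils.tryWithSafeFinallyAndFailureCallbacks`'s by-name `block` / `catchBlock` 
parameters and reuses the identifiers from the surrounding method):

```scala
val pipe = Pipe.open()
val source = new FileDownloadChannel(pipe.source())
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
  val client = downloadClient(parsedUri.getHost(), parsedUri.getPort())
  val callback = new FileDownloadCallback(pipe.sink(), source, client)
  client.stream(parsedUri.getPath(), callback)
})(catchBlock = {
  // On any failure during setup, close both ends so readers of `source` see the
  // recorded error / EOF instead of hanging, and suppressed close() exceptions
  // don't mask the original failure.
  pipe.sink().close()
  source.close()
})
source
```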


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20191: [SPARK-22997] Add additional defenses against use of fre...

2018-01-08 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/20191
  
jenkins retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20191: [SPARK-22997] Add additional defenses against use...

2018-01-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20191#discussion_r160297024
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
 ---
@@ -38,9 +38,20 @@ public MemoryBlock allocate(long size) throws 
OutOfMemoryError {
   public void free(MemoryBlock memory) {
 assert (memory.obj == null) :
   "baseObject not null; are you trying to use the off-heap allocator 
to free on-heap memory?";
+assert (memory.pageNumber != 
MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+  "page has already been freed";
+assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+|| (memory.pageNumber == 
MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+  "TMM-allocated pages must be freed via TMM.freePage(), not directly 
in allocator free()";
+
 if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
   memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
 }
 Platform.freeMemory(memory.offset);
+// As an additional layer of defense against use-after-free bugs, we 
mutate the
+// MemoryBlock to reset its pointer.
+memory.offset = 0;
--- End diff --

Yep, this will guarantee SIGSEGV instead of corruption.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20191: [SPARK-22997] Add additional defenses against use...

2018-01-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20191#discussion_r160296722
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
 ---
@@ -38,9 +38,20 @@ public MemoryBlock allocate(long size) throws 
OutOfMemoryError {
   public void free(MemoryBlock memory) {
 assert (memory.obj == null) :
   "baseObject not null; are you trying to use the off-heap allocator 
to free on-heap memory?";
+assert (memory.pageNumber != 
MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+  "page has already been freed";
+assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+|| (memory.pageNumber == 
MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+  "TMM-allocated pages must be freed via TMM.freePage(), not directly 
in allocator free()";
+
 if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
   memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
 }
 Platform.freeMemory(memory.offset);
+// As an additional layer of defense against use-after-free bugs, we 
mutate the
+// MemoryBlock to reset its pointer.
+memory.offset = 0;
--- End diff --

I think that it depends on whether the direct memory address stored in 
`memory.offset` corresponds to the address of memory allocated to the JVM.

If we've freed the underlying Unsafe / Direct memory then I would expect a 
SIGSEGV, but if the address has been re-used by a subsequent allocation (or if 
we've done some buffer pooling and have recycled the underlying direct memory 
without freeing it) then this address could point to valid memory allocated by 
the JVM but which is currently in use by another Spark task, so reads or writes 
would trigger data corruption.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20191: [SPARK-22997] Add additional defenses against use...

2018-01-08 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/20191

[SPARK-22997] Add additional defenses against use of freed MemoryBlocks

## What changes were proposed in this pull request?

This patch modifies Spark's `MemoryAllocator` implementations so that 
`free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap 
case) or null out references to backing `long[]` arrays (in the on-heap case). 
The goal of this change is to add an extra layer of defense against 
use-after-free bugs because currently it's hard to detect corruption caused by 
blind writes to freed memory blocks.

## How was this patch tested?

New unit tests in `PlatformSuite`, including new tests for existing 
functionality because we did not have sufficient mutation coverage of the 
on-heap memory allocator's pooling logic.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark 
SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20191


commit a7f8c07fb5158f39bbb6cc1f23cfb13a0d473536
Author: Josh Rosen <joshrosen@...>
Date:   2018-01-08T23:50:18Z

Add additional defenses against use of freed MemoryBlocks




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20182: [SPARK-22985] Fix argument escaping bug in from_u...

2018-01-07 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/20182

[SPARK-22985] Fix argument escaping bug in from_utc_timestamp / 
to_utc_timestamp codegen

## What changes were proposed in this pull request?

This patch adds additional escaping in `from_utc_timestamp` / 
`to_utc_timestamp` expression codegen in order to fix a bug where invalid timezones 
which contain special characters could cause generated code to fail to compile. 

## How was this patch tested?

New regression tests in `DateExpressionsSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark 
SPARK-22985-fix-utc-timezone-function-escaping-bugs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20182


commit 2d10131892e54182517185102a362dd17de2af9b
Author: Josh Rosen <joshrosen@...>
Date:   2018-01-08T00:25:30Z

regression test

commit d94feed8e1f10c1610cae7cb3f7a3ad7014123e9
Author: Josh Rosen <joshrosen@...>
Date:   2018-01-08T00:26:05Z

Fix




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20181: [SPARK-22984] Fix incorrect bitmap copying and of...

2018-01-07 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/20181

[SPARK-22984] Fix incorrect bitmap copying and offset adjustment in 
GenerateUnsafeRowJoiner

## What changes were proposed in this pull request?

This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. 
This class was introduced in https://github.com/apache/spark/pull/7821 (July 
2015 / Spark 1.5.0+) and is used to combine pairs of UnsafeRows in 
TungstenAggregationIterator, CartesianProductExec, and AppendColumns.

### Bugs fixed by this patch

1. **Incorrect combining of null-tracking bitmaps**: when concatenating two 
UnsafeRows, the implementation is supposed to "Concatenate the two bitsets together into a 
single one, taking padding into account". If one row has no columns then it has 
a bitset size of 0, but the code was incorrectly assuming that if the left row 
had a non-zero number of fields then the right row would also have at least one 
field, so it was copying invalid bytes and treating them as part of the 
bitset. I'm not sure whether this bug was also present in the original 
implementation or whether it was introduced in 
https://github.com/apache/spark/pull/7892 (which fixed another bug in this 
code).
2. **Incorrect updating of data offsets for null variable-length fields**: 
after updating the bitsets and copying fixed-length and variable-length data, 
we need to adjust the offsets pointing to the start of each variable-length 
field's data. The existing code was _conditionally_ adding a fixed 
offset to correct for the new length of the combined row, but it is unsafe to 
do this if the variable-length field has a null value: we always represent 
nulls by storing `0` in the fixed-length slot, but this code was incorrectly 
incrementing those values. This bug was present since the original version of 
`GenerateUnsafeRowJoiner`.

### Why this bug remained latent for so long

The PR which introduced `GenerateUnsafeRowJoiner` features several 
randomized tests, including tests of the cases where one side of the join has 
no fields and where string-valued fields are null. However, the existing 
assertions were too weak to uncover this bug:

- If a null field has a non-zero value in its fixed-length data slot then 
this will not cause problems for field accesses because the null-tracking 
bitmap should still be correct and we will not try to use the incorrect offset 
for anything.
- If the null tracking bitmap is corrupted by joining against a row with no 
fields then the corruption occurs in field numbers past the actual field 
numbers contained in the row. Thus valid `isNullAt()` calls will not read the 
incorrectly-set bits.

The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` 
and `isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit 
equality, preventing these bugs from failing assertions. It turns out that 
there was even a 
[GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala)
 but it looks like it also didn't catch this problem because it only tested the 
bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` 
interface instead of actually comparing the bitsets' bytes.

### Impact of these bugs

- This bug will cause `equals()` and `hashCode()` to be incorrect for these 
rows, which will be problematic in case `GenerateUnsafeRowJoiner`'s results are 
used as join or grouping keys.
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in 
reads from invalid null bitmap positions causing fields to incorrectly become 
NULL (see the end-to-end example below).
  - It looks like this generally only happens in `CartesianProductExec`, 
which our query optimizer often avoids executing (usually we try to plan a 
`BroadcastNestedLoopJoin` instead).

### End-to-end test case demonstrating the problem

The following query demonstrates how this bug may result in incorrect query 
results:

```sql
set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger 
CartesianProductExec

create table a as select * from values 1;
create table b as select * from values 2;

SELECT
  t3.col1,
  t1.col1
FROM a t1
CROSS JOIN b t2
CROSS JOIN b t3
```

This should return `(2, 1)` but instead was returning `(null, 1)`.

Column pruning ends up trimming off all columns from `t2`, so when `t2` 
joins with another table this triggers the bitmap-copying bug. This incorrect 
bitmap is subsequently copied again when performing the final join, causing the 
final output to have an incorrectly-set null bit for the first field.

## How was this patch tested?

[GitHub] spark pull request #20180: [SPARK-22983] Don't push filters beneath aggregat...

2018-01-07 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/20180

[SPARK-22983] Don't push filters beneath aggregates with empty grouping 
expressions

## What changes were proposed in this pull request?

The following SQL query should return zero rows, but in Spark it actually 
returns one row:

```
SELECT 1 from (
  SELECT 1 AS z,
  MIN(a.x)
  FROM (select 1 as x) a
  WHERE false
) b
where b.z != b.z
```

The problem stems from the `PushDownPredicate` rule: when this rule 
encounters a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, 
it removes the original filter and adds a new filter onto Aggregate's child, 
e.g. `Agg(Filter(...))`. This is sometimes okay, but the case above is a 
counterexample: because there is no explicit `GROUP BY`, we are implicitly 
computing a global aggregate over the entire table, so the original filter was 
not acting like a `HAVING` clause that reduces the number of groups. If we push 
this filter down, it fails to reduce the cardinality of the Aggregate's output 
(a global aggregate always produces exactly one row), leading to the wrong answer.

In 2016 I fixed a similar problem involving invalid pushdowns of 
data-independent filters (filters which reference no columns of the filtered 
relation). There was additional discussion after my fix was merged which 
pointed out that my patch was an incomplete fix (see #15289), but it looks like 
I must have either misunderstood the comment or forgotten to follow up on the 
additional points raised there.

This patch fixes the problem by choosing to never push down filters in 
cases where there are no grouping expressions. Since there are no grouping 
keys, the only columns are aggregate columns and we can't push filters defined 
over aggregate results, so this change won't cause us to miss out on any 
legitimate pushdown opportunities.
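
To make the rule concrete, here is a standalone sketch of the decision (simplified stand-in types, not the actual `PushDownPredicate` code):

```
// Standalone model of the decision: a filter above an aggregate may only be
// pushed when there are grouping expressions and the predicate references
// grouping columns alone.
final case class Agg(groupingCols: Set[String], aggregateCols: Set[String])

def canPushFilterBeneath(agg: Agg, predicateRefs: Set[String]): Boolean =
  agg.groupingCols.nonEmpty && predicateRefs.subsetOf(agg.groupingCols)

// A global aggregate (no GROUP BY) never admits pushdown:
assert(!canPushFilterBeneath(Agg(Set.empty, Set("min_x")), Set("z")))
```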

## How was this patch tested?

New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20180


commit 5568d55255cbdd5313f7391755c7c39d34390c30
Author: Josh Rosen <joshrosen@...>
Date:   2018-01-07T05:14:56Z

Don't push filters beneath aggregates with empty grouping expressions




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-07 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/20179

[SPARK-22982] Remove unsafe asynchronous close() call from 
FileDownloadChannel

## What changes were proposed in this pull request?

This patch fixes a severe asynchronous IO bug in Spark's Netty-based file 
transfer code. At a high level, the problem is that an unsafe asynchronous 
`close()` of a pipe's source channel creates a race condition where file 
transfer code closes a file descriptor then attempts to read from it. If the 
closed file descriptor's number has been reused by an `open()` call then this 
invalid read may cause unrelated file operations to return incorrect results. 
**One manifestation of this problem is incorrect query results.**

For a high-level overview of how file download works, take a look at the 
control flow in `NettyRpcEnv.openChannel()`: this code creates a pipe to buffer 
results, then submits an asynchronous stream request to a lower-level 
TransportClient. The callback passes received data to the sink end of the pipe. 
The source end of the pipe is passed back to the caller of `openChannel()`. 
Thus `openChannel()` returns immediately and callers interact with the returned 
pipe source channel.

Because the underlying stream request is asynchronous, errors may occur 
after `openChannel()` has returned and after that method's caller has started 
to `read()` from the returned channel. For example, if a client requests an 
invalid stream from a remote server then the "stream does not exist" error may 
not be received from the remote server until after `openChannel()` has 
returned. In order to be able to propagate the "stream does not exist" error to 
the file-fetching application thread, this code wraps the pipe's source channel 
in a special `FileDownloadChannel` which adds a `setError(t: Throwable)` 
method, then calls this `setError()` method in the FileDownloadCallback's 
`onFailure` method.

It is possible for `FileDownloadChannel`'s `read()` and `setError()` 
methods to be called concurrently from different threads: the `setError()` 
method is called from within the Netty RPC system's stream callback handlers, 
while the `read()` methods are called from higher-level application code 
performing remote stream reads.

The problem lies in `setError()`: the existing code closed the wrapped pipe 
source channel. Because `read()` and `setError()` occur in different threads, 
this means it is possible for one thread to be calling `source.read()` while 
another asynchronously calls `source.close()`. Java's IO libraries do not 
guarantee that this will be safe and, in fact, it's possible for these 
operations to interleave in such a way that a lower-level `read()` system call 
occurs right after a `close()` call. In the best-case, this fails as a read of 
a closed file descriptor; in the worst-case, the file descriptor number has 
been re-used by an intervening `open()` operation and the read corrupts the 
result of an unrelated file IO operation being performed by a different thread.

The solution here is to remove the `stream.close()` call in `onError()`: 
the thread that is performing the `read()` calls is responsible for closing the 
stream in a `finally` block, so there's no need to close it here. If that 
thread is blocked in a `read()` then it will become unblocked when the sink end 
of the pipe is closed in `FileDownloadCallback.onFailure()`.

After making this change, we also need to refine the `read()` method to 
always check for a `setError()` result, even if the underlying channel `read()` 
call has succeeded.
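
Roughly, the resulting read-side structure looks like the following sketch (simplified names; as described above, the real `FileDownloadChannel` wraps a `Pipe.SourceChannel` inside `NettyRpcEnv`):

```
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}
import java.util.concurrent.atomic.AtomicReference

// The error is only recorded by the callback thread; the reading thread is the
// one that closes the channel and re-checks the error even after a good read.
class FileDownloadChannelSketch(source: Pipe.SourceChannel)
    extends ReadableByteChannel {
  private val error = new AtomicReference[Throwable](null)

  // Called from the Netty callback thread; note that it does NOT close source.
  def setError(t: Throwable): Unit = error.set(t)

  override def read(dst: ByteBuffer): Int = {
    val n = source.read(dst)
    val e = error.get()
    if (e != null) throw new IOException(e) // check even after a successful read
    n
  }

  override def isOpen(): Boolean = source.isOpen
  override def close(): Unit = source.close() // only the reading thread closes
}
```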

This patch also makes a slight cleanup to a dodgy-looking `catch e: 
Exception` block to use a safer `try-finally` error handling idiom.

This bug was introduced in SPARK-11956 / #9941 and is present in Spark 
1.6.0+.

## How was this patch tested?

This fix was tested manually against a workload which non-deterministically 
hit this bug.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-22982-fix-unsafe-async-io-in-file-download-channel

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20179.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20179


commit 2138aa6d8225fb7c343c4a61dadc8c44e902e35b
Author: Josh Rosen <joshrosen@...>
Date:   2018-01-07T23:21:33Z

Add position checks to IndexShuffleBlockResolver.

commit 8e5ffa451f66dc64203dede68b9c0ac5fdc952cf
Author: Josh Rosen <joshrosen@...>
Date:   2018-01-07T23:22:44Z

Remove unsafe asynchronous close() call from FileDownloadChannel




---

-
To unsub

[GitHub] spark issue #19971: [SPARK-22774][SQL][Test] Add compilation check into TPCD...

2017-12-13 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/19971
  
Wait, nevermind: I see that this isn't actually running the queries, so 
setting `spark.sql.codegen.fallback=false` wouldn't be sufficient.

Separate from this PR, we might want to set 
`spark.sql.codegen.fallback=false` in more tests, though: it looks like it's on 
for Hive tests but might be disabled in some SQL suites.
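
For reference, a sketch of what that would look like in a test (assuming a test-scoped `SparkSession` named `spark`):

```
// Fail tests on codegen compilation errors instead of silently falling back
// to interpreted execution (sketch; `spark` is an assumed test SparkSession).
spark.conf.set("spark.sql.codegen.fallback", "false")
```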


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19971: [SPARK-22774][SQL][Test] Add compilation check into TPCD...

2017-12-13 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/19971
  
Could we address this more broadly by ensuring that whole stage codegen 
fallback is disabled in tests?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19959: [SPARK-22766] Install R linter package in spark lib dire...

2017-12-12 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/19959
  
Because `LOCAL_LIB_LOC` seems to default to `$SPARK_HOME/R/lib`, will 
installing to this folder create the potential for `lintr` to be installed into 
a `lib` folder which gets packaged up in binary builds? Just want to 
double-check to make sure this won't mess up any packaging scripts which might 
be interacting with that directory. (I'm not super familiar with how R 
packaging works, so just wanted to double-check and be safe).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-28 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/19788
  
Is there an implicit assumption here that contiguous partitions' data can 
be decompressed / deserialized in a single stream? If the shuffled data is 
written with a non-relocatable serializer (e.g. Java serialization) or a 
non-concatenatable compression format, then I'm not sure that you'll actually 
be able to successfully deserialize a multi-reducer range of the map output 
using a single decompression / deserialization stream.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...

2017-11-05 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/19433
  
jenkins retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19077: [SPARK-21860][core]Improve memory reuse for heap ...

2017-08-31 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19077#discussion_r136487281
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 ---
@@ -47,23 +47,29 @@ private boolean shouldPool(long size) {
 
   @Override
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
-if (shouldPool(size)) {
+int arraySize = (int)((size + 7) / 8);
--- End diff --

You might be able to use `ByteArrayMethods.roundNumberOfBytesToNearestWord` 
for this, which we've done for similar rounding elsewhere. It makes it a bit 
easier to spot what's happening.
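
For reference, a sketch of what that helper does (the real method lives in `org.apache.spark.unsafe.array.ByteArrayMethods`; this standalone version only illustrates the rounding):

```
// Standalone illustration: round a byte count up to a whole number of
// 8-byte words, then convert to a word count.
def roundNumberOfBytesToNearestWord(numBytes: Long): Long = (numBytes + 7) & ~7L

val size = 100L
val arraySizeInWords = (roundNumberOfBytesToNearestWord(size) / 8).toInt // same as ((size + 7) / 8).toInt
```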


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...

2017-08-31 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/19077
  
Just curious: do you know where we are allocating these close-in-size 
chunks of memory? I understand the motivation, but I'd like to know what's 
causing this pattern. I think the original idea here was that most allocations 
would come from a small set of sizes (usually the page size, or a configurable 
buffer size) and would not generally be arbitrary sized allocations.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19056: [SPARK-21765] Check that optimization doesn't affect isS...

2017-08-28 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/19056
  
(Dummy comment to test JIRA linkage)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18752: [SPARK-21551][Python] Increase timeout for PythonRDD.ser...

2017-08-08 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/18752
  
Jenkins, this is ok to test.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18752: [SPARK-21551][Python] Increase timeout for PythonRDD.ser...

2017-08-08 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/18752
  
This seems fine to me, especially since it's plausible that you might have 
a few-second GC pause in some situations. Let me go ahead and have Jenkins test 
this, then I'll merge it if tests pass (which I assume they will).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18814: [SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API...

2017-08-03 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/18814
  

(test comment to test PR dashboard linking)

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18814: [SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API...

2017-08-03 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/18814
  
(test comment to test PR dashboard linking)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17180: [SPARK-19839][Core]release longArray in BytesToBytesMap

2017-07-30 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/17180
  
This seems fine to me. That said, the updated test case is a bit confusing,
but I don't think the original test was too clear to begin with. The
original test was using the `iterator()` method to make an assertion about
the internal state of the map, then was checking whether the pattern of
getting a buffer from the map, updating it, then getting it again from the
map would reflect the update. After your update to the test, the comment
doesn't quite align with the test anymore. The right way to fix this
involves splitting up the affected test into two separate tests. We can do
that in a followup, though, since I think that we still have sufficient
code coverage via other tests and this PR has already been under review for
months now so it'd be better to get it merged and move on.

On Sat, Jul 29, 2017 at 6:19 PM Xiao Li <notificati...@github.com> wrote:

> LGTM Wait for @JoshRosen <https://github.com/joshrosen> for final sign
> off.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18645: [SPARK-14280][BUILD][WIP] Update change-version.sh and p...

2017-07-23 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/18645
  
Looking at the source compatibility changes made here, I was a little 
confused about why we didn't need to make many more changes. In principle, it 
seemed like the ambiguous overload issue affecting that `foreachPartition` call 
should have also impacted calls like `.filter(_ > 0)`, so I was originally 
expecting that we'd have to update hundreds of call sites.

It turns out that the final Scala 2.12 release has a nice feature which 
resolves this ambiguity in many cases. In the release notes at 
https://www.scala-lang.org/news/2.12.0/#lambda-syntax-for-sam-types, there's an 
example where even though there's a potentially ambiguous overload, 
"overloading resolution selects the one with the `Function1` argument type". 
This is explained in more detail at 
https://www.scala-lang.org/news/2.12.0/#sam-conversion-in-overloading-resolution.

Quoting:

> In Scala 2.12, both methods are applicable, therefore [overloading 
resolution](http://www.scala-lang.org/files/archive/spec/2.12/06-expressions.html#overloading-resolution)
 needs to pick the most specific alternative. The specification for [type 
compatibility](http://www.scala-lang.org/files/archive/spec/2.12/03-types.html#compatibility)
 has been updated to consider SAM conversion, so that the first alternative is 
more specific.

This explains why we didn't have to update all call sites. However, why did 
the `.foreachPartition` example break? I played around with this in the Scastie 
Scala paste bin and I think that I've spotted the problem: all of the cases 
where we had ambiguity seem to be cases where we're operating on Datasets in a 
generic way and have a type parameter representing the Dataset's type.

So, this means that

```
def f(ds: Dataset[Int]): Dataset[Int] = ds.filter(_ != null)
```

is unambiguous but

```
def f[T](ds: Dataset[T]): Dataset[T] = ds.filter(_ != null)
```

fails with a compiler error in Scala 2.12. You can try this out at 
https://scastie.scala-lang.org/JoshRosen/eBcxUGdORsiOQNULMUnAsw.

I took a look at the sections of the language spec linked in the quote 
above but it's not immediately clear to me whether this is a compiler bug or 
expected behavior (I'll have to take some more time to try to understand the 
spec).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18645: [SPARK-14280][BUILD][WIP] Update change-version.s...

2017-07-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18645#discussion_r128892346
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
---
@@ -353,7 +353,7 @@ class DatasetSuite extends QueryTest with 
SharedSQLContext {
   test("foreachPartition") {
 val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
 val acc = sparkContext.longAccumulator
-ds.foreachPartition(_.foreach(v => acc.add(v._2)))
+ds.foreachPartition((it: Iterator[(String, Int)]) => it.foreach(v => 
acc.add(v._2)))
--- End diff --

I think this is primarily going to be an issue for end users who want to 
use an existing source tree to cross-compile for Scala 2.10, 2.11, and 2.12. 
Thus the pain of the source incompatibility would mostly be felt by 
library/package maintainers but it can be worked around as long as there's at 
least some common subset which is source compatible across all of those 
versions.

A similar ambiguity issue exists for people writing Spark code in Java 8 
who use lambda syntax. The number of such users who must also upgrade to Scala 
2.12 is probably fairly small, and the number of those users who have a 
requirement to cross-build their Java 8 lambda-syntax Spark code against 2.11 
and 2.12 is probably even smaller, so maybe it isn't a huge deal to accept 
the ambiguity and require those users to do a bit of work when upgrading. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18645: [SPARK-14280][BUILD][WIP] Update change-version.s...

2017-07-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18645#discussion_r128891202
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala ---
@@ -54,7 +54,10 @@ class TaskContextSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSpark
 val rdd = new RDD[String](sc, List()) {
   override def getPartitions = Array[Partition](StubPartition(0))
   override def compute(split: Partition, context: TaskContext) = {
-context.addTaskCompletionListener(context => 
TaskContextSuite.completed = true)
+context.addTaskCompletionListener(new TaskCompletionListener {
--- End diff --

I think that you can avoid source incompatibilities for Scala users by 
removing the overloads which accept Scala functions and then adding in a 
package-level implicit conversion to convert from Scala functions back into our 
own custom trait / interface.

The trickiness here is that you need to preserve binary compatibility on 
Scala 2.10/2.11, so the removal of the overload needs to be done conditionally 
so that it only occurs when building with Scala 2.12. Rather than having a 
separate source tree for 2.12, I'd propose defining the removed overload in a 
mixin trait which comes from a separate source file and then configuring the 
build to use different versions of that file for 2.10/2.11 and for 2.12.

For details on this proposal, see 
https://docs.google.com/document/d/1P_wmH3U356f079AYgSsN53HKixuNdxSEvo8nw_tgLgM/edit,
 a document that I wrote in March 2016 which explores these source 
incompatibility difficulties.

Applying that idea here, the idea would be to remove the method

```
def addTaskCompletionListener(f: (TaskContext) => Unit)
```

and add a package-level implicit conversion from `TaskContext => Unit` to 
`TaskCompletionListener`, but to do this only in the 2.12 source tree / shim. 
This approach has some caveats and could potentially impact Java users who are 
doing weird things (violating the goal that Java Spark code is source and 
binary compatible with all Scala versions). See the linked doc for a full 
discussion of this problem.
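
A rough sketch of what that 2.12-only shim could look like (standalone, with simplified stand-ins for the real `TaskContext` and `TaskCompletionListener` types):

```
import scala.language.implicitConversions

// Simplified stand-ins for the real Spark types (sketch only).
trait TaskContext
trait TaskCompletionListener {
  def onTaskCompletion(context: TaskContext): Unit
}

// The 2.12-only package-level conversion that restores the old
// `addTaskCompletionListener(ctx => ...)` call style after the
// (TaskContext => Unit) overload is removed.
object TaskContextImplicits {
  implicit def functionToCompletionListener(f: TaskContext => Unit): TaskCompletionListener =
    new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = f(context)
    }
}
```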


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18662: [SPARK-21444] Be more defensive when removing broadcasts...

2017-07-17 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/18662
  
Merged to master. Thanks for the quick reviews.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18662: [SPARK-21444] Be more defensive when removing bro...

2017-07-17 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/18662

[SPARK-21444] Be more defensive when removing broadcasts in MapOutputTracker

## What changes were proposed in this pull request?

In SPARK-21444, @sitalkedia reported an issue where the 
`Broadcast.destroy()` call in `MapOutputTracker`'s 
`ShuffleStatus.invalidateSerializedMapOutputStatusCache()` was failing with an 
`IOException`, causing the DAGScheduler to crash and bring down the entire 
driver.

This is a bug introduced by #17955. In the old code, we removed a broadcast 
variable by calling `BroadcastManager.unbroadcast` with `blocking=false`, but 
the new code simply calls `Broadcast.destroy()`, which can fail with an 
`IOException` if certain blocking RPCs time out.

The fix implemented here is to replace this with a call to 
`destroy(blocking = false)` and to wrap the entire operation in 
`Utils.tryLogNonFatalError`.
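
Sketched in isolation, the guard looks roughly like this (a standalone model of the pattern; the real code wraps `broadcast.destroy(blocking = false)` in `Utils.tryLogNonFatalError`):

```
import scala.util.control.NonFatal

// Standalone model of the guard: run a cleanup action and log-and-swallow any
// non-fatal failure so it cannot crash the caller (here, the DAGScheduler).
def tryLogNonFatalError(action: => Unit): Unit =
  try action catch {
    case NonFatal(e) => System.err.println(s"Ignoring non-fatal cleanup error: $e")
  }

// Usage shape (broadcast being the cached serialized map-status broadcast):
// tryLogNonFatalError { broadcast.destroy(blocking = false) }
```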

## How was this patch tested?

I haven't written regression tests for this because it's really hard to 
inject mocks to simulate RPC failures here. Instead, this class of issue is 
probably best uncovered with more generalized error injection / network 
unreliability / fuzz testing tools.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-21444

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18662.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18662


commit a5ebcac4ceb14eb8342ce085965b370186b4aba9
Author: Josh Rosen <joshro...@databricks.com>
Date:   2017-07-17T23:56:45Z

Use blocking=false and add try logging.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17150: [SPARK-19810][BUILD][CORE] Remove support for Scala 2.10

2017-07-13 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/17150
  
@srowen, I'll disable those master branch 2.10 jobs and will update scripts 
to prevent them from being redeployed by accident.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18476: [SPARK-20858][DOC][MINOR] Document ListenerBus ev...

2017-06-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18476#discussion_r125149417
  
--- Diff: docs/configuration.md ---
@@ -1398,6 +1398,15 @@ Apart from these, the following properties are also 
available, and may be useful
   
 
 
+  spark.scheduler.listenerbus.eventqueue.size
--- End diff --

If you're only documenting this in master then please use 
`spark.scheduler.listenerbus.eventqueue.capacity` instead (see definition in 
code and git blame for explanation).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18467: [SPARK-21253][Core]Disable spark.reducer.maxReqSi...

2017-06-29 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18467#discussion_r124945147
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -326,7 +326,7 @@ package object config {
   .doc("The blocks of a shuffle request will be fetched to disk when 
size of the request is " +
 "above this threshold. This is to avoid a giant request takes too 
much memory.")
   .bytesConf(ByteUnit.BYTE)
-  .createWithDefaultString("200m")
--- End diff --

Might want to put a `.internal()` here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


