[GitHub] spark issue #18554: [SPARK-21306][ML] OneVsRest should support setWeightCol
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18554 **[Test build #79740 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79740/testReport)** for PR 18554 at commit [`9ba0e2b`](https://github.com/apache/spark/commit/9ba0e2be0b6bb0b8012c6003984c307f569dda1e). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18554: [SPARK-21306][ML] OneVsRest should support setWei...
Github user facaiy commented on a diff in the pull request: https://github.com/apache/spark/pull/18554#discussion_r128158473 --- Diff: python/pyspark/ml/tests.py --- @@ -1255,6 +1255,17 @@ def test_output_columns(self): output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "prediction"]) +def test_support_for_weightCol(self): --- End diff -- Sure. Use DecisionTreeClassifier to test.
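For readers following the weightCol discussion: in a one-vs-rest reduction, each per-class binary problem relabels the rows but should carry every row's instance weight through unchanged. The plain-Python sketch below illustrates only that relabeling step. `binarize_with_weights` is a hypothetical helper, not part of the PySpark API, and features are omitted for brevity.

```python
from typing import List, Tuple

Row = Tuple[float, float]  # (label, weight); features omitted in this sketch


def binarize_with_weights(rows: List[Row], num_classes: int) -> List[List[Row]]:
    """Hypothetical helper: build one binary dataset per class.

    In class k's dataset a row is labeled 1.0 if its label equals k, else 0.0.
    The instance weight is passed through unchanged.
    """
    datasets = []
    for k in range(num_classes):
        datasets.append([(1.0 if label == k else 0.0, weight) for label, weight in rows])
    return datasets


rows = [(0.0, 1.0), (1.0, 2.5), (2.0, 0.5)]
per_class = binarize_with_weights(rows, 3)
print(per_class[1])  # [(0.0, 1.0), (1.0, 2.5), (0.0, 0.5)]
```

Keeping the weight attached to each relabeled row is what lets every binary classifier see the same weighting as the original multiclass data.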
[GitHub] spark pull request #18468: [SPARK-20873][SQL] Creat CachedBatchColumnVector ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18468#discussion_r128156883 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala --- @@ -326,6 +329,84 @@ class ColumnarBatchSuite extends SparkFunSuite { }} } + test("CachedBatch primitive Apis") { +(BooleanType :: IntegerType :: DoubleType :: Nil).foreach { dataType => { + val columnBuilder = ColumnBuilderHelper(dataType, 1024, "col", true) + val row = new SpecificInternalRow(Array(dataType)) + for (i <- 0 until 16) { +dataType match { + case _ : BooleanType => row.setBoolean(0, i % 2 == 0) + case _ : IntegerType => row.setInt(0, i) + case _ : DoubleType => row.setDouble(0, i) +} +columnBuilder.appendFrom(row, 0) + } + + val column = new CachedBatchColumnVector( +JavaUtils.bufferToArray(columnBuilder.build), 1024, dataType) + + for (i <- 0 until 16) { +assert(column.isNullAt(i) == false) +dataType match { + case _ : BooleanType => assert(column.getBoolean(i) == (i % 2 == 0)) + case _ : IntegerType => assert(column.getInt(i) == i) + case _ : DoubleType => assert(column.getDouble(i) == i.toDouble) +} + } + + /* check row access order */ + val column2 = new CachedBatchColumnVector( +JavaUtils.bufferToArray(columnBuilder.build), 1024, dataType) + dataType match { +case _: BooleanType => + /* Row access must start with 0 */ + val e = intercept[UnsupportedOperationException] { +column2.getBoolean(1) + } + assert(e.getMessage.startsWith("Row access order must be sequentially ascending.")) +case _: IntegerType => + /* Row access order must be sequential */ --- End diff -- I think we don't need to be this strict. The rule is that users can't jump back to read earlier values; anything else is OK, e.g. `getInt(0), getInt(0), getInt(10), getInt(11)`.
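A minimal sketch of the relaxed rule described above, assuming a one-pass decoder that can skip forward but never rewind: repeated reads of the current row and forward jumps are allowed, while going back raises an error. `ForwardOnlyColumn` is a hypothetical Python stand-in, not the `CachedBatchColumnVector` implementation.

```python
class ForwardOnlyColumn:
    """Hypothetical column reader that decodes values in one forward pass.

    Re-reading the current row and skipping ahead are allowed; asking for an
    earlier row raises ValueError because the decoder cannot rewind.
    """

    def __init__(self, values):
        self._decoder = iter(values)  # one-pass source, like a compressed stream
        self._current_row = -1        # no row decoded yet
        self._current_value = None

    def get(self, row_id):
        if row_id < self._current_row:
            raise ValueError(
                f"row {row_id} requested after row {self._current_row}; cannot jump back")
        # Decode (and discard) rows until row_id is reached; a repeat read decodes nothing.
        while self._current_row < row_id:
            self._current_value = next(self._decoder)
            self._current_row += 1
        return self._current_value


col = ForwardOnlyColumn(range(16))
print([col.get(0), col.get(0), col.get(10), col.get(11)])  # [0, 0, 10, 11]
```

The cost of a forward jump is decoding the skipped rows, which is why only backward access has to be forbidden.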
[GitHub] spark pull request #18503: [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18503#discussion_r128156305 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -479,6 +479,61 @@ class StreamSuite extends StreamTest { CheckAnswer((1, 2), (2, 2), (3, 2))) } + testQuietly("store to and recover from a checkpoint") { --- End diff -- This test also checks whether the length of each checkpoint is a multiple of 8. Does that make sense here, or is there another test suite that checks the length?
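The arithmetic behind a multiple-of-8 requirement is rounding a byte count up to the next 64-bit word boundary. The sketch below shows that rounding and the kind of alignment check the test performs; the function names are hypothetical and this is not Spark's code.

```python
def round_up_to_word(num_bytes: int) -> int:
    """Round a byte count up to the next multiple of 8 (one 64-bit word)."""
    if num_bytes < 0:
        raise ValueError("num_bytes must be non-negative")
    # Adding 7 then clearing the low three bits rounds up to a multiple of 8.
    return (num_bytes + 7) & ~7


def is_word_aligned(num_bytes: int) -> bool:
    """True if num_bytes is a multiple of 8, the property the test asserts."""
    return num_bytes % 8 == 0


print([round_up_to_word(n) for n in (0, 1, 8, 13, 16)])  # [0, 8, 8, 16, 16]
```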
[GitHub] spark issue #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types that re...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/18444 LGTM, too. pending Jenkins.
[GitHub] spark issue #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types that re...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18444 Okay. LGTM.
[GitHub] spark pull request #18468: [SPARK-20873][SQL] Creat CachedBatchColumnVector ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18468#discussion_r128152848 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/CachedBatchColumnVector.java --- @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.columnar.*; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column backed by an in memory JVM array. --- End diff -- Sure, done
[GitHub] spark pull request #18468: [SPARK-20873][SQL] Creat CachedBatchColumnVector ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18468#discussion_r128152870 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/CachedBatchColumnVector.java --- @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.columnar.*; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column backed by an in memory JVM array. 
+ */ +public final class CachedBatchColumnVector extends ColumnVector { + + // keep compressed data + private byte[] buffer; + + // accessor for a column + private ColumnAccessor columnAccessor; + + // a row where the compressed data is extracted + private ColumnVector columnVector; + + // an accessor uses only row 0 in columnVector + private final int ROWID = 0; + + // Keep row id that was previously accessed + private int previousRowId = -1; + + + public CachedBatchColumnVector(byte[] buffer, int numRows, DataType type) { +super(numRows, DataTypes.NullType, MemoryMode.ON_HEAP); +initialize(buffer, type); +reserveInternal(numRows); +reset(); + } + + @Override + public long valuesNativeAddress() { +throw new RuntimeException("Cannot get native address for on heap column"); + } + @Override + public long nullsNativeAddress() { +throw new RuntimeException("Cannot get native address for on heap column"); + } + + @Override + public void close() { + } + + private void setColumnAccessor() { +ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); +columnAccessor = ColumnAccessor$.MODULE$.apply(type, byteBuffer); + } + + // call extractTo() before getting actual data + private void prepareAccess(int rowId) { +if (previousRowId + 1 == rowId) { + assert (columnAccessor.hasNext()); + columnAccessor.extractTo(columnVector, ROWID); + previousRowId = rowId; +} else if (previousRowId != rowId) { + throw new UnsupportedOperationException("Row access order must be sequentially ascending." 
+ +" Internal row " + rowId + "is accessed after internal row "+ previousRowId + "was accessed."); +} + } + + // + // APIs dealing with nulls + // + + @Override + public void putNotNull(int rowId) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNull(int rowId) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNulls(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNotNulls(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + @Override + public boolean isNullAt(int rowId) { +prepareAccess(rowId); +return columnVector.isNullAt(ROWID); + } + + // + // APIs dealing with Booleans + // + + @Override + public void putBoolean(int rowId, boolean value) { +throw new UnsupportedOperationException(); + } + + @Override + public void putBooleans(int rowId, int count, boolean value) { +throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int rowId) { +prepareAccess(rowId); +return columnVector.getBoolean(ROWID); + } + + @Override + public boolean[] getBooleans(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + // + + // + // APIs dealing with Bytes + // + + @Override + public void putByte(int rowId, byte value) { +
[GitHub] spark issue #18468: [SPARK-20873][SQL] Creat CachedBatchColumnVector to abst...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18468 **[Test build #79739 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79739/testReport)** for PR 18468 at commit [`b83dedb`](https://github.com/apache/spark/commit/b83dedbb459dd1cea4810fb758f9e9d9cb592342).
[GitHub] spark pull request #18388: [SPARK-21175] Reject OpenBlocks when memory short...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18388#discussion_r128151097 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java --- @@ -25,6 +25,9 @@ import com.google.common.base.Preconditions; import io.netty.channel.Channel; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + --- End diff -- nit: remove this blank line
[GitHub] spark pull request #18388: [SPARK-21175] Reject OpenBlocks when memory short...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18388#discussion_r128151076 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java --- @@ -139,6 +153,32 @@ public void checkAuthorization(TransportClient client, long streamId) { } } + @Override + public void chunkSent(Object streamId) { --- End diff -- Shall we accept a `long streamId` here instead of `Object`?
[GitHub] spark pull request #18468: [SPARK-20873][SQL] Creat CachedBatchColumnVector ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18468#discussion_r128149909 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/CachedBatchColumnVector.java --- @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.columnar.*; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column backed by an in memory JVM array. 
+ */ +public final class CachedBatchColumnVector extends ColumnVector { + + // keep compressed data + private byte[] buffer; + + // accessor for a column + private ColumnAccessor columnAccessor; + + // a row where the compressed data is extracted + private ColumnVector columnVector; + + // an accessor uses only row 0 in columnVector + private final int ROWID = 0; + + // Keep row id that was previously accessed + private int previousRowId = -1; + + + public CachedBatchColumnVector(byte[] buffer, int numRows, DataType type) { +super(numRows, DataTypes.NullType, MemoryMode.ON_HEAP); +initialize(buffer, type); +reserveInternal(numRows); +reset(); + } + + @Override + public long valuesNativeAddress() { +throw new RuntimeException("Cannot get native address for on heap column"); + } + @Override + public long nullsNativeAddress() { +throw new RuntimeException("Cannot get native address for on heap column"); + } + + @Override + public void close() { + } + + private void setColumnAccessor() { +ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); +columnAccessor = ColumnAccessor$.MODULE$.apply(type, byteBuffer); + } + + // call extractTo() before getting actual data + private void prepareAccess(int rowId) { +if (previousRowId + 1 == rowId) { --- End diff -- Yes, we can do it. See line 78: `extractTo()` is called only once for a given rowId, so a sequence such as `isNullAt(0), getInt(0)` is now supported, too.
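The `prepareAccess` logic quoted in the diff can be modeled in a few lines of Python to show why `extractTo()` runs only once per row: a repeated access to the same rowId reuses the already extracted value, the next rowId triggers one extraction, and anything else is rejected. `SequentialAccessColumn` is a hypothetical model that covers values only (nulls are omitted); it is not the Java class itself.

```python
class SequentialAccessColumn:
    """Hypothetical Python model of the prepareAccess() rule (values only)."""

    def __init__(self, values):
        self._accessor = iter(values)  # stands in for the ColumnAccessor
        self._previous_row_id = -1
        self._value = None
        self.extract_calls = 0         # how many times "extractTo()" ran

    def _prepare_access(self, row_id):
        if self._previous_row_id + 1 == row_id:
            # Next row: extract exactly once and remember it.
            self._value = next(self._accessor)
            self.extract_calls += 1
            self._previous_row_id = row_id
        elif self._previous_row_id != row_id:
            # Neither the same row nor the next one: reject.
            raise ValueError(
                f"Row access order must be sequentially ascending. Internal row {row_id} "
                f"is accessed after internal row {self._previous_row_id} was accessed.")
        # Same row as before: reuse the already extracted value.

    def get(self, row_id):
        self._prepare_access(row_id)
        return self._value


col = SequentialAccessColumn([10, 20, 30])
print(col.get(0), col.get(0), col.get(1), col.extract_calls)  # 10 10 20 2
```

Under this stricter rule the first access must be row 0 and rows cannot be skipped, which is the behavior the `ColumnarBatchSuite` test in this thread asserts.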
[GitHub] spark pull request #18388: [SPARK-21175] Reject OpenBlocks when memory short...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18388#discussion_r128149914 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java --- @@ -53,9 +56,13 @@ // that the caller only requests each chunk one at a time, in order. int curChunk = 0; +// Used to keep track of the number of chunks being transferred and not finished yet. +AtomicLong chunksBeingTransferred; --- End diff -- According to your usage, it seems `volatile long chunksBeingTransferred` is enough?
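Whether `volatile long` suffices depends on how many threads update the counter. A volatile field makes writes visible to other threads, but `x++` is still a read-modify-write, so concurrent increments and decrements can be lost unless a single thread performs all updates. The Python sketch below guards the counter with a lock, playing the role `AtomicLong` plays on the JVM; `ChunkTransferCounter` and its method names are hypothetical.

```python
import threading


class ChunkTransferCounter:
    """Hypothetical counter of chunks handed out but not yet reported as sent."""

    def __init__(self):
        self._lock = threading.Lock()
        self._in_flight = 0

    def chunk_fetched(self):
        # Read-modify-write: must be atomic if several threads call it.
        with self._lock:
            self._in_flight += 1

    def chunk_sent(self):
        with self._lock:
            self._in_flight -= 1

    def in_flight(self):
        with self._lock:
            return self._in_flight


counter = ChunkTransferCounter()


def worker():
    for _ in range(1000):
        counter.chunk_fetched()
    for _ in range(400):
        counter.chunk_sent()


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.in_flight())  # 2400
```

If only one thread ever calls the update methods, the lock could be dropped and a visibility guarantee alone (the JVM's `volatile`) would be enough for readers.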
[GitHub] spark issue #18656: [SPARK-21441][SQL]Incorrect Codegen in SortMergeJoinExec...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18656 LGTM, can you update the PR description?
[GitHub] spark issue #18656: [SPARK-21441][SQL]Incorrect Codegen in SortMergeJoinExec...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18656 **[Test build #79738 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79738/testReport)** for PR 18656 at commit [`1161ffd`](https://github.com/apache/spark/commit/1161ffdf879b287f03065ac9e3661ffddd3b64f3).
[GitHub] spark issue #18656: [SPARK-21441][SQL]Incorrect Codegen in SortMergeJoinExec...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18656 ok to test
[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18655 **[Test build #79737 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79737/testReport)** for PR 18655 at commit [`b5988f9`](https://github.com/apache/spark/commit/b5988f9a223de407b7709f239fca672bb02b60aa).
[GitHub] spark issue #18678: [SPARK-21464][SS] Minimize deprecation warnings caused b...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18678 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79735/ Test PASSed.
[GitHub] spark issue #18678: [SPARK-21464][SS] Minimize deprecation warnings caused b...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18678 Merged build finished. Test PASSed.
[GitHub] spark issue #18678: [SPARK-21464][SS] Minimize deprecation warnings caused b...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18678 **[Test build #79735 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79735/testReport)** for PR 18678 at commit [`41d550f`](https://github.com/apache/spark/commit/41d550f599afba023ede3e8a4f8e5af910e440b9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/18655 I see; I'll move the files back to the `arrow` package.
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128146875 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { collectAndValidate(df, json, "floating_point-double_precision.json") } + ignore("decimal conversion") { --- End diff -- Oh, I'm sorry, I should have mentioned it. It seems `JsonFileReader` doesn't support DecimalType, so I ignored this test for now. But now I'm thinking: if Arrow 0.4.0 has a bug for the decimal type, as you said, should I remove decimal type support from this PR and add it in a follow-up PR?
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128146851 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { -new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - private[sql] object ArrowConverters { /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { -dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") -} - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { -val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) -} -new Schema(arrowFields.toList.asJava) - } - - /** * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. */ private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 - } - recordsInBatch += 1 +context.addTaskCompletionListener { _ => + root.close() + allocator.close()
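The batching contract documented for `toPayloadIterator` (at most `maxRecordsPerBatch` records per batch, or the whole iterator when it is 0) can be sketched as a plain-Python generator. `to_batches` is a hypothetical name, and values of 0 or less are treated as "no limit", mirroring the `maxRecordsPerBatch <= 0` check in the removed code. As in the original, an empty input produces no batches.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def to_batches(rows: Iterable[T], max_records_per_batch: int) -> Iterator[List[T]]:
    """Yield lists of at most max_records_per_batch rows; <= 0 means one batch of all rows."""
    it = iter(rows)
    while True:
        if max_records_per_batch <= 0:
            batch = list(it)  # fully consume the iterator
        else:
            batch = list(islice(it, max_records_per_batch))
        if not batch:  # iterator exhausted: stop without yielding an empty batch
            return
        yield batch


print(list(to_batches(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
print(list(to_batches(range(7), 0)))  # [[0, 1, 2, 3, 4, 5, 6]]
```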
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128146856 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/vectorized/ArrowWriter.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.vectorized + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector._ +import org.apache.arrow.vector.complex._ +import org.apache.arrow.vector.util.DecimalUtility + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.types._ + +object ArrowWriter { + + def create(schema: StructType): ArrowWriter = { +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator) +create(root) + } + + def create(root: VectorSchemaRoot): ArrowWriter = { +val children = root.getFieldVectors().asScala.map { vector => + vector.allocateNew() + createFieldWriter(vector) +} +new ArrowWriter(root, children.toArray) + } + + private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = { +val field = vector.getField() +ArrowUtils.fromArrowField(field) match { + case BooleanType => +new BooleanWriter(vector.asInstanceOf[NullableBitVector]) + case ByteType => +new ByteWriter(vector.asInstanceOf[NullableTinyIntVector]) + case ShortType => +new ShortWriter(vector.asInstanceOf[NullableSmallIntVector]) + case IntegerType => +new IntegerWriter(vector.asInstanceOf[NullableIntVector]) + case LongType => +new LongWriter(vector.asInstanceOf[NullableBigIntVector]) + case FloatType => +new FloatWriter(vector.asInstanceOf[NullableFloat4Vector]) + case DoubleType => +new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector]) + case DecimalType.Fixed(precision, scale) => +new DecimalWriter(vector.asInstanceOf[NullableDecimalVector], precision, scale) + case StringType => +new StringWriter(vector.asInstanceOf[NullableVarCharVector]) + case BinaryType => +new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector]) + case ArrayType(_, _) => +val v = vector.asInstanceOf[ListVector] +val elementVector = createFieldWriter(v.getDataVector()) +new ArrayWriter(v, elementVector) + 
case StructType(_) => +val v = vector.asInstanceOf[NullableMapVector] +val children = (0 until v.size()).map { ordinal => + createFieldWriter(v.getChildByOrdinal(ordinal)) +} +new StructWriter(v, children.toArray) +} + } +} + +class ArrowWriter( +val root: VectorSchemaRoot, +fields: Array[ArrowFieldWriter]) { + + def schema: StructType = StructType(fields.map { f => +StructField(f.name, f.dataType, f.nullable) + }) + + private var count: Int = 0 + + def write(row: InternalRow): Unit = { +var i = 0 +while (i < fields.size) { + fields(i).write(row, i) + i += 1 +} +count += 1 + } + + def finish(): Unit = { +root.setRowCount(count) +fields.foreach(_.finish()) + } + + def reset(): Unit = { +root.setRowCount(0) +count = 0 +fields.foreach(_.reset()) + } +} + +private[sql] abstract class ArrowFieldWriter { + + def valueVector: ValueVector + def valueMutator: ValueVector.Mutator + + def name: String = valueVector.getField().getName() + def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField()) + def nullable: Boolean = valueVector.getField().isNullable() + + def setNull(): Unit
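The proposed `ArrowWriter` has a write/finish/reset life cycle. A small Python model makes it concrete, under these assumptions:

- This is not Arrow or Spark code. `ColumnBuffer` and `RowToColumnWriter` are hypothetical names, and plain lists stand in for Arrow field vectors.
- As in the diff, `write` appends one value per field and counts rows.
- `finish` publishes the row count, analogous to `root.setRowCount(count)`.
- `reset` clears all state so the same writer can fill the next batch.

```python
from typing import Any, List, Sequence


class ColumnBuffer:
    """Hypothetical stand-in for one Arrow field vector (a plain Python list)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.values: List[Any] = []

    def write(self, row: Sequence[Any], ordinal: int) -> None:
        # Each field writer reads only its own ordinal from the row.
        self.values.append(row[ordinal])

    def reset(self) -> None:
        self.values.clear()


class RowToColumnWriter:
    """Models ArrowWriter.write/finish/reset from the diff."""

    def __init__(self, field_names: Sequence[str]) -> None:
        self.fields = [ColumnBuffer(n) for n in field_names]
        self.count = 0
        self.row_count = 0  # analogue of root.setRowCount(...)

    def write(self, row: Sequence[Any]) -> None:
        for i, field in enumerate(self.fields):
            field.write(row, i)
        self.count += 1

    def finish(self) -> None:
        self.row_count = self.count

    def reset(self) -> None:
        self.row_count = 0
        self.count = 0
        for field in self.fields:
            field.reset()


writer = RowToColumnWriter(["id", "name"])
writer.write((1, "a"))
writer.write((2, "b"))
writer.finish()
print(writer.row_count, [f.values for f in writer.fields])  # 2 [[1, 2], ['a', 'b']]
writer.reset()
```

Keeping the row counter in the top-level writer, rather than in each field, mirrors the diff: there is one `count` per batch, and it is applied to the root only in `finish`.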
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128146843 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { -new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - private[sql] object ArrowConverters { /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { -dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") -} - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { -val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) -} -new Schema(arrowFields.toList.asJava) - } - - /** * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. */ private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 - } - recordsInBatch += 1 +context.addTaskCompletionListener { _ => + root.close() + allocator.close()
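The doc comment states the batching contract of `toPayloadIterator`: "Limit ArrowRecordBatch size in ArrowPayload by setting maxRecordsPerBatch or use 0 to fully consume rowIter". The diff also moves cleanup from inside `next()` to a task-completion listener. Both can be modeled in Python, under these assumptions:

- `to_batches` and `FakeTaskContext` are hypothetical names.
- A list stands in for an Arrow record batch.
- The listener only records that the "allocator" was closed.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


class FakeTaskContext:
    """Hypothetical stand-in for TaskContext.addTaskCompletionListener."""

    def __init__(self) -> None:
        self._listeners: List[Callable[[], None]] = []

    def add_task_completion_listener(self, fn: Callable[[], None]) -> None:
        self._listeners.append(fn)

    def mark_completed(self) -> None:
        for fn in self._listeners:
            fn()


def to_batches(rows: Iterator[T], max_records_per_batch: int,
               context: FakeTaskContext, closed: List[str]) -> Iterator[List[T]]:
    rows = iter(rows)
    # Register cleanup eagerly, at call time, so the "allocator" is released when
    # the task ends even if the consumer never drains the iterator.
    context.add_task_completion_listener(lambda: closed.append("allocator"))

    def batches() -> Iterator[List[T]]:
        while True:
            batch: List[T] = []
            # A non-positive limit means "no limit": drain everything into one batch.
            while max_records_per_batch <= 0 or len(batch) < max_records_per_batch:
                try:
                    batch.append(next(rows))
                except StopIteration:
                    break
            if not batch:
                return
            yield batch

    return batches()


ctx = FakeTaskContext()
closed: List[str] = []
print(list(to_batches(iter(range(5)), 2, ctx, closed)))  # [[0, 1], [2, 3], [4]]
print(list(to_batches(iter(range(5)), 0, ctx, closed)))  # [[0, 1, 2, 3, 4]]
ctx.mark_completed()
print(closed)  # ['allocator', 'allocator']
```

Tying cleanup to task completion, instead of to the last `next()` call, means resources are freed even when a downstream operator stops consuming early.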
[GitHub] spark issue #18670: [SPARK-21455][CORE]RpcFailure should be call on RpcRespo...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18670 @ConeyLiu The network layer doesn't know how to serialize a `Throwable`; in other words, it cannot use the JavaSerializer from Spark core.
[GitHub] spark issue #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types that re...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18444 **[Test build #79736 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79736/testReport)** for PR 18444 at commit [`88091ea`](https://github.com/apache/spark/commit/88091ea4bfd7ee6311302d3b68360df3f2065f51).
[GitHub] spark issue #18656: [SPARK-21441][SQL]Incorrect Codegen in SortMergeJoinExec...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/18656 @cloud-fan Could you help trigger the Jenkins tests for this? Thanks.
[GitHub] spark issue #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types that re...
Github user zasdfgbnm commented on the issue: https://github.com/apache/spark/pull/18444 @HyukjinKwon Take a look at my newest commit. I think I found a better way to solve the problem: it keeps all the workaround code for `SPARK-21465` in a single place, making it easier to remove in the future. What do you think?
[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18487#discussion_r128145633 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -321,6 +321,17 @@ package object config { .intConf .createWithDefault(3) + private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS = +ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress") + .doc("This configuration limits the number of remote blocks being fetched per reduce task" + +" from a given host port. When a large number of blocks are being requested from a given" + +" address in a single fetch or simultaneously, this could crash the serving executor or" + +" Node Manager. This is especially useful to reduce the load on the Node Manager when" + +" external shuffle is enabled. You can mitigate the issue by setting it to a lower value.") + .intConf + .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.") + .createWithDefault(Int.MaxValue) --- End diff -- cc @tgravescs shall we change the default value to 20 or something?
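Here is a rough Python model of what `spark.reducer.maxBlocksInFlightPerAddress` limits: at most N outstanding block requests per remote address, with the rest deferred until earlier fetches complete. It is an illustration only, with these assumptions:

- `FetchThrottle` and its methods are hypothetical and do not mirror Spark's shuffle fetcher.
- The addresses are made up.
- The constructor rejects non-positive limits, like `checkValue(_ > 0, ...)` in the diff.
- The `sys.maxsize` default plays the role of `Int.MaxValue`, which is effectively no limit.

```python
import sys
from collections import defaultdict, deque
from typing import DefaultDict, Deque, List, Tuple

Request = Tuple[str, str]  # (address, block_id)


class FetchThrottle:
    def __init__(self, max_blocks_in_flight_per_address: int = sys.maxsize) -> None:
        if max_blocks_in_flight_per_address <= 0:
            raise ValueError("The max no. of blocks in flight cannot be non-positive.")
        self.limit = max_blocks_in_flight_per_address
        self.in_flight: DefaultDict[str, int] = defaultdict(int)
        self.deferred: Deque[Request] = deque()

    def submit(self, requests: List[Request]) -> List[Request]:
        """Return the requests that may be sent now; defer the rest."""
        self.deferred.extend(requests)
        return self._drain()

    def on_block_fetched(self, address: str) -> List[Request]:
        """A fetch finished: free a slot for that address and retry deferred requests."""
        self.in_flight[address] -= 1
        return self._drain()

    def _drain(self) -> List[Request]:
        sendable: List[Request] = []
        still_deferred: Deque[Request] = deque()
        while self.deferred:
            address, block = self.deferred.popleft()
            if self.in_flight[address] < self.limit:
                self.in_flight[address] += 1
                sendable.append((address, block))
            else:
                still_deferred.append((address, block))
        self.deferred = still_deferred
        return sendable


throttle = FetchThrottle(max_blocks_in_flight_per_address=2)
reqs = [("host-a", f"b{i}") for i in range(3)] + [("host-b", "c0")]
print(throttle.submit(reqs))  # [('host-a', 'b0'), ('host-a', 'b1'), ('host-b', 'c0')]
print(throttle.on_block_fetched("host-a"))  # [('host-a', 'b2')]
```

With the default of `Int.MaxValue`, nothing is ever deferred. That is why the comment asks whether a smaller default, such as 20, would be more useful.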
[GitHub] spark issue #18670: [SPARK-21455][CORE]RpcFailure should be call on RpcRespo...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/18670 OK, thanks for the answer. I think we could use a `ByteBuffer` to solve the wire-compatibility problem, though I may be wrong. I think we should change this, because we also use `onFailure` to send failure messages, as follows: ```java respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); ``` or ```scala (e) => callback.onFailure(e) ``` The client then receives the error message wrapped in a `RuntimeException`. If none of this needs to change, I can close the PR.
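This thread makes two points:

- zsxwing: the network layer transmits a failure as text rather than as a serialized `Throwable`.
- ConeyLiu: the client then sees only a generic `RuntimeException`.

Here is a Python analogue of that round trip, under these assumptions:

- `RpcFailureMessage`, its 8-byte request-id framing, `server_handle`, and `client_on_failure` are all hypothetical.
- Spark's actual wire encoding is not reproduced here.

```python
import traceback
from dataclasses import dataclass


@dataclass(frozen=True)
class RpcFailureMessage:
    """Hypothetical wire message: a request id plus the error as plain text."""

    request_id: int
    error_string: str

    def encode(self) -> bytes:
        # Only primitive data crosses the wire: no serialized exception object.
        return self.request_id.to_bytes(8, "big") + self.error_string.encode("utf-8")

    @staticmethod
    def decode(payload: bytes) -> "RpcFailureMessage":
        return RpcFailureMessage(int.from_bytes(payload[:8], "big"),
                                 payload[8:].decode("utf-8"))


def server_handle(request_id: int) -> bytes:
    try:
        raise ValueError("bad input")  # simulated handler failure
    except Exception as e:
        # Rough analogue of Throwables.getStackTraceAsString(e).
        trace = "".join(traceback.format_exception(type(e), e, e.__traceback__))
        return RpcFailureMessage(request_id, trace).encode()


def client_on_failure(payload: bytes) -> RuntimeError:
    # The original exception type is gone; the client can only wrap the text.
    return RuntimeError(RpcFailureMessage.decode(payload).error_string)


err = client_on_failure(server_handle(42))
print(type(err).__name__, "ValueError: bad input" in str(err))  # RuntimeError True
```

Sending a string keeps the transport independent of any serializer, at the cost of losing the exception's type on the client side.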
[GitHub] spark pull request #18468: [SPARK-20873][SQL] Creat CachedBatchColumnVector ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18468#discussion_r128144943 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/CachedBatchColumnVector.java --- @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.columnar.*; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column backed by an in memory JVM array. 
+ */ +public final class CachedBatchColumnVector extends ColumnVector { + + // keep compressed data + private byte[] buffer; + + // accessor for a column + private ColumnAccessor columnAccessor; + + // a row where the compressed data is extracted + private ColumnVector columnVector; + + // an accessor uses only row 0 in columnVector + private final int ROWID = 0; + + // Keep row id that was previously accessed + private int previousRowId = -1; + + + public CachedBatchColumnVector(byte[] buffer, int numRows, DataType type) { +super(numRows, DataTypes.NullType, MemoryMode.ON_HEAP); +initialize(buffer, type); +reserveInternal(numRows); +reset(); + } + + @Override + public long valuesNativeAddress() { +throw new RuntimeException("Cannot get native address for on heap column"); + } + @Override + public long nullsNativeAddress() { +throw new RuntimeException("Cannot get native address for on heap column"); + } + + @Override + public void close() { + } + + private void setColumnAccessor() { +ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); +columnAccessor = ColumnAccessor$.MODULE$.apply(type, byteBuffer); + } + + // call extractTo() before getting actual data + private void prepareAccess(int rowId) { +if (previousRowId + 1 == rowId) { --- End diff -- we should allow `previousRowId == rowId`, as we are able to support `getInt(1), getInt(1), getInt(2)`
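Here is a minimal Python model of the access rule discussed here:

- Reading the same row again is a no-op.
- Moving forward decodes the intervening rows in order.
- Moving backward fails.

The assumptions are:

- `ForwardOnlyColumn` and `UnsupportedOperation` are hypothetical names.
- An iterator stands in for the compressed buffer read through `ColumnAccessor`.
- Allowing forward skips follows the relaxed rule suggested in this review thread. It is not necessarily what the PR implements.

```python
from typing import Iterator, Optional


class UnsupportedOperation(Exception):
    """Stand-in for Java's UnsupportedOperationException."""


class ForwardOnlyColumn:
    """Hypothetical model of CachedBatchColumnVector.prepareAccess.

    Values are decoded one at a time from a sequential source (standing in for
    ColumnAccessor.extractTo), and only the most recently decoded value is kept.
    """

    def __init__(self, decoded_values: Iterator[Optional[int]]) -> None:
        self._source = decoded_values
        self._previous_row_id = -1
        self._current: Optional[int] = None

    def _prepare_access(self, row_id: int) -> None:
        if row_id < self._previous_row_id:
            raise UnsupportedOperation(
                f"Row {row_id} is accessed after row {self._previous_row_id}; "
                "access cannot go backward.")
        # Same row as last time: the value is already extracted, nothing to do.
        # Later row: decode (and discard) every row in between, in order.
        while self._previous_row_id < row_id:
            try:
                self._current = next(self._source)
            except StopIteration:
                raise IndexError(f"row {row_id} is out of range") from None
            self._previous_row_id += 1

    def is_null_at(self, row_id: int) -> bool:
        self._prepare_access(row_id)
        return self._current is None

    def get_int(self, row_id: int) -> Optional[int]:
        self._prepare_access(row_id)
        return self._current


col = ForwardOnlyColumn(iter([10, None, 30, 40]))
print(col.get_int(0), col.get_int(0), col.get_int(2), col.is_null_at(3))  # 10 10 30 False
```

Because only the latest decoded value is kept, a backward read would require re-decoding from the start of the buffer. That is why backward access is the one case the model rejects.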
[GitHub] spark pull request #18468: [SPARK-20873][SQL] Creat CachedBatchColumnVector ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18468#discussion_r128144719 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/CachedBatchColumnVector.java --- @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.columnar.*; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column backed by an in memory JVM array. 
+ */ +public final class CachedBatchColumnVector extends ColumnVector { + + // keep compressed data + private byte[] buffer; + + // accessor for a column + private ColumnAccessor columnAccessor; + + // a row where the compressed data is extracted + private ColumnVector columnVector; + + // an accessor uses only row 0 in columnVector + private final int ROWID = 0; + + // Keep row id that was previously accessed + private int previousRowId = -1; + + + public CachedBatchColumnVector(byte[] buffer, int numRows, DataType type) { +super(numRows, DataTypes.NullType, MemoryMode.ON_HEAP); +initialize(buffer, type); +reserveInternal(numRows); +reset(); + } + + @Override + public long valuesNativeAddress() { +throw new RuntimeException("Cannot get native address for on heap column"); + } + @Override + public long nullsNativeAddress() { +throw new RuntimeException("Cannot get native address for on heap column"); + } + + @Override + public void close() { + } + + private void setColumnAccessor() { +ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); +columnAccessor = ColumnAccessor$.MODULE$.apply(type, byteBuffer); + } + + // call extractTo() before getting actual data + private void prepareAccess(int rowId) { +if (previousRowId + 1 == rowId) { + assert (columnAccessor.hasNext()); + columnAccessor.extractTo(columnVector, ROWID); + previousRowId = rowId; +} else if (previousRowId != rowId) { + throw new UnsupportedOperationException("Row access order must be sequentially ascending." 
+ +" Internal row " + rowId + "is accessed after internal row "+ previousRowId + "was accessed."); +} + } + + // + // APIs dealing with nulls + // + + @Override + public void putNotNull(int rowId) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNull(int rowId) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNulls(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNotNulls(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + @Override + public boolean isNullAt(int rowId) { +prepareAccess(rowId); +return columnVector.isNullAt(ROWID); + } + + // + // APIs dealing with Booleans + // + + @Override + public void putBoolean(int rowId, boolean value) { +throw new UnsupportedOperationException(); + } + + @Override + public void putBooleans(int rowId, int count, boolean value) { +throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int rowId) { +prepareAccess(rowId); +return columnVector.getBoolean(ROWID); + } + + @Override + public boolean[] getBooleans(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + // + + // + // APIs dealing with Bytes + // + + @Override + public void putByte(int rowId, byte value) { +
[GitHub] spark pull request #18651: [SPARK-21383][Core] Fix the YarnAllocator allocat...
Github user djvulee commented on a diff in the pull request: https://github.com/apache/spark/pull/18651#discussion_r128144194 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -505,32 +508,37 @@ private[yarn] class YarnAllocator( if (numExecutorsRunning < targetNumExecutors) { if (launchContainers) { - launcherPool.execute(new Runnable { -override def run(): Unit = { - try { -new ExecutorRunnable( - Some(container), - conf, - sparkConf, - driverUrl, - executorId, - executorHostname, - executorMemory, - executorCores, - appAttemptId.getApplicationId.toString, - securityMgr, - localResources -).run() -updateInternalState() - } catch { -case NonFatal(e) => - logError(s"Failed to launch executor $executorId on container $containerId", e) - // Assigned container should be released immediately to avoid unnecessary resource - // occupation. - amClient.releaseAssignedContainer(containerId) + try { +numExecutorToBeLaunched += 1 +launcherPool.execute(new Runnable { + override def run(): Unit = { +try { + new ExecutorRunnable( +Some(container), +conf, +sparkConf, +driverUrl, +executorId, +executorHostname, +executorMemory, +executorCores, +appAttemptId.getApplicationId.toString, +securityMgr, +localResources + ).run() + updateInternalState() +} catch { + case NonFatal(e) => +logError(s"Failed to launch executor $executorId on container $containerId", e) +// Assigned container should be released immediately +// to avoid unnecessary resource occupation. +amClient.releaseAssignedContainer(containerId) +} } -} - }) +}) + } finally { +numExecutorToBeLaunched -= 1 --- End diff -- Yes, you're right. When I tested the code experimentally, I decremented `numExecutorToBeLaunched` in the `updateInternalState` function, but I later found that this may affect the test. I will fix this soon.
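The diff decrements `numExecutorToBeLaunched` in a `finally` right after `launcherPool.execute(...)`. Since `execute` only queues the `Runnable`, that decrement presumably happens before the launch finishes, which appears to be what the reviewer flagged. Here is a Python sketch of the alternative: decrement inside the task itself, when the launch actually ends. The assumptions are:

- `LaunchTracker` is a hypothetical name.
- `ThreadPoolExecutor` plays the role of the launcher pool.
- The PR's eventual fix may differ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class LaunchTracker:
    """Counts launches that have been submitted but have not finished yet."""

    def __init__(self, pool: ThreadPoolExecutor) -> None:
        self._pool = pool
        self._lock = threading.Lock()
        self.pending = 0

    def launch(self, start_executor: Callable[[], object]) -> None:
        with self._lock:
            self.pending += 1  # counted before the task is queued

        def run() -> None:
            try:
                start_executor()
            finally:
                # Decrement when the launch really ends (success or failure),
                # not when submit() returns, since submit() returns immediately.
                with self._lock:
                    self.pending -= 1

        self._pool.submit(run)


release = threading.Event()
with ThreadPoolExecutor(max_workers=2) as pool:
    tracker = LaunchTracker(pool)
    tracker.launch(release.wait)  # blocks until released
    tracker.launch(release.wait)
    print(tracker.pending)  # 2: both launches are still in progress
    release.set()
print(tracker.pending)  # 0: leaving the with-block waited for the pool to drain
```

Putting the decrement in the task's own `finally` means a failed launch also releases its slot, which matches the `NonFatal` branch in the diff.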
[GitHub] spark pull request #18651: [SPARK-21383][Core] Fix the YarnAllocator allocat...
Github user djvulee commented on a diff in the pull request: https://github.com/apache/spark/pull/18651#discussion_r128143898 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -82,6 +82,8 @@ private[yarn] class YarnAllocator( @volatile private var numExecutorsRunning = 0 + @volatile private var numExecutorToBeLaunched = 0 --- End diff -- OK, I will change the name.
[GitHub] spark issue #18656: [SPARK-21441][SQL]Incorrect Codegen in SortMergeJoinExec...
Github user DonnyZone commented on the issue: https://github.com/apache/spark/pull/18656 Thanks for reviewing; I will add a test later.
[GitHub] spark issue #18649: [SPARK-21395][SQL] Spark SQL hive-thriftserver doesn't r...
Github user debugger87 commented on the issue: https://github.com/apache/spark/pull/18649 @gatorsmile Could you please help me review this PR?
[GitHub] spark pull request #18468: [SPARK-20873][SQL] Creat CachedBatchColumnVector ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18468#discussion_r128143583 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/CachedBatchColumnVector.java --- @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.nio.ByteBuffer; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.columnar.*; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column backed by an in memory JVM array. --- End diff -- We should explicitly say that this is a wrapper for reading compressed data in the table cache as a ColumnVector.
[GitHub] spark pull request #18677: [SPARK-21273][SQL][Follow-up] Propagate logical p...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18677
[GitHub] spark pull request #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18668#discussion_r128143343 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala --- @@ -404,6 +404,13 @@ private[spark] object HiveUtils extends Logging { propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "") propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "") +// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" --- End diff -- Why should we do this?
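The diff comment states the rule being questioned: copy any "spark.hadoop.foo=bar" system property into the conf as "foo=bar". Here is a minimal Python sketch of that prefix-stripping rule, under these assumptions:

- `copy_spark_hadoop_props` is hypothetical.
- Dicts stand in for the system properties and the Hive conf map.
- The sketch lets copied values overwrite existing keys. Whether they should is exactly the kind of design choice the review question is probing.

```python
from typing import Dict, Mapping

PREFIX = "spark.hadoop."


def copy_spark_hadoop_props(props: Mapping[str, str],
                            conf: Dict[str, str]) -> Dict[str, str]:
    """Copy every 'spark.hadoop.foo=bar' entry into conf as 'foo=bar'."""
    for key, value in props.items():
        # Skip the bare prefix, which would otherwise map to an empty key.
        if key.startswith(PREFIX) and len(key) > len(PREFIX):
            conf[key[len(PREFIX):]] = value
    return conf


system_props = {
    "spark.hadoop.fs.defaultFS": "hdfs://nn:8020",
    "spark.master": "local[2]",
}
print(copy_spark_hadoop_props(system_props, {}))  # {'fs.defaultFS': 'hdfs://nn:8020'}
```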
[GitHub] spark issue #18677: [SPARK-21273][SQL][Follow-up] Propagate logical plan sta...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18677 thanks, merging to master!
[GitHub] spark issue #18677: [SPARK-21273][SQL][Follow-up] Propagate logical plan sta...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18677 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79733/ Test PASSed.
[GitHub] spark issue #18677: [SPARK-21273][SQL][Follow-up] Propagate logical plan sta...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18677 Merged build finished. Test PASSed.
[GitHub] spark issue #18677: [SPARK-21273][SQL][Follow-up] Propagate logical plan sta...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18677 **[Test build #79733 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79733/testReport)** for PR 18677 at commit [`bc8257c`](https://github.com/apache/spark/commit/bc8257c4cfa73c844c2a4b8129327c69dbcc2710). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128142573 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- Sounds good to me, too. Let's discuss it in the next PR.
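The new `_infer_type` branch in this diff looks up an `array.array` typecode in a mapping and raises `TypeError` for typecodes it has no entry for. A minimal sketch of that dispatch, assuming a hypothetical, hard-coded `ARRAY_TYPE_MAPPINGS` dict of Spark type *names* (the PR's real `_array_type_mappings` maps to PySpark type classes and its exact contents are not reproduced here); 'L' is deliberately left out to exercise the unsupported path:

```python
from array import array

# Hypothetical, simplified mapping from array typecode to a Spark SQL type name.
# Not PySpark's _array_type_mappings; 'L' (unsigned long) is intentionally absent.
ARRAY_TYPE_MAPPINGS = {
    "b": "ByteType",
    "h": "ShortType",
    "i": "IntegerType",
    "f": "FloatType",
    "d": "DoubleType",
}


def infer_array_type(obj):
    """Return (element_type_name, contains_null) for an array.array."""
    if not isinstance(obj, array):
        raise TypeError("expected array.array, got %s" % type(obj).__name__)
    if obj.typecode in ARRAY_TYPE_MAPPINGS:
        # array.array cannot hold None, so the element type is non-nullable.
        return ARRAY_TYPE_MAPPINGS[obj.typecode], False
    # Same error shape as the diff for typecodes without a mapping.
    raise TypeError("not supported type: array(%s)" % obj.typecode)
```

For example, `infer_array_type(array("d", [1.0]))` yields `("DoubleType", False)`, while `array("L", [1])` raises `TypeError`.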
[GitHub] spark pull request #18656: [SPARK-21441][SQL]Incorrect Codegen in SortMergeJ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18656#discussion_r128142552 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -489,13 +489,13 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { * Inserts an InputAdapter on top of those that do not support codegen. */ private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match { +case p if !supportCodegen(p) => + // collapse them recursively + InputAdapter(insertWholeStageCodegen(p)) case j @ SortMergeJoinExec(_, _, _, _, left, right) if j.supportCodegen => --- End diff -- `supportCodegen` will call `CodegenSupport.supportCodegen`, so `SortMergeJoinExec.supportCodegen` is called then.
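The argument about case ordering can be modelled with a few hypothetical stub classes (a Python sketch, not Spark code): if the rule-level check delegates to the node's own `supportCodegen`, then the first case already routes every non-inner sort-merge join to an `InputAdapter`, so a second guard on the `SortMergeJoinExec` case can never be false. The sketch simplifies the rule-level check to that delegation alone; the real `CollapseCodegenStages.supportCodegen` also inspects the node's expressions (for example `CodegenFallback`) and other conditions.

```python
class SparkPlanStub:
    """Hypothetical stand-in for a physical plan node."""

    def support_codegen(self):
        return True


class SortMergeJoinStub(SparkPlanStub):
    def __init__(self, join_type):
        self.join_type = join_type

    def support_codegen(self):
        # Simplified model of "joinType.isInstanceOf[InnerLike]".
        return self.join_type in ("inner", "cross")


def rule_supports_codegen(plan):
    # Rule-level check; here it only delegates to the node's own method.
    return plan.support_codegen()


def insert_input_adapter(plan):
    if not rule_supports_codegen(plan):
        # First case: nodes that cannot be code-generated get an adapter.
        return "InputAdapter"
    if isinstance(plan, SortMergeJoinStub):
        # Only reached when plan.support_codegen() is already True,
        # so re-checking it in a guard here would be redundant.
        return "SortMergeJoin(%s)" % plan.join_type
    return "Codegen"
```

A left outer join stub therefore ends up as `"InputAdapter"` without ever reaching the join-specific branch.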
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128142511 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- https://issues.apache.org/jira/browse/SPARK-21465
[GitHub] spark pull request #18656: [SPARK-21441]Incorrect Codegen in SortMergeJoinEx...
Github user DonnyZone commented on a diff in the pull request: https://github.com/apache/spark/pull/18656#discussion_r128142467 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -489,13 +489,13 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { * Inserts an InputAdapter on top of those that do not support codegen. */ private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match { +case p if !supportCodegen(p) => + // collapse them recursively + InputAdapter(insertWholeStageCodegen(p)) case j @ SortMergeJoinExec(_, _, _, _, left, right) if j.supportCodegen => --- End diff -- Therefore, I think we should still verify it.
[GitHub] spark issue #18677: [SPARK-21273][SQL][Follow-up] Propagate logical plan sta...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18677 Merged build finished. Test PASSed.
[GitHub] spark issue #18677: [SPARK-21273][SQL][Follow-up] Propagate logical plan sta...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18677 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79732/ Test PASSed.
[GitHub] spark issue #18677: [SPARK-21273][SQL][Follow-up] Propagate logical plan sta...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18677 **[Test build #79732 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79732/testReport)** for PR 18677 at commit [`0834050`](https://github.com/apache/spark/commit/0834050236707fab658e5b88c98e72d7acc06b33). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #18656: [SPARK-21441]Incorrect Codegen in SortMergeJoinEx...
Github user DonnyZone commented on a diff in the pull request: https://github.com/apache/spark/pull/18656#discussion_r128142370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -489,13 +489,13 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { * Inserts an InputAdapter on top of those that do not support codegen. */ private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match { +case p if !supportCodegen(p) => + // collapse them recursively + InputAdapter(insertWholeStageCodegen(p)) case j @ SortMergeJoinExec(_, _, _, _, left, right) if j.supportCodegen => --- End diff -- SortMergeJoinExec.supportCodegen checks whether `joinType.isInstanceOf[InnerLike]`
[GitHub] spark issue #18668: [SPARK-21451][SQL]get `spark.hadoop.*` properties from s...
Github user yaooqinn commented on the issue: https://github.com/apache/spark/pull/18668 ping @cloud-fan @gatorsmile
[GitHub] spark pull request #18634: [SPARK-21414] Refine SlidingWindowFunctionFrame t...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/18634#discussion_r128142152 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala --- @@ -356,6 +356,42 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext { spark.catalog.dropTempView("nums") } + test("window function: mutiple window expressions specified by range in a single expression") { +val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") +nums.createOrReplaceTempView("nums") --- End diff -- Sure, I will add it later today.
[GitHub] spark issue #18649: [SPARK-21395][SQL] Spark SQL hive-thriftserver doesn't r...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/18649 Sorry, I'm not familiar with this part, so I can't give you a useful comment. You could ask others to help review your patch 😄.
[GitHub] spark issue #18673: [SPARK-21447][WEB UI] Spark history server fails to rend...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18673 Merged build finished. Test PASSed.
[GitHub] spark pull request #17848: [SPARK-20586] [SQL] Add deterministic to ScalaUDF...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17848#discussion_r128141710 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala --- @@ -104,21 +104,35 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i].dataType :: $s"}) println(s""" /** - * Register a Scala closure of ${x} arguments as user-defined function (UDF). + * Registers a Scala closure of ${x} arguments as user-defined function (UDF). * @tparam RT return type of UDF. * @since 1.3.0 */ def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = { + registerUDF(name, func, deterministic = true) +} + +/** + * Registers a non-deterministic Scala closure of ${x} arguments as user-defined function (UDF). + * @tparam RT return type of UDF. + * @since 2.3.0 + */ +def registerNonDeterministic[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = { + registerUDF(name, func, deterministic = false) +} + +private def registerUDF[$typeTags](name: String, func: Function$x[$types], deterministic: Boolean): UserDefinedFunction = { --- End diff -- can we make this public?
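The generated code in this diff routes both `register` (deterministic by default) and the new `registerNonDeterministic` through one private `registerUDF(name, func, deterministic)`. The shape of that design, sketched in Python with hypothetical class and method names (this is not Spark's `UDFRegistration` API): both public entry points share a single implementation and differ only in the flag they pass, which is also why exposing the flag-taking helper directly, as asked above, would be a small change.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class UserDefinedFunctionStub:
    """Hypothetical record of a registered function and its determinism flag."""
    name: str
    func: Callable
    deterministic: bool


class UDFRegistryStub:
    def __init__(self):
        self._functions: Dict[str, UserDefinedFunctionStub] = {}

    def register(self, name, func):
        # Existing entry point keeps its old behaviour: deterministic.
        return self._register_udf(name, func, deterministic=True)

    def register_non_deterministic(self, name, func):
        return self._register_udf(name, func, deterministic=False)

    def _register_udf(self, name, func, deterministic):
        # Single shared implementation behind both public methods.
        udf = UserDefinedFunctionStub(name, func, deterministic)
        self._functions[name] = udf
        return udf

    def lookup(self, name):
        return self._functions[name]
```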
[GitHub] spark issue #18673: [SPARK-21447][WEB UI] Spark history server fails to rend...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18673 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79731/ Test PASSed.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128141018 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- How about doing the following? 1. Make the changes to the tests as you suggest, and clearly document in comments why 'L' is an exception for Python 2. 2. Open a new JIRA explaining the issue about 'L' support. 3. Finish this PR.
[GitHub] spark issue #18673: [SPARK-21447][WEB UI] Spark history server fails to rend...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18673 **[Test build #79731 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79731/testReport)** for PR 18673 at commit [`b82847e`](https://github.com/apache/spark/commit/b82847ea94839f05049e26dd4f9aa0d13ff63337). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128141384 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- OK. Let me do this. If it is possible, I would like to finish this PR before I travel.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128141295 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- Sounds good to me.
[GitHub] spark issue #18649: [SPARK-21395][SQL] Spark SQL hive-thriftserver doesn't r...
Github user debugger87 commented on the issue: https://github.com/apache/spark/pull/18649 @jerryshao Yes, it's just copied from SQLOperation in Hive. However, those lines are the key to how HiveServer2 returns the operation log to the client via a `TFetchResultsReq` whose fetchType is `FetchType.LOG`. If the hive-thriftserver in Spark SQL is to support the complete HiveServer2 protocol, `registerCurrentOperationLog` should be introduced and kept consistent with Hive.
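Why the registration call matters can be shown with a rough Python model, under the assumption that the operation log is bound to the executing thread through a thread-local (all names below are hypothetical; this is neither Hive's nor Spark's code): log lines emitted while a statement runs only land in that operation's log if the worker thread registered it first, and a later fetch of type "LOG" simply returns what was collected.

```python
import threading

# Hypothetical thread-local slot holding the "current" operation log.
_current = threading.local()


class OperationLogStub:
    def __init__(self):
        self._lines = []
        self._lock = threading.Lock()

    def write(self, line):
        with self._lock:
            self._lines.append(line)

    def read_all(self):
        with self._lock:
            return list(self._lines)


def register_current_operation_log(log):
    # Bind the log to the calling thread only.
    _current.log = log


def log_line(line):
    # Dropped silently if no log was registered on this thread.
    log = getattr(_current, "log", None)
    if log is not None:
        log.write(line)


def run_statement_in_background(log, statement):
    def work():
        register_current_operation_log(log)  # must run on the worker thread
        log_line("Compiling: %s" % statement)
        log_line("Finished: %s" % statement)

    worker = threading.Thread(target=work)
    worker.start()
    return worker


def fetch_results(log, fetch_type):
    # Rough analogue of answering a fetch request whose type is LOG.
    if fetch_type == "LOG":
        return log.read_all()
    raise ValueError("only LOG fetches are modelled here")
```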
[GitHub] spark issue #18677: [SPARK-21273][SQL][Follow-up] Propagate logical plan sta...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18677 LGTM
[GitHub] spark pull request #18654: [SPARK-21435][SQL] Empty files should be skipped ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18654
[GitHub] spark issue #18670: [SPARK-21455][CORE]RpcFailure should be call on RpcRespo...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18670 +1 to keep wire compatibility. Just out of curiosity: if we could redesign it, should we use `RpcFailure` to send the exception?
[GitHub] spark issue #18656: [SPARK-21441]Incorrect Codegen in SortMergeJoinExec resu...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/18656 Btw, can you also add a test for this? Thanks.
[GitHub] spark issue #18654: [SPARK-21435][SQL] Empty files should be skipped while w...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18654 LGTM, merging to master!
[GitHub] spark issue #18656: [SPARK-21441]Incorrect Codegen in SortMergeJoinExec resu...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/18656 And please also add SQL tag to the PR title, e.g., [SPARK-21441][SQL]. Thanks.
[GitHub] spark issue #18670: [SPARK-21455][CORE]RpcFailure should be call on RpcRespo...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/18670 > why we don't let org.apache.spark.network.protocol.RpcFailure hold one Throwable Because that breaks wire compatibility, and you need to be able to talk to older versions of the library (e.g. an old shuffle service).
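To make the wire-compatibility point concrete, here is a Python sketch that assumes (not verified against Spark's `RpcFailure` encoder) a frame of an 8-byte request id followed by a 4-byte length and a UTF-8 error string. An old peer's decoder only knows that layout, so a sender that wants to stay compatible flattens the exception to text (here, its formatted traceback) instead of shipping a serialized exception object.

```python
import struct
import traceback

# Assumed layout: big-endian int64 request id, int32 length, UTF-8 bytes.
_HEADER = ">qi"


def encode_rpc_failure(request_id, error):
    """Encode a failure as [requestId][length][UTF-8 message]."""
    if isinstance(error, BaseException):
        # Flatten the exception to text so string-only decoders still work.
        message = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
    else:
        message = str(error)
    payload = message.encode("utf-8")
    return struct.pack(_HEADER, request_id, len(payload)) + payload


def decode_rpc_failure(buf):
    """The decoder an older peer would run: it only understands the string form."""
    request_id, length = struct.unpack_from(_HEADER, buf, 0)
    start = struct.calcsize(_HEADER)
    message = buf[start:start + length].decode("utf-8")
    return request_id, message
```

Changing the payload to something other than this string (for example, a serialized exception object) would leave older decoders unable to interpret the message, which is the compatibility concern raised above.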
[GitHub] spark pull request #18656: [SPARK-21441]Incorrect Codegen in SortMergeJoinEx...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18656#discussion_r128140400 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -489,13 +489,13 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { * Inserts an InputAdapter on top of those that do not support codegen. */ private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match { +case p if !supportCodegen(p) => + // collapse them recursively + InputAdapter(insertWholeStageCodegen(p)) case j @ SortMergeJoinExec(_, _, _, _, left, right) if j.supportCodegen => --- End diff -- The previous pattern case already validates `j.supportCodegen`, we don't need to verify it again.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128140334 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- > Should we add 'L' as exception for python2 in unsupported types tests, or do we just completely remove unsupported tests? I'd prefer 'L' as exception for Python 2. > Should we test 'L' for python 2? I don't think we should explicitly test this here. I guess we mistakenly happened to support this case, as you said. I think I (we contributors) can't make this decision (by rule) to remove this 'L' support ("undefined support status"), but I would like to push this PR forward, and this is why I was thinking of deferring it to a separate PR. cc @ueshin, are you online now? WDYT about removing the (unexpected, I guess) 'L' support in Python 2?
[GitHub] spark issue #18656: [SPARK-21441]Incorrect Codegen in SortMergeJoinExec resu...
Github user DonnyZone commented on the issue: https://github.com/apache/spark/pull/18656 I have validated both cases with and without CodegenFallback expressions for `SortMergeJoinExec`. The fix works well.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128138629 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- If we fall back to `_infer_type`, then some dirty changes to the test cases will be needed to make them pass. Consider the following questions: 1. Should we add 'L' as exception for python2 in unsupported types tests, or do we just completely remove unsupported tests? 2. Should we test 'L' for python 2? I really like how the tests are organized now, and the changes above would make the tests very messy. My opinion is that we are not changing the status of 'L' from "supported" to "unsupported", but from "undefined support status" to "unsupported". If changing from "undefined support status" to "unsupported" sounds bad, then instead of making these changes to the test cases, I would rather solve this problem now and keep the test cases clean.
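The first option in this thread (keep the unsupported-typecode test, but treat 'L' as an exception on Python 2) could be expressed with a small helper like the hypothetical one below. `SUPPORTED_TYPECODES` is an illustrative set, not the PR's real mapping, and the major version is passed in explicitly so the Python 2 branch can be exercised on any interpreter; in a real test one would pass `sys.version_info[0]`.

```python
from array import typecodes

# Hypothetical set of typecodes the inference code is assumed to support.
SUPPORTED_TYPECODES = {"b", "h", "i", "f", "d"}


def unsupported_typecodes(all_codes, supported, py_major):
    """Typecodes an 'unsupported types' test should expect to be rejected."""
    unsupported = set(all_codes) - set(supported)
    if py_major < 3:
        # Python 2 exception discussed above: 'L' is left out of the
        # unsupported list instead of being asserted either way.
        unsupported.discard("L")
    return sorted(unsupported)
```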
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128135623 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- We could disallow or support this in a separate PR later. I would rather make a safe choice here right now.
[GitHub] spark issue #18678: [SPARK-21464][SS] Minimize deprecation warnings caused b...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18678 **[Test build #79735 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79735/testReport)** for PR 18678 at commit [`41d550f`](https://github.com/apache/spark/commit/41d550f599afba023ede3e8a4f8e5af910e440b9).
[GitHub] spark issue #18605: [SparkR][SPARK-21381]:SparkR: pass on setHandleInvalid f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18605 Merged build finished. Test PASSed.
[GitHub] spark issue #18605: [SparkR][SPARK-21381]:SparkR: pass on setHandleInvalid f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18605 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79734/ Test PASSed.
[GitHub] spark issue #18605: [SparkR][SPARK-21381]:SparkR: pass on setHandleInvalid f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18605 **[Test build #79734 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79734/testReport)** for PR 18605 at commit [`3ebb5cd`](https://github.com/apache/spark/commit/3ebb5cd548986fc4cc0201d95301b43fed51029a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #18678: [SPARK-21464][SS] Minimize deprecation warnings c...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/18678#discussion_r128135329 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala --- @@ -22,20 +22,22 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.streaming.ProcessingTime +import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} class ProcessingTimeSuite extends SparkFunSuite { test("create") { -assert(ProcessingTime(10.seconds).intervalMs === 10 * 1000) -assert(ProcessingTime.create(10, TimeUnit.SECONDS).intervalMs === 10 * 1000) -assert(ProcessingTime("1 minute").intervalMs === 60 * 1000) -assert(ProcessingTime("interval 1 minute").intervalMs === 60 * 1000) - -intercept[IllegalArgumentException] { ProcessingTime(null: String) } -intercept[IllegalArgumentException] { ProcessingTime("") } -intercept[IllegalArgumentException] { ProcessingTime("invalid") } -intercept[IllegalArgumentException] { ProcessingTime("1 month") } -intercept[IllegalArgumentException] { ProcessingTime("1 year") } +def getIntervalMs(trigger: Trigger): Long = trigger.asInstanceOf[ProcessingTime].intervalMs --- End diff -- Reduced 10 lines of deprecation warnings to 1 line.
[GitHub] spark pull request #18678: [SPARK-21464][SS] Minimize deprecation warnings c...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/18678 [SPARK-21464][SS] Minimize deprecation warnings caused by ProcessingTime class
## What changes were proposed in this pull request?
Use of the `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2. However, internal uses of `ProcessingTime` cause deprecation warnings during compilation. This cannot be avoided entirely: even though it is deprecated as a public API, `ProcessingTime` instances are used internally in `TriggerExecutor`. This PR minimizes the warnings by removing its uses from tests as much as possible.
## How was this patch tested?
Existing tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark SPARK-21464
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/18678.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18678
commit 41d550f599afba023ede3e8a4f8e5af910e440b9
Author: Tathagata Das
Date: 2017-07-19T01:21:51Z
Minimized usage of ProcessingTime class
[GitHub] spark pull request #18654: [SPARK-21435][SQL] Empty files should be skipped ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18654#discussion_r128134922 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala --- @@ -236,7 +236,10 @@ object FileFormatWriter extends Logging { committer.setupTask(taskAttemptContext) val writeTask = - if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) { + if (sparkPartitionId != 0 && !iterator.hasNext) { --- End diff -- cc @yhuai too, who reviewed a similar PR of mine before.
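The new guard `sparkPartitionId != 0 && !iterator.hasNext` skips the write task for a partition only when it is empty and is not partition 0. Below is a minimal Python sketch of that decision rule. It is not Spark code, and `should_write` and `partitions_that_write` are hypothetical names. It also assumes, since the thread does not say so, that partition 0 is exempt so the job still produces at least one output file when every partition is empty.

```python
from typing import List, Sequence


def should_write(partition_id: int, rows: Sequence) -> bool:
    # Negation of the diff's skip condition:
    #   skip  <=>  partition_id != 0 and the partition has no rows
    return partition_id == 0 or len(rows) > 0


def partitions_that_write(partitions: List[Sequence]) -> List[int]:
    # Ids of the partitions that would still produce an output file.
    return [pid for pid, rows in enumerate(partitions) if should_write(pid, rows)]


if __name__ == "__main__":
    print(partitions_that_write([[], [1, 2], [], [3]]))  # [0, 1, 3]
    print(partitions_that_write([[], [], []]))           # [0]
```

Under this rule, the empty partition 2 writes nothing, but an all-empty input still yields exactly one (empty) file from partition 0.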
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128134574 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- Wait, maybe we could map it to decimal? Let's just fall back and leave this case as is. I am quite sure that would be safer.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128134072 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- Okay. Either way is fine with me (falling back to type inference or disallowing unmappable types). Please let me know if any reviewer thinks differently.
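The two options being weighed for an unmappable `array` typecode are to disallow it or to fall back to element-based inference. The standalone Python sketch below contrasts them. The mapping table, the Spark SQL type names used as strings, and the functions `infer_scalar` and `infer_array_type` are hypothetical stand-ins for `_array_type_mappings` and `_infer_type` in `pyspark/sql/types.py`, not their actual contents.

```python
from array import array

# Hypothetical typecode -> Spark SQL type-name table (assumes a 4-byte C int);
# the real `_array_type_mappings` in the PR is built differently.
TYPECODE_TO_SQL = {"d": "double", "f": "float", "i": "int", "q": "bigint"}


def infer_scalar(value):
    # Crude stand-in for element-level inference of a single value.
    if isinstance(value, float):
        return "double"
    if isinstance(value, int):
        return "bigint"
    return "string"


def infer_array_type(arr, strict=True):
    """Return (element_type, contains_null) for an array.array."""
    if arr.typecode in TYPECODE_TO_SQL:
        # array.array cannot store None, so elements are never null.
        return TYPECODE_TO_SQL[arr.typecode], False
    if strict:
        # Option 1: disallow typecodes with no safe Spark SQL type.
        raise TypeError("not supported type: array(%s)" % arr.typecode)
    # Option 2: fall back to list-style inference from the first element.
    if len(arr) > 0:
        return infer_scalar(arr[0]), True
    return "null", True
```

In this sketch the fallback reports `bigint` for an `'L'` array even when it holds values above 2**63 - 1, which no signed 64-bit type can represent. The strict option surfaces that case as a `TypeError` instead.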
[GitHub] spark issue #18676: [SPARK-21463] Allow userSpecifiedSchema to override part...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18676 Merged build finished. Test PASSed.
[GitHub] spark issue #18676: [SPARK-21463] Allow userSpecifiedSchema to override part...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18676 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79730/ Test PASSed.
[GitHub] spark issue #18676: [SPARK-21463] Allow userSpecifiedSchema to override part...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18676 **[Test build #79730 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79730/testReport)** for PR 18676 at commit [`7cdc864`](https://github.com/apache/spark/commit/7cdc864f9b0c50ac8f9d877eb67820569c54776e). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18654: [SPARK-21435][SQL] Empty files should be skipped while w...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/18654 ping @cloud-fan @HyukjinKwon
[GitHub] spark pull request #18503: [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18503#discussion_r128133369 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -350,20 +350,24 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit throw new IOException( s"Error reading delta file $fileToRead of $this: key size cannot be $keySize") } else { - val keyRowBuffer = new Array[Byte](keySize) + // If key size in an existing file is not a multiple of 8, round it to multiple of 8 --- End diff -- BTW, we only need to do this for value, not key.
[GitHub] spark pull request #18503: [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18503#discussion_r128133349 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -350,20 +350,24 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit throw new IOException( s"Error reading delta file $fileToRead of $this: key size cannot be $keySize") } else { - val keyRowBuffer = new Array[Byte](keySize) + // If key size in an existing file is not a multiple of 8, round it to multiple of 8 --- End diff -- I don't think we can round. Assume the actual length of an unsafe row is 8 bytes: previously we would append 4 bytes, producing an unsafe row of 12 bytes, and save that to the checkpoint. So here, when reading an old checkpoint, we need to read 12 bytes and set the length to 8.
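The arithmetic in the 12-byte example can be sketched in Python. Assume, as in that example, that the legacy writer appended fewer than 8 padding bytes to an 8-byte-aligned value row. Then the reader must consume the full stored size but should floor the logical length to a multiple of 8, whereas rounding up would overshoot. `logical_size`, `round_up` and `check_aligned` are hypothetical helpers; `check_aligned` only mirrors the `sizeInBytes % 8 == 0` assertion proposed for `pointTo`.

```python
def logical_size(stored_size: int) -> int:
    """Floor a stored value size to a multiple of 8.

    Assumes any legacy padding is shorter than 8 bytes, so the true
    (8-byte aligned) row length is the largest multiple of 8 <= stored_size.
    """
    return (stored_size // 8) * 8


def round_up(stored_size: int) -> int:
    # What "rounding to a multiple of 8" would produce instead.
    return (stored_size + 7) // 8 * 8


def check_aligned(size_in_bytes: int) -> None:
    # Python analogue of: assert sizeInBytes % 8 == 0
    if size_in_bytes % 8 != 0:
        raise AssertionError(
            "sizeInBytes (%d) should be a multiple of 8" % size_in_bytes)


if __name__ == "__main__":
    stored = 8 + 4          # 8-byte row plus 4 legacy bytes, as in the comment
    buf = bytes(stored)     # the reader still consumes all 12 stored bytes
    size = logical_size(len(buf))
    check_aligned(size)
    print(len(buf), size, round_up(stored))  # 12 8 16
```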
[GitHub] spark pull request #18503: [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18503#discussion_r128133007 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java --- @@ -167,6 +167,7 @@ public UnsafeRow() {} */ public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) { assert numFields >= 0 : "numFields (" + numFields + ") should >= 0"; +assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8"; --- End diff -- I think we only need the assertion here, in `pointTo`, and in `setTotalSize`. Other places are just checking length for existing unsafe rows, which is unnecessary.
[GitHub] spark issue #18670: [SPARK-21455][CORE]RpcFailure should be call on RpcRespo...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/18670 Thanks for reviewing, @vanzin @zsxwing. I understand what you mean. But if we really need a `Throwable` on the client side, as in [NettyRpcEnv.scala#L205](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala#L205), why don't we let `org.apache.spark.network.protocol.RpcFailure` carry a `Throwable` body instead of a `String` error message? Because we currently reply with the error message through `RpcResponseCallback.onSuccess`, `onSuccess` and `onFailure` can behave differently.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128132778 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- @HyukjinKwon what do you think?
[GitHub] spark issue #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types that re...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18444 LGTM.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128132584 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- Actually, I don't think 'L' should be supported, because there is no type in Scala that can hold a 64-bit unsigned integer...
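The width argument can be checked from Python itself. The sketch assumes that Spark's integral types are the signed 1/2/4/8-byte `ByteType`, `ShortType`, `IntegerType` and `LongType`. Under that assumption, an unsigned n-byte typecode needs a signed container of 2n bytes, so an 8-byte unsigned typecode has no target at all. `spark_type_for` is a hypothetical helper, not the PR's code. `array(tc).itemsize` is platform dependent: `'L'` is commonly 8 bytes on 64-bit Linux and 4 bytes on Windows.

```python
from array import array
from typing import Optional

# Spark SQL's integral types, keyed by width in bytes (all of them signed).
SIGNED_BY_SIZE = {1: "ByteType", 2: "ShortType", 4: "IntegerType", 8: "LongType"}
INTEGER_TYPECODES = "bBhHiIlLqQ"


def spark_type_for(typecode: str) -> Optional[str]:
    """Name of the smallest signed Spark integral type that can hold every
    value of an integer array typecode, or None if no such type exists."""
    if typecode not in INTEGER_TYPECODES:
        raise ValueError("not an integer typecode: %r" % typecode)
    size = array(typecode).itemsize  # platform dependent for 'i', 'l', etc.
    unsigned = typecode.isupper()    # 'B', 'H', 'I', 'L', 'Q' are unsigned
    # An unsigned n-byte value needs a signed container of 2n bytes.
    needed = size * 2 if unsigned else size
    return SIGNED_BY_SIZE.get(needed)


if __name__ == "__main__":
    for tc in INTEGER_TYPECODES:
        print(tc, array(tc).itemsize, spark_type_for(tc))
```

On a typical 64-bit Linux build this reports no target type for `'L'` or `'Q'`, matching the concern above.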
[GitHub] spark pull request #18670: [SPARK-21455][CORE]RpcFailure should be call on R...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/18670#discussion_r128132310 --- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala --- @@ -624,7 +624,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val e = intercept[SparkException] { ThreadUtils.awaitResult(f, 1 seconds) } - assert(e.getCause.isInstanceOf[NotSerializableException]) + assert(e.getCause.isInstanceOf[RuntimeException]) --- End diff -- Because we need a `Throwable` message to be sent back from the server, as in [NettyRpcEnv.scala#L205](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala#L205).
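A Python analogue can illustrate the trade-off between sending a failure across the wire as a `String` and sending it as a `Throwable`. This is a hypothetical sketch, not Spark's RPC code: `pickle` stands in for Java serialization, `TypeError` stands in for `java.io.NotSerializableException`, and all four function names are invented. A string-only reply leaves the client able to rebuild only a generic error. That is presumably why the test now expects a `RuntimeException` cause rather than `NotSerializableException`.

```python
import pickle


def reply_as_string(exc: BaseException) -> bytes:
    # Server side: only the message text is sent; the exception type is lost.
    return str(exc).encode("utf-8")


def client_from_string(payload: bytes) -> Exception:
    # Client side: the best it can do is wrap the text in a generic error.
    return RuntimeError(payload.decode("utf-8"))


def reply_as_exception(exc: BaseException) -> bytes:
    # Server side: serialize the exception object itself (must be picklable).
    return pickle.dumps(exc)


def client_from_exception(payload: bytes) -> BaseException:
    # Client side: the original exception type and message are restored.
    return pickle.loads(payload)


if __name__ == "__main__":
    # TypeError stands in for java.io.NotSerializableException here.
    err = TypeError("cannot serialize object of class Foo")
    print(type(client_from_string(reply_as_string(err))).__name__)        # RuntimeError
    print(type(client_from_exception(reply_as_exception(err))).__name__)  # TypeError
```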
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128132120 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- @HyukjinKwon So, the conclusion is, we do support everything we supported before.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128131993 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- Test result for Python 2 is here:
```python
Python 2.7.13 (default, Jul 2 2017, 22:24:59)
[GCC 7.1.1 20170621] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/18 20:59:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/18 20:59:26 WARN Utils: Your hostname, archlinux resolves to a loopback address: 127.0.0.1; using 192.168.88.2 instead (on interface eno1)
17/07/18 20:59:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/07/18 20:59:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.13 (default, Jul 2 2017 22:24:59)
SparkSession available as 'spark'.
>>> import array,sys
>>> from pyspark import *
>>> from pyspark.sql import *
>>>
>>> def assertCollectSuccess(typecode, value):
...     row = Row(myarray=array.array(typecode, [value]))
...     df = spark.createDataFrame([row])
...     print(typecode)
...     df.show()
...
>>> assertCollectSuccess('u',u'a')
u
+-------+
|myarray|
+-------+
|    [a]|
+-------+
>>> assertCollectSuccess('f',1.2)
f
+-------+
|myarray|
+-------+
| [null]|
+-------+
>>> assertCollectSuccess('d',1.2)
d
+-------+
|myarray|
+-------+
|  [1.2]|
+-------+
>>> assertCollectSuccess('b',1)
b
+-------+
|myarray|
+-------+
| [null]|
+-------+
>>> assertCollectSuccess('B',1)
B
+-------+
|myarray|
+-------+
| [null]|
+-------+
>>> assertCollectSuccess('h',1)
h
+-------+
|myarray|
+-------+
| [null]|
+-------+
>>> assertCollectSuccess('H',1)
H
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>> assertCollectSuccess('i',1)
i
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>> assertCollectSuccess('I',1)
I
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>> assertCollectSuccess('l',1)
l
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>> assertCollectSuccess('L',1)
L
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>>
>>>
>>> if sys.version_info[0] > 2:
...     assertCollectSuccess('q',1)
...     assertCollectSuccess('Q',1)
...
>>>
>>> if sys.version_info[0] < 3:
...     assertCollectSuccess('c','a')
...
c
+-------+
|myarray|
+-------+
|    [a]|
+-------+
>>>
```
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128131947 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- Definitely.
[GitHub] spark pull request #18444: [SPARK-16542][SQL][PYSPARK] Fix bugs about types ...
Github user zasdfgbnm commented on a diff in the pull request: https://github.com/apache/spark/pull/18444#discussion_r128131853 --- Diff: python/pyspark/sql/types.py --- @@ -938,12 +1016,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) -elif isinstance(obj, (list, array)): +elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) +elif isinstance(obj, array): +if obj.typecode in _array_type_mappings: +return ArrayType(_array_type_mappings[obj.typecode](), False) +else: +raise TypeError("not supported type: array(%s)" % obj.typecode) --- End diff -- Here is the test result with Python 3 and Spark 2.2.0. I will check Python 2 in a few minutes. I'm pasting the whole command-line output, so it's a bit lengthy. @HyukjinKwon could you please double-check whether my test is complete?
```python
>>> import array,sys
>>> from pyspark import *
>>> from pyspark.sql import *
>>>
>>> def assertCollectSuccess(typecode, value):
...     row = Row(myarray=array.array(typecode, [value]))
...     df = spark.createDataFrame([row])
...     print(typecode)
...     df.show()
...
>>> assertCollectSuccess('u',u'a')
u
+-------+
|myarray|
+-------+
|    [a]|
+-------+
>>> assertCollectSuccess('f',1.2)
f
+-------+
|myarray|
+-------+
| [null]|
+-------+
>>> assertCollectSuccess('d',1.2)
d
+-------+
|myarray|
+-------+
|  [1.2]|
+-------+
>>> assertCollectSuccess('b',1)
b
+-------+
|myarray|
+-------+
| [null]|
+-------+
>>> assertCollectSuccess('B',1)
B
+-------+
|myarray|
+-------+
| [null]|
+-------+
>>> assertCollectSuccess('h',1)
h
+-------+
|myarray|
+-------+
| [null]|
+-------+
>>> assertCollectSuccess('H',1)
H
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>> assertCollectSuccess('i',1)
i
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>> assertCollectSuccess('I',1)
I
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>> assertCollectSuccess('l',1)
l
+-------+
|myarray|
+-------+
|    [1]|
+-------+
>>> assertCollectSuccess('L',1)
L
17/07/18 20:55:55 ERROR Executor: Exception in task 14.0 in stage 119.0 (TID 799)
net.razorvine.pickle.PickleException: unsupported datatype: 64-bits unsigned long
	at net.razorvine.pickle.objects.ArrayConstructor.constructLongArrayFromUInt64(ArrayConstructor.java:302)
	at net.razorvine.pickle.objects.ArrayConstructor.construct(ArrayConstructor.java:240)
	at net.razorvine.pickle.objects.ArrayConstructor.construct(ArrayConstructor.java:26)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:152)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:151)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at