[jira] [Commented] (SPARK-21676) cannot compile on hadoop 2.2.0 and hive
[ https://issues.apache.org/jira/browse/SPARK-21676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121100#comment-16121100 ] Qinghe Jin commented on SPARK-21676: Thanks! > cannot compile on hadoop 2.2.0 and hive > --- > > Key: SPARK-21676 > URL: https://issues.apache.org/jira/browse/SPARK-21676 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.2.0 > Environment: centos6, java8, maven3.5.0, hadoop2.2, hive-0.12.0 >Reporter: Qinghe Jin > > Using the following command to compile: > “./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -Phive > -Dhadoop.version=2.2.0” > Then I get the following output: > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:149: > >>> value getThreadStatistics is not a member of > >>>org.apache.hadoop.fs.FileSystem.Statistics > >>>[error] val f = () => > >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum > >>>[error] ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:149: > >>> ambiguous implicit values: > >>>[error] both object BigIntIsIntegral in object Numeric of type > >>>scala.math.Numeric.BigIntIsIntegral.type > >>>[error] and object ShortIsIntegral in object Numeric of type > >>>scala.math.Numeric.ShortIsIntegral.type > >>>[error] match expected type Numeric[B] > >>>[error] val f = () => > >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum > >>>[error] > >>> ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:166: > >>> could not find implicit value for parameter num: Numeric[(Nothing, > >>>Nothing)] > >>>[error] }.sum > >>>[error] ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:180: > >>> value getThreadStatistics is not a member of > >>>org.apache.hadoop.fs.FileSystem.Statistics > >>>[error] val 
threadStats = > >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics) > >>>[error] ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:181: > >>> value getBytesWritten is not a member of Nothing > >>>[error] val f = () => threadStats.map(_.getBytesWritten).sum > >>>[error] ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:181: > >>> ambiguous implicit values: > >>>[error] both object BigIntIsIntegral in object Numeric of type > >>>scala.math.Numeric.BigIntIsIntegral.type > >>>[error] and object ShortIsIntegral in object Numeric of type > >>>scala.math.Numeric.ShortIsIntegral.type > >>>[error] match expected type Numeric[B] > >>>[error] val f = () => threadStats.map(_.getBytesWritten).sum > >>>[error] ^ > >>>[WARNING] unused-1.0.0.jar, spark-network-common_2.11-2.2.0.jar, > >>>spark-network-shuffle_2.11-2.2.0.jar, spark-tags_2.11-2.2.0.jar define 1 > >>>overlapping classes: > >>>[WARNING] - org.apache.spark.unused.UnusedStubClass > >>>[WARNING] maven-shade-plugin has detected that some class files are > >>>[WARNING] present in two or more JARs. When this happens, only one > >>>[WARNING] single version of the class is copied to the uber jar. > >>>[WARNING] Usually this is not harmful and you can skip these warnings, > >>>[WARNING] otherwise try to manually exclude artifacts based on > >>>[WARNING] mvn dependency:tree -Ddetail=true and the above output. 
> >>>[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/ > >>>[INFO] > >>>[INFO] --- maven-source-plugin:3.0.1:jar-no-fork (create-source-jar) @ > >>>spark-network-yarn_2.11 --- > >>>[INFO] Building jar: > >>>SPARK_DIR/spark-2.2.0/common/network-yarn/target/spark-network-yarn_2.11-2.2.0-sources.jar > >>>[INFO] > >>>[INFO] --- maven-source-plugin:3.0.1:test-jar-no-fork (create-source-jar) > >>>@ spark-network-yarn_2.11 --- > >>>[INFO] Building jar: > >>>SPARK_DIR/spark-2.2.0/common/network-yarn/target/spark-network-yarn_2.11-2.2.0-test-sources.jar > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala:324: > >>> not found: type InputSplitWithLocationInfo > >>>[error] case lsplit: InputSplitWithLocationInfo => > >>>[error]^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala:403: > >>> not found: type SplitLocationInfo >
[jira] [Updated] (SPARK-21688) performance improvement in mllib SVM with native BLAS
[ https://issues.apache.org/jira/browse/SPARK-21688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vincent updated SPARK-21688: Attachment: mllib svm training.png > performance improvement in mllib SVM with native BLAS > -- > > Key: SPARK-21688 > URL: https://issues.apache.org/jira/browse/SPARK-21688 > Project: Spark > Issue Type: Improvement > Components: MLlib >Affects Versions: 2.2.0 > Environment: 4 nodes: 1 master node, 3 worker nodes > model name : Intel(R) Xeon(R) CPU E5-2697 v2 @ 2.70GHz > Memory : 180G > num of core per node: 10 >Reporter: Vincent > Attachments: mllib svm training.png > > > In the current MLlib SVM implementation, we found that the CPU is not fully > utilized; one reason is that f2j BLAS is hardcoded for the HingeGradient > computation. As we found earlier > (https://issues.apache.org/jira/browse/SPARK-21305), with proper settings > native BLAS is generally better than f2j at the unit-test level. Here we > make the BLAS operations in SVM go through MKL BLAS and present an > end-to-end performance report showing that in most cases native BLAS > outperforms f2j BLAS by up to 50%. > So, we suggest removing the f2j-fixed calls and using native BLAS where > available. If this proposal is acceptable, we will move on to benchmark the > other affected algorithms. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21688) performance improvement in mllib SVM with native BLAS
Vincent created SPARK-21688: --- Summary: performance improvement in mllib SVM with native BLAS Key: SPARK-21688 URL: https://issues.apache.org/jira/browse/SPARK-21688 Project: Spark Issue Type: Improvement Components: MLlib Affects Versions: 2.2.0 Environment: 4 nodes: 1 master node, 3 worker nodes model name : Intel(R) Xeon(R) CPU E5-2697 v2 @ 2.70GHz Memory : 180G num of core per node: 10 Reporter: Vincent In the current MLlib SVM implementation, we found that the CPU is not fully utilized; one reason is that f2j BLAS is hardcoded for the HingeGradient computation. As we found earlier (https://issues.apache.org/jira/browse/SPARK-21305), with proper settings native BLAS is generally better than f2j at the unit-test level. Here we make the BLAS operations in SVM go through MKL BLAS and present an end-to-end performance report showing that in most cases native BLAS outperforms f2j BLAS by up to 50%. So, we suggest removing the f2j-fixed calls and using native BLAS where available. If this proposal is acceptable, we will move on to benchmark the other affected algorithms.
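For context on the HingeGradient computation mentioned above, the per-example hinge-loss subgradient can be sketched in a few lines of plain Python. This is an illustrative model of the math only, not Spark's Scala implementation; the dot-product and axpy-style operations inside it are exactly the dense BLAS calls that the f2j-vs-native choice affects.

```python
# Sketch (not Spark code): hinge-loss subgradient for one labeled example.
# loss = max(0, 1 - y * <w, x>); the subgradient w.r.t. w is -y * x when
# the margin is violated, and the zero vector otherwise.
def hinge_gradient(w, x, y):
    dot = sum(wi * xi for wi, xi in zip(w, x))  # the BLAS-backed hot spot
    margin = 1.0 - y * dot
    if margin > 0:
        return [-y * xi for xi in x]
    return [0.0] * len(w)
```

In MLlib these inner products run over dense feature vectors for every example in every iteration, which is why the BLAS backend dominates the training profile.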
[jira] [Closed] (SPARK-20762) Make String Params Case-Insensitive
[ https://issues.apache.org/jira/browse/SPARK-20762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengruifeng closed SPARK-20762. Resolution: Not A Problem > Make String Params Case-Insensitive > --- > > Key: SPARK-20762 > URL: https://issues.apache.org/jira/browse/SPARK-20762 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.3.0 >Reporter: zhengruifeng > > Make String Params (except Cols) case-insensitive: > {{solver}} > {{modelType}} > {{initMode}} > {{metricName}} > {{handleInvalid}} > {{strategy}} > {{stringOrderType}} > {{coldStartStrategy}} > {{impurity}} > {{lossType}} > {{featureSubsetStrategy}} > {{intermediateStorageLevel}} > {{finalStorageLevel}}
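The proposal above amounts to resolving a user-supplied string against a fixed set of allowed values without regard to case. A minimal sketch of that idea in plain Python (hypothetical helper name; not Spark's ML Param code):

```python
def resolve_param(value, allowed):
    """Return the canonical spelling of `value` from `allowed`,
    matching case-insensitively; raise if nothing matches."""
    for canonical in allowed:
        if canonical.lower() == value.lower():
            return canonical
    raise ValueError(f"unsupported value {value!r}; expected one of {allowed}")
```

Returning the canonical spelling (rather than the user's input) keeps downstream comparisons exact while accepting any capitalization at the API surface.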
[jira] [Created] (SPARK-21687) Spark SQL should set createTime for Hive partition
Chaozhong Yang created SPARK-21687: -- Summary: Spark SQL should set createTime for Hive partition Key: SPARK-21687 URL: https://issues.apache.org/jira/browse/SPARK-21687 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0, 2.1.0 Reporter: Chaozhong Yang Fix For: 2.3.0 In Spark SQL, we often use `insert overwrite table t partition(p=xx)` to create partitions for a partitioned table. `createTime` is important information for managing the data lifecycle, e.g., TTL. However, we found that Spark SQL doesn't call setCreateTime in `HiveClientImpl#toHivePartition`, as shown below: {code:scala} def toHivePartition( p: CatalogTablePartition, ht: HiveTable): HivePartition = { val tpart = new org.apache.hadoop.hive.metastore.api.Partition val partValues = ht.getPartCols.asScala.map { hc => p.spec.get(hc.getName).getOrElse { throw new IllegalArgumentException( s"Partition spec is missing a value for column '${hc.getName}': ${p.spec}") } } val storageDesc = new StorageDescriptor val serdeInfo = new SerDeInfo p.storage.locationUri.map(CatalogUtils.URIToString(_)).foreach(storageDesc.setLocation) p.storage.inputFormat.foreach(storageDesc.setInputFormat) p.storage.outputFormat.foreach(storageDesc.setOutputFormat) p.storage.serde.foreach(serdeInfo.setSerializationLib) serdeInfo.setParameters(p.storage.properties.asJava) storageDesc.setSerdeInfo(serdeInfo) tpart.setDbName(ht.getDbName) tpart.setTableName(ht.getTableName) tpart.setValues(partValues.asJava) tpart.setSd(storageDesc) new HivePartition(ht, tpart) } {code}
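The missing step amounts to stamping the partition with the current epoch time when it is built. A rough sketch of the idea in Python (a stand-in model using a dict, not Hive's Thrift API; the assumption here is that Hive's Partition stores createTime as epoch seconds in an int field):

```python
import time

# Hypothetical stand-in for building partition metadata; a plain dict models
# the Thrift Partition object. The point is the createTime stamp in epoch
# seconds, which the current toHivePartition never sets.
def to_hive_partition(values, location):
    return {
        "values": values,
        "location": location,
        "createTime": int(time.time()),  # seconds since the epoch, not ms
    }
```

On the Scala side the fix would be a single extra setter call on `tpart` before constructing the HivePartition.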
[jira] [Commented] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120959#comment-16120959 ] DjvuLee commented on SPARK-21682: - Yes, our company has also faced this scalability problem; the driver can easily die with a 70K-partition RDD. > Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?) > > > Key: SPARK-21682 > URL: https://issues.apache.org/jira/browse/SPARK-21682 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.2, 2.1.1, 2.2.0 >Reporter: Ryan Williams > > h3. Summary > * {{sc.parallelize(1 to 10, 10).cache.count}} causes a driver GC > stall midway through on every configuration and version I've tried in 2.x. > * It runs fine with no Full GCs as of 1.6.3 > * I think that {{internal.metrics.updatedBlockStatuses}} is the culprit, and > breaks a contract about what big-O sizes accumulators' values can be: > ** they are each of size O(P), where P is the number of partitions in a > cached RDD > ** ⇒ the driver must process O(P²) data from {{TaskEnd}} events, instead of > O(P) > ** ⇒ the driver also must process O(P*E) work every 10s from > {{ExecutorMetricsUpdates}} (where E is the number of executors; cf. > {{spark.executor.heartbeatInterval}}) > * when operating on a 100k-partition cached RDD, the driver enters a GC loop > due to all the allocations it must do to process {{ExecutorMetricsUpdate}} > and {{TaskEnd}} events with {{updatedBlockStatuses}} attached > * this metric should be disabled, or some ability to blacklist it from the > command-line should be added. > * [SPARK-20084|https://issues.apache.org/jira/browse/SPARK-20084] addressed > one part of this - the event-log size had exploded - but the root problem > still exists / is worse > h3. 
{{count}} a 100k-partition RDD: works fine without {{.cache}} > In Spark 2.2.0 or 2.1.1: > {code} > spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -verbose:gc" > scala> val rdd = sc.parallelize(1 to 10, 10) > scala> rdd.count > {code} > In YARN and local modes, this finishes in ~20 seconds with ~20 partial GCs > logged, all taking under 0.1s ([example > output|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-not-cached-works-fine]); > all is well! > h3. {{count}} a 100k-partition cached RDD: GC-dies > If we {{cache}} the RDD first, the same {{count}} job quickly sends the > driver into a GC death spiral: full GCs start after a few thousand tasks and > increase in frequency and length until they last minutes / become continuous > (and, in YARN, the driver loses contact with any executors). > Example outputs: > [local|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-cached-crashes], > > [YARN|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-yarn-mode-cached-rdd-dies]. > The YARN example removes any confusion about whether the storing of the > blocks is causing memory pressure on the driver; the driver is basically > doing no work except receiving executor updates and events, and yet it > becomes overloaded. > h3. Can't effectively throw driver heap at the problem > I've tested with 1GB, 10GB, and 20GB heaps, and the larger heaps do what we'd > expect: delay the first Full GC, and make Full GCs longer when they happen. > I don't have a clear sense on whether the onset is linear or quadratic (i.e. > do I get twice as far into the job before the first Full GC with a 20GB as > with a 10GB heap, or only sqrt(2) times as far?). > h3. Mostly memory pressure, not OOMs > An interesting note is that I'm rarely seeing OOMs as a result of this, even > on small heaps. 
> I think this is consistent with the idea that all this data is being > immediately discarded by the driver, as opposed to kept around to serve web > UIs or somesuch. > h3. Eliminating {{ExecutorMetricsUpdate}}'s doesn't seem to help > Interestingly, setting large values of {{spark.executor.heartbeatInterval}} > doesn't seem to mitigate the problem; GC-stall sets in at about the same > point in the {{count}} job. > This implies that, in this example, the {{TaskEnd}} events are doing most or > all of the damage. > h3. CMS helps but doesn't solve the problem > In some rough testing, I saw the {{count}} get about twice as far before > dying when using the CMS collector. > h3. What bandwidth do we expect the driver to process events at? > IIUC, every 10s the driver gets O(T) (~100k?) block updates from each of ~500 > executors, and allocating objects for these updates is pushing it over a > tipping point where it can't keep up. > I don't know how to get good numbers on how much data the driver is >
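The big-O argument in the report above can be made concrete with a back-of-envelope count. This is illustrative arithmetic only, counting status records per stage, not measured driver memory:

```python
# With P partitions in the cached RDD, each TaskEnd event can carry an
# updatedBlockStatuses accumulator of size O(P), and a stage emits P such
# events, so the driver processes O(P^2) status records per stage.
P = 100_000                       # partitions (and tasks) in the stage
statuses_per_task_end = P         # O(P) block statuses per event
total_statuses = P * statuses_per_task_end
assert total_statuses == 10_000_000_000  # 10^10 records for one stage
```

Even at a handful of bytes of allocation per record, 10^10 records comfortably explains a GC death spiral on a 10-20GB driver heap.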
[jira] [Commented] (SPARK-21680) ML/MLLIB Vector compressed optimization
[ https://issues.apache.org/jira/browse/SPARK-21680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120947#comment-16120947 ] Peng Meng commented on SPARK-21680: --- Hi [~srowen], if we add toSparse(size), then for safety it is better to check size against numNonzeros; if size is larger than numNonzeros, the program may crash. But if we check size against numNonzeros, we still add one more scan over the values. So in this PR I revised the code as in this JIRA. Thanks. > ML/MLLIB Vector compressed optimization > --- > > Key: SPARK-21680 > URL: https://issues.apache.org/jira/browse/SPARK-21680 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.3.0 >Reporter: Peng Meng > > When using Vector.compressed to convert a Vector to a SparseVector, the > performance is very low compared with Vector.toSparse. > This is because you have to scan the values three times using > Vector.compressed, but only twice when using Vector.toSparse. > When the vector is long, there is a significant performance > difference between these two methods. > Code of Vector.compressed: > {code:java} > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > toSparse > } else { > toDense > } > } > {code} > I propose to change it to: > {code:java} > // Some comments here > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. 
> if (1.5 * (nnz + 1.0) < size) { > val ii = new Array[Int](nnz) > val vv = new Array[Double](nnz) > var k = 0 > foreachActive { (i, v) => > if (v != 0) { > ii(k) = i > vv(k) = v > k += 1 > } > } > new SparseVector(size, ii, vv) > } else { > toDense > } > } > {code}
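The pass structure of the proposed change can be modeled in plain Python (a sketch of the idea, not Spark's Vector API): one pass to count non-zeros, then a single further pass that builds the sparse arrays directly, instead of handing off to a separate toSparse that scans the values again.

```python
def compressed(values):
    # Pass 1: count non-zeros (Spark's numNonzeros).
    nnz = sum(1 for v in values if v != 0)
    # A dense vector needs ~8*size + 8 bytes; a sparse one ~12*nnz + 20.
    if 1.5 * (nnz + 1.0) < len(values):
        # Pass 2: build index/value arrays directly -- no third scan.
        indices, vals = [], []
        for i, v in enumerate(values):
            if v != 0:
                indices.append(i)
                vals.append(v)
        return ("sparse", indices, vals)
    return ("dense", list(values))
```

The saving is exactly one full scan of the values, which is significant when the vector is long and mostly zero.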
[jira] [Commented] (SPARK-21245) Resolve code duplication for classification/regression summarizers
[ https://issues.apache.org/jira/browse/SPARK-21245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120930#comment-16120930 ] Bravo Zhang commented on SPARK-21245: - User 'bravo-zhang' has created a pull request for this issue: https://github.com/apache/spark/pull/18898 > Resolve code duplication for classification/regression summarizers > -- > > Key: SPARK-21245 > URL: https://issues.apache.org/jira/browse/SPARK-21245 > Project: Spark > Issue Type: Sub-task > Components: ML >Affects Versions: 2.2.1 >Reporter: Seth Hendrickson >Priority: Minor > Labels: starter > > In several places (LogReg, LinReg, SVC) in Spark ML, we collect summary > information about training data using {{MultivariateOnlineSummarizer}} and > {{MulticlassSummarizer}}. We have the same code appearing in several places > (and including test suites). We can eliminate this by creating a common > implementation somewhere.
[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone
[ https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120924#comment-16120924 ] Kevin Zhang commented on SPARK-21590: - Thanks, I'd like to work on this. I agree with the requirement that the absolute value of the start offset be less than the slide interval, but I don't understand why we should add the slide interval to the start offset when it is negative. I've verified in my local environment that the calculation currently supports negative start offsets; it is the non-negative check on the window's parameters that blocks them. So I suggest changing only the check function. What do you think? > Structured Streaming window start time should support negative values to > adjust time zone > - > > Key: SPARK-21590 > URL: https://issues.apache.org/jira/browse/SPARK-21590 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0 > Environment: spark 2.2.0 >Reporter: Kevin Zhang > Labels: spark-sql, spark2.2, streaming, structured, timezone, > window > > I want to calculate (unique) daily access counts using structured streaming > (2.2.0). > Currently structured streaming's window with a 1-day duration starts at > 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST > (UTC + 8 hours) and I > want the date boundaries to be 00:00:00 CST (that is, 00:00:00 UTC - 8 hours). > In Flink I can set the window offset to -8 hours to achieve this, but here in > structured streaming, if I set the start time (the same as Flink's offset) to -8 > or any other negative value, I get the following error: > {code:java} > Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot > resolve 'timewindow(timestamp, 864, 864, -288)' due > to data type mismatch: The start time (-288) must be greater than or > equal to 0.;; > {code} > because the time window checks the input parameters to guarantee that each value > is greater than or equal to 0. 
> So I'm thinking about whether we can remove the limit that the start time > cannot be negative?
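The window-assignment arithmetic itself behaves sensibly for negative offsets, which supports the suggestion of relaxing only the check. An illustrative sketch in plain integer seconds (a model of the modular formula, not Spark's TimeWindow code):

```python
DAY = 86_400  # seconds

def window_start(ts, duration, start_offset):
    # Largest boundary <= ts among boundaries of the form k*duration + offset.
    return (ts - start_offset) // duration * duration + start_offset

# With a -8h offset, 1-day windows align to midnight CST (UTC+8): the epoch
# instant 1970-01-01 00:00 UTC falls in a window starting at 16:00 UTC the
# previous day, i.e. 00:00 CST.
assert window_start(0, DAY, -8 * 3600) == -28_800
```

Python's floor division makes the formula correct for negative offsets without any special casing, so adding the slide interval first would only change which equivalent boundary label is produced.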
[jira] [Resolved] (SPARK-14932) Allow DataFrame.replace() to replace values with None
[ https://issues.apache.org/jira/browse/SPARK-14932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-14932. - Resolution: Fixed Assignee: Bravo Zhang Fix Version/s: 2.3.0 > Allow DataFrame.replace() to replace values with None > - > > Key: SPARK-14932 > URL: https://issues.apache.org/jira/browse/SPARK-14932 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Nicholas Chammas >Assignee: Bravo Zhang >Priority: Minor > Labels: starter > Fix For: 2.3.0 > > > Current doc: > http://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace > I would like to specify {{None}} as the value to substitute in. This is > currently > [disallowed|https://github.com/apache/spark/blob/9797cc20c0b8fb34659df11af8eccb9ed293c52c/python/pyspark/sql/dataframe.py#L1144-L1145]. > My use case is for replacing bad values with {{None}} so I can then ignore > them with {{dropna()}}. > For example, I have a dataset that incorrectly includes empty strings where > there should be {{None}} values. I would like to replace the empty strings > with {{None}} and then drop all null data with {{dropna()}}.
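The requested workflow translates to a small pure-Python sketch (no Spark dependency; a toy model of replacing '' with None and then dropping null rows, with illustrative data):

```python
rows = [
    {"name": "alice", "city": "nyc"},
    {"name": "bob", "city": ""},   # bad value: empty string should be None
]

# Step 1: replace empty strings with None (the requested replace behavior).
replaced = [{k: (None if v == "" else v) for k, v in r.items()} for r in rows]

# Step 2: drop any row containing a None (the dropna step).
cleaned = [r for r in replaced if all(v is not None for v in r.values())]
```

The two-step shape is the point: once None is a legal replacement target, the existing null-dropping machinery handles the rest.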
[jira] [Resolved] (SPARK-21551) pyspark's collect fails when getaddrinfo is too slow
[ https://issues.apache.org/jira/browse/SPARK-21551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin resolved SPARK-21551. - Resolution: Fixed Assignee: peay Fix Version/s: 2.3.0 > pyspark's collect fails when getaddrinfo is too slow > > > Key: SPARK-21551 > URL: https://issues.apache.org/jira/browse/SPARK-21551 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: peay >Assignee: peay >Priority: Critical > Fix For: 2.3.0 > > > Pyspark's {{RDD.collect}}, as well as {{DataFrame.toLocalIterator}} and > {{DataFrame.collect}} all work by starting an ephemeral server in the driver, > and having Python connect to it to download the data. > All three are implemented along the lines of: > {code} > port = self._jdf.collectToPython() > return list(_load_from_socket(port, BatchedSerializer(PickleSerializer( > {code} > The server has **a hardcoded timeout of 3 seconds** > (https://github.com/apache/spark/blob/e26dac5feb02033f980b1e69c9b0ff50869b6f9e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L695) > -- i.e., the Python process has 3 seconds to connect to it from the very > moment the driver server starts. > In general, that seems fine, but I have been encountering frequent timeouts > leading to `Exception: could not open socket`. > After investigating a bit, it turns out that {{_load_from_socket}} makes a > call to {{getaddrinfo}}: > {code} > def _load_from_socket(port, serializer): > sock = None > # Support for both IPv4 and IPv6. > # On most of IPv6-ready systems, IPv6 will take precedence. > for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, > socket.SOCK_STREAM): >.. connect .. > {code} > I am not sure why, but while most such calls to {{getaddrinfo}} on my machine > only take a couple milliseconds, about 10% of them take between 2 and 10 > seconds, leading to about 10% of jobs failing. I don't think we can always > expect {{getaddrinfo}} to return instantly. 
More generally, Python may > sometimes pause for a couple seconds, which may not leave enough time for the > process to connect to the server. > Especially since the server timeout is hardcoded, I think it would be best to > set a rather generous value (15 seconds?) to avoid such situations. > A {{getaddrinfo}} specific fix could avoid doing it every single time, or do > it before starting up the driver server. > > cc SPARK-677 [~davies]
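Both knobs discussed above can be illustrated with the standard socket module. This is a sketch of the idea only, not pyspark's actual server code: the localhost lookup that _load_from_socket performs before connecting, and an ephemeral listening socket given a generous accept timeout instead of a hardcoded 3 seconds.

```python
import socket

def resolve_localhost(port):
    # The lookup _load_from_socket performs before connecting; on some hosts
    # this call itself can take seconds, eating into the server's timeout.
    return socket.getaddrinfo("localhost", port, socket.AF_UNSPEC,
                              socket.SOCK_STREAM)

# An ephemeral listening socket with a generous timeout (15s is the value
# suggested above; the exact number is a judgment call).
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))   # port 0 => the OS picks an ephemeral port
server.settimeout(15.0)         # applies to accept()
server.listen(1)
port = server.getsockname()[1]
```

Resolving "localhost" once up front, before the server's timeout clock starts, would remove the lookup latency from the critical window entirely.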
[jira] [Commented] (SPARK-19116) LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file
[ https://issues.apache.org/jira/browse/SPARK-19116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120643#comment-16120643 ] Andrew Ash commented on SPARK-19116: Ah yes, for files it seems like Spark currently uses size of the parquet files on disk, rather than estimating in-memory size by multiplying the sum of the column type sizes by the row count. > LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file > - > > Key: SPARK-19116 > URL: https://issues.apache.org/jira/browse/SPARK-19116 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.0.1, 2.0.2 > Environment: Python 3.5.x > Windows 10 >Reporter: Shea Parkes > > We're having some modestly severe issues with broadcast join inference, and > I've been chasing them through the join heuristics in the catalyst engine. > I've made it as far as I can, and I've hit upon something that does not make > any sense to me. > I thought that loading from parquet would be a RelationPlan, which would just > use the sum of default sizeInBytes for each column times the number of rows. 
> But this trivial example shows that I am not correct: > {code} > import pyspark.sql.functions as F > df_range = session.range(100).select(F.col('id').cast('integer')) > df_range.write.parquet('c:/scratch/hundred_integers.parquet') > df_parquet = session.read.parquet('c:/scratch/hundred_integers.parquet') > df_parquet.explain(True) > # Expected sizeInBytes > integer_default_sizeinbytes = 4 > print(df_parquet.count() * integer_default_sizeinbytes) # = 400 > # Inferred sizeInBytes > print(df_parquet._jdf.logicalPlan().statistics().sizeInBytes()) # = 2318 > # For posterity (Didn't really expect this to match anything above) > print(df_range._jdf.logicalPlan().statistics().sizeInBytes()) # = 600 > {code} > And here's the results of explain(True) on df_parquet: > {code} > In [456]: == Parsed Logical Plan == > Relation[id#794] parquet > == Analyzed Logical Plan == > id: int > Relation[id#794] parquet > == Optimized Logical Plan == > Relation[id#794] parquet > == Physical Plan == > *BatchedScan parquet [id#794] Format: ParquetFormat, InputPaths: > file:/c:/scratch/hundred_integers.parquet, PartitionFilters: [], > PushedFilters: [], ReadSchema: struct > {code} > So basically, I'm not understanding well how the size of the parquet file is > being estimated. I don't expect it to be extremely accurate, but empirically > it's so inaccurate that we're having to mess with autoBroadcastJoinThreshold > way too much. (It's not always too high like the example above, it's often > way too low.) > Without deeper understanding, I'm considering a result of 2318 instead of 400 > to be a bug. My apologies if I'm missing something obvious.
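The two numbers in the example differ because they measure different things. A quick arithmetic sketch of the naive in-memory estimate the reporter expected (illustrative only):

```python
# Expected estimate: default byte size of each column type times row count.
rows = 100
int_default_size = 4              # bytes for an int32 column value
expected = rows * int_default_size
assert expected == 400

# The reported sizeInBytes (2318) is instead the parquet file's on-disk
# footprint, which includes headers, footer metadata, and encoding overhead,
# so it need not track the logical data size in either direction.
```

For tiny files the fixed metadata dominates (too high, as here); for large, well-compressed files the on-disk size can be far below the in-memory size (too low), which matches the broadcast-threshold trouble described.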
[jira] [Closed] (SPARK-19116) LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file
[ https://issues.apache.org/jira/browse/SPARK-19116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Ash closed SPARK-19116. -- Resolution: Not A Problem > LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file > - > > Key: SPARK-19116 > URL: https://issues.apache.org/jira/browse/SPARK-19116 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.0.1, 2.0.2 > Environment: Python 3.5.x > Windows 10 >Reporter: Shea Parkes > > We're having some modestly severe issues with broadcast join inference, and > I've been chasing them through the join heuristics in the catalyst engine. > I've made it as far as I can, and I've hit upon something that does not make > any sense to me. > I thought that loading from parquet would be a RelationPlan, which would just > use the sum of default sizeInBytes for each column times the number of rows. > But this trivial example shows that I am not correct: > {code} > import pyspark.sql.functions as F > df_range = session.range(100).select(F.col('id').cast('integer')) > df_range.write.parquet('c:/scratch/hundred_integers.parquet') > df_parquet = session.read.parquet('c:/scratch/hundred_integers.parquet') > df_parquet.explain(True) > # Expected sizeInBytes > integer_default_sizeinbytes = 4 > print(df_parquet.count() * integer_default_sizeinbytes) # = 400 > # Inferred sizeInBytes > print(df_parquet._jdf.logicalPlan().statistics().sizeInBytes()) # = 2318 > # For posterity (Didn't really expect this to match anything above) > print(df_range._jdf.logicalPlan().statistics().sizeInBytes()) # = 600 > {code} > And here's the results of explain(True) on df_parquet: > {code} > In [456]: == Parsed Logical Plan == > Relation[id#794] parquet > == Analyzed Logical Plan == > id: int > Relation[id#794] parquet > == Optimized Logical Plan == > Relation[id#794] parquet > == Physical Plan == > *BatchedScan parquet [id#794] Format: ParquetFormat, InputPaths: > 
file:/c:/scratch/hundred_integers.parquet, PartitionFilters: [], > PushedFilters: [], ReadSchema: struct > {code} > So basically, I'm not understanding well how the size of the parquet file is > being estimated. I don't expect it to be extremely accurate, but empirically > it's so inaccurate that we're having to mess with autoBroadcastJoinThreshold > way too much. (It's not always too high like the example above, it's often > way too low.) > Without deeper understanding, I'm considering a result of 2318 instead of 400 > to be a bug. My apologies if I'm missing something obvious.
[jira] [Updated] (SPARK-21685) Params isSet in scala Transformer triggered by _setDefault in pyspark
[ https://issues.apache.org/jira/browse/SPARK-21685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ratan Rai Sur updated SPARK-21685: -- Description: I'm trying to write a PySpark wrapper for a Transformer whose transform method includes the line {code:java} require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both outputNodeName and outputNodeIndex") {code} This should only throw an exception when both of these parameters are explicitly set. In the PySpark wrapper for the Transformer, there is this line in __init__ {code:java} self._setDefault(outputNodeIndex=0) {code} Here is the line in the main python script showing how it is being configured {code:java} cntkModel = CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, model.uri).setOutputNodeName("z") {code} As you can see, only setOutputNodeName is being explicitly set but the exception is still being thrown. If you need more context, https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the branch with the code, the files I'm referring to here that are tracked are the following: src/cntk-model/src/main/scala/CNTKModel.scala notebooks/tests/301 - CIFAR10 CNTK CNN Evaluation.ipynb The pyspark wrapper code is autogenerated was: I'm trying to write a PySpark wrapper for a Transformer whose transform method includes the line {code:java} require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both outputNodeName and outputNodeIndex") {code} This should only throw an exception when both of these parameters are explicitly set. 
In the PySpark wrapper for the Transformer, there is this line in ___init___ {code:java} self._setDefault(outputNodeIndex=0) {code} Here is the line in the main python script showing how it is being configured {code:java} cntkModel = CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, model.uri).setOutputNodeName("z") {code} As you can see, only setOutputNodeName is being explicitly set but the exception is still being thrown. If you need more context, https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the branch with the code, the files I'm referring to here are src/cntk-model/src/main/scala/CNTKModel.scala src/src/main/resources/mmlspark/_CNTKModel.py notebooks/tests/301 - CIFAR10 CNTK CNN Evaluation.ipynb > Params isSet in scala Transformer triggered by _setDefault in pyspark > - > > Key: SPARK-21685 > URL: https://issues.apache.org/jira/browse/SPARK-21685 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: Ratan Rai Sur > > I'm trying to write a PySpark wrapper for a Transformer whose transform > method includes the line > {code:java} > require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both > outputNodeName and outputNodeIndex") > {code} > This should only throw an exception when both of these parameters are > explicitly set. > In the PySpark wrapper for the Transformer, there is this line in ___init___ > {code:java} > self._setDefault(outputNodeIndex=0) > {code} > Here is the line in the main python script showing how it is being configured > {code:java} > cntkModel = > CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, > model.uri).setOutputNodeName("z") > {code} > As you can see, only setOutputNodeName is being explicitly set but the > exception is still being thrown. 
> If you need more context, > https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the > branch with the code, the files I'm referring to here that are tracked are > the following: > src/cntk-model/src/main/scala/CNTKModel.scala > notebooks/tests/301 - CIFAR10 CNTK CNN Evaluation.ipynb > The pyspark wrapper code is autogenerated
[jira] [Updated] (SPARK-21685) Params isSet in scala Transformer triggered by _setDefault in pyspark
[ https://issues.apache.org/jira/browse/SPARK-21685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ratan Rai Sur updated SPARK-21685: -- Description: I'm trying to write a PySpark wrapper for a Transformer whose transform method includes the line {code:java} require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both outputNodeName and outputNodeIndex") {code} This should only throw an exception when both of these parameters are explicitly set. In the PySpark wrapper for the Transformer, there is this line in ___init___ {code:java} self._setDefault(outputNodeIndex=0) {code} Here is the line in the main python script showing how it is being configured {code:java} cntkModel = CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, model.uri).setOutputNodeName("z") {code} As you can see, only setOutputNodeName is being explicitly set but the exception is still being thrown. If you need more context, https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the branch with the code, the files I'm referring to here are src/cntk-model/src/main/scala/CNTKModel.scala src/src/main/resources/mmlspark/_CNTKModel.py notebooks/tests/301 - CIFAR10 CNTK CNN Evaluation.ipynb was: I'm trying to write a PySpark wrapper for a Transformer whose transform method includes the line {code:java} require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both outputNodeName and outputNodeIndex") {code} This should only throw an exception when both of these parameters are explicitly set. 
In the PySpark wrapper for the Transformer, there is this line in ___init___ {code:java} self._setDefault(outputNodeIndex=0) {code} Here is the line in the main python script showing how it is being configured {code:java} cntkModel = CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, model.uri).setOutputNodeName("z") {code} As you can see, only setOutputNodeName is being explicitly set but the exception is still being thrown. If you need more context, https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the branch with the code, the files I'm referring to here are CNTKModel.scala and _CNTKModel.py > Params isSet in scala Transformer triggered by _setDefault in pyspark > - > > Key: SPARK-21685 > URL: https://issues.apache.org/jira/browse/SPARK-21685 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: Ratan Rai Sur > > I'm trying to write a PySpark wrapper for a Transformer whose transform > method includes the line > {code:java} > require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both > outputNodeName and outputNodeIndex") > {code} > This should only throw an exception when both of these parameters are > explicitly set. > In the PySpark wrapper for the Transformer, there is this line in ___init___ > {code:java} > self._setDefault(outputNodeIndex=0) > {code} > Here is the line in the main python script showing how it is being configured > {code:java} > cntkModel = > CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, > model.uri).setOutputNodeName("z") > {code} > As you can see, only setOutputNodeName is being explicitly set but the > exception is still being thrown. 
> If you need more context, > https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the > branch with the code, the files I'm referring to here are > src/cntk-model/src/main/scala/CNTKModel.scala > src/src/main/resources/mmlspark/_CNTKModel.py > notebooks/tests/301 - CIFAR10 CNTK CNN Evaluation.ipynb
[jira] [Created] (SPARK-21686) spark.sql.hive.convertMetastoreOrc is causing NullPointerException while reading ORC tables
Ernani Pereira de Mattos Junior created SPARK-21686: --- Summary: spark.sql.hive.convertMetastoreOrc is causing NullPointerException while reading ORC tables Key: SPARK-21686 URL: https://issues.apache.org/jira/browse/SPARK-21686 Project: Spark Issue Type: Bug Components: Spark Shell Affects Versions: 1.6.1 Environment: spark_2_4_2_0_258-1.6.1.2.4.2.0-258.el6.noarch spark_2_4_2_0_258-python-1.6.1.2.4.2.0-258.el6.noarch spark_2_4_2_0_258-yarn-shuffle-1.6.1.2.4.2.0-258.el6.noarch RHEL-7 (64-Bit) JDK 1.8 Reporter: Ernani Pereira de Mattos Junior The issue is very similar to SPARK-10304; a Spark query throws a NullPointerException. >>> sqlContext.sql('select * from core_next.spark_categorization').show(57) 17/06/19 11:26:54 ERROR Executor: Exception in task 2.0 in stage 21.0 (TID 48) java.lang.NullPointerException at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488) at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244) at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275) at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275) Turning off the ORC optimization resolved the issue: sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")
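The workaround quoted in the report amounts to a single configuration change before running the query; a sketch for a Spark 1.6 shell session, assuming the `sqlContext` and table name from the report:

```python
# Workaround sketch (Spark 1.6, as reported): disable Spark's native ORC
# read path so the Hive SerDe path is used instead; per the report this
# avoided the NullPointerException in OrcTableScan.
sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")
sqlContext.sql("select * from core_next.spark_categorization").show(57)
```

Note this trades away the optimized ORC reader, so it is a mitigation rather than a fix.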
[jira] [Updated] (SPARK-21685) Params isSet in scala Transformer triggered by _setDefault in pyspark
[ https://issues.apache.org/jira/browse/SPARK-21685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ratan Rai Sur updated SPARK-21685: -- Description: I'm trying to write a PySpark wrapper for a Transformer whose transform method includes the line {code:java} require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both outputNodeName and outputNodeIndex") {code} This should only throw an exception when both of these parameters are explicitly set. In the PySpark wrapper for the Transformer, there is this line in ___init___ {code:java} self._setDefault(outputNodeIndex=0) {code} Here is the line in the main python script showing how it is being configured {code:java} cntkModel = CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, model.uri).setOutputNodeName("z") {code} As you can see, only setOutputNodeName is being explicitly set but the exception is still being thrown. If you need more context, https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the branch with the code, the files I'm referring to here are CNTKModel.scala and _CNTKModel.py was: I'm trying to write a PySpark wrapper for a Transformer whose transform method includes the line {code:scala} require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both outputNodeName and outputNodeIndex") {code} This should only throw an exception when both of these parameters are explicitly set. In the PySpark wrapper for the Transformer, there is this line in ___init___ {code:python} self._setDefault(outputNodeIndex=0) {code} Here is the line in the main python script showing how it is being configured {code:python} cntkModel = CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, model.uri).setOutputNodeName("z") {code} As you can see, only setOutputNodeName is being explicitly set but the exception is still being thrown. 
If you need more context, https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the branch with the code, the files I'm referring to here are CNTKModel.scala and _CNTKModel.py > Params isSet in scala Transformer triggered by _setDefault in pyspark > - > > Key: SPARK-21685 > URL: https://issues.apache.org/jira/browse/SPARK-21685 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: Ratan Rai Sur > > I'm trying to write a PySpark wrapper for a Transformer whose transform > method includes the line > {code:java} > require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both > outputNodeName and outputNodeIndex") > {code} > This should only throw an exception when both of these parameters are > explicitly set. > In the PySpark wrapper for the Transformer, there is this line in ___init___ > {code:java} > self._setDefault(outputNodeIndex=0) > {code} > Here is the line in the main python script showing how it is being configured > {code:java} > cntkModel = > CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, > model.uri).setOutputNodeName("z") > {code} > As you can see, only setOutputNodeName is being explicitly set but the > exception is still being thrown. > If you need more context, > https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the > branch with the code, the files I'm referring to here are CNTKModel.scala and > _CNTKModel.py
[jira] [Created] (SPARK-21685) Params isSet in scala Transformer triggered by _setDefault in pyspark
Ratan Rai Sur created SPARK-21685: - Summary: Params isSet in scala Transformer triggered by _setDefault in pyspark Key: SPARK-21685 URL: https://issues.apache.org/jira/browse/SPARK-21685 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.1.0 Reporter: Ratan Rai Sur I'm trying to write a PySpark wrapper for a Transformer whose transform method includes the line {code:scala} require(!(isSet(outputNodeName) && isSet(outputNodeIndex)), "Can't set both outputNodeName and outputNodeIndex") {code} This should only throw an exception when both of these parameters are explicitly set. In the PySpark wrapper for the Transformer, there is this line in ___init___ {code:python} self._setDefault(outputNodeIndex=0) {code} Here is the line in the main python script showing how it is being configured {code:python} cntkModel = CNTKModel().setInputCol("images").setOutputCol("output").setModelLocation(spark, model.uri).setOutputNodeName("z") {code} As you can see, only setOutputNodeName is being explicitly set but the exception is still being thrown. If you need more context, https://github.com/RatanRSur/mmlspark/tree/default-cntkmodel-output is the branch with the code, the files I'm referring to here are CNTKModel.scala and _CNTKModel.py
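The expected semantics — a default value must not count as explicitly set — can be modeled in plain Python. This is a hypothetical sketch of the isSet/default distinction, not the actual Spark ML Params classes:

```python
class ParamsModel:
    """Minimal model of ML Params semantics (hypothetical, not Spark's
    classes): defaults and explicit sets are tracked separately, so a
    require(!(isSet(a) && isSet(b))) guard is not tripped by a default."""

    def __init__(self):
        self._defaults = {}
        self._explicitly_set = {}

    def _setDefault(self, **kwargs):
        # Defaults go in their own map; they never make isSet() true.
        self._defaults.update(kwargs)

    def set(self, name, value):
        self._explicitly_set[name] = value

    def isSet(self, name):
        # Only explicit sets count.
        return name in self._explicitly_set

    def getOrDefault(self, name):
        if name in self._explicitly_set:
            return self._explicitly_set[name]
        return self._defaults[name]


m = ParamsModel()
m._setDefault(outputNodeIndex=0)   # as in the wrapper's __init__
m.set("outputNodeName", "z")       # the only explicit set

# The guard from the report should pass: only one param is explicitly set.
assert not (m.isSet("outputNodeName") and m.isSet("outputNodeIndex"))
```

The bug, then, is that `_setDefault` on the Python side ends up registering the value as explicitly set on the Scala side, collapsing the two maps above into one.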
[jira] [Commented] (SPARK-21667) ConsoleSink should not fail streaming query with checkpointLocation option
[ https://issues.apache.org/jira/browse/SPARK-21667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120576#comment-16120576 ] Jacek Laskowski commented on SPARK-21667: - Oh, what an offer! Couldn't have thought of a better one today :) Let me see how far my hope to fix it leads me. I'm on it. > ConsoleSink should not fail streaming query with checkpointLocation option > -- > > Key: SPARK-21667 > URL: https://issues.apache.org/jira/browse/SPARK-21667 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski >Priority: Minor > > As agreed on the Spark users mailing list in the thread "\[SS] Console sink > not supporting recovering from checkpoint location? Why?" in which > [~marmbrus] said: > {quote} > I think there is really no good reason for this limitation. > {quote} > Using {{ConsoleSink}} should therefore not fail a streaming query when used > with {{checkpointLocation}} option. > {code} > // today's build from the master > scala> spark.version > res8: String = 2.3.0-SNAPSHOT > scala> val q = records. > | writeStream. > | format("console"). > | option("truncate", false). > | option("checkpointLocation", "/tmp/checkpoint"). // <-- > checkpoint directory > | trigger(Trigger.ProcessingTime(10.seconds)). > | outputMode(OutputMode.Update). > | start > org.apache.spark.sql.AnalysisException: This query does not support > recovering from checkpoint location. Delete /tmp/checkpoint/offsets to start > over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284) > ... 
61 elided > {code} > The "trigger" is SPARK-16116 and [this > line|https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277] > in particular. > This also relates to SPARK-19768 that was resolved as not a bug.
[jira] [Resolved] (SPARK-21587) Filter pushdown for EventTime Watermark Operator
[ https://issues.apache.org/jira/browse/SPARK-21587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tathagata Das resolved SPARK-21587. --- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 18790 [https://github.com/apache/spark/pull/18790] > Filter pushdown for EventTime Watermark Operator > > > Key: SPARK-21587 > URL: https://issues.apache.org/jira/browse/SPARK-21587 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Jose Torres > Fix For: 3.0.0 > > > If I have a streaming query that sets a watermark, then a where() that > pertains to a partition column does not prune these partitions and they will > all be queried, greatly reducing performance for partitioned tables.
[jira] [Commented] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120537#comment-16120537 ] Ryan Williams commented on SPARK-21682: --- bq. But do you really need to create so many partitions? Could you use `coalesce` to reduce the number of partitions? In my real app, that is exactly what I am trying to do! I am {{count}}'ing the records to determine how many partitions I should {{coalesce}} to, and I am {{cache}}'ing to avoid computing the RDD twice (once for the {{count}}, once for the repartition) because that would be very expensive. More detail: I have a large RDD (~100BN records, 100k partitions) that I am filtering down to what is likely to be <1MM records (but might not be!). ~1MM records/partition is a good size for this data, both before and after the filtering/repartitioning, based on what I know about it (and have already observed in this app when I try to put much more than that on a single partition, and see GC problems). If that sounds crazy to you, please tell me. Otherwise, can we instead talk about: * should Spark fall over at 100k partitions? * this worked fine in 1.6.3 but afaict it's impossible to {{count}} a 100k-partition cached RDD in the 2.x line. Is that a problem? * does anyone know how much work the driver is doing before and after e.g. a large accumulator is added, and how much the "maximum number of partitions before GC-stall-death" ceiling is lowered by a given change? > Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?) > > > Key: SPARK-21682 > URL: https://issues.apache.org/jira/browse/SPARK-21682 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.2, 2.1.1, 2.2.0 >Reporter: Ryan Williams > > h3. Summary > * {{sc.parallelize(1 to 10, 10).cache.count}} causes a driver GC > stall midway through on every configuration and version I've tried in 2.x. 
> * It runs fine with no Full GCs as of 1.6.3 > * I think that {{internal.metrics.updatedBlockStatuses}} is the culprit, and > breaks a contract about what big-O sizes accumulators' values can be: > ** they are each of size O(P), where P is the number of partitions in a > cached RDD > ** ⇒ the driver must process O(P²) data from {{TaskEnd}} events, instead of > O(P) > ** ⇒ the driver also must process O(P*E) work every 10s from > {{ExecutorMetricsUpdates}} (where E is the number of executors; cf. > {{spark.executor.heartbeatInterval}}) > * when operating on a 100k-partition cached RDD, the driver enters a GC loop > due to all the allocations it must do to process {{ExecutorMetricsUpdate}} > and {{TaskEnd}} events with {{updatedBlockStatuses}} attached > * this metric should be disabled, or some ability to blacklist it from the > command-line should be added. > * [SPARK-20084|https://issues.apache.org/jira/browse/SPARK-20084] addressed > one part of this - the event-log size had exploded - but the root problem > still exists / is worse > h3. {{count}} a 100k-partition RDD: works fine without {{.cache}} > In Spark 2.2.0 or 2.1.1: > {code} > spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -verbose:gc" > scala> val rdd = sc.parallelize(1 to 10, 10) > scala> rdd.count > {code} > In YARN and local modes, this finishes in ~20s seconds with ~20 partial GCs > logged, all taking under 0.1s ([example > output|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-not-cached-works-fine]); > all is well! > h3. {{count}} a 100k-partition cached RDD: GC-dies > If we {{cache}} the RDD first, the same {{count}} job quickly sends the > driver into a GC death spiral: full GC's start after a few thousand tasks and > increase in frequency and length until they last minutes / become continuous > (and, in YARN, the driver loses contact with any executors). 
> Example outputs: > [local|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-cached-crashes], > > [YARN|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-yarn-mode-cached-rdd-dies]. > The YARN example removes any confusion about whether the storing of the > blocks is causing memory pressure on the driver; the driver is basically > doing no work except receiving executor updates and events, and yet it > becomes overloaded. > h3. Can't effectively throw driver heap at the problem > I've tested with 1GB, 10GB, and 20GB heaps, and the larger heaps do what we'd > expect: delay the first Full GC, and make Full GCs longer when they happen. > I don't have a clear sense on whether the onset is linear or quadratic (i.e. > do I get twice as far into the job before the first Full GC with a 20GB as >
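The count-then-coalesce sizing described in the comment above is simple arithmetic; a plain-Python sketch of the partition-count computation (the ~1MM-records-per-partition target comes from the comment, the helper name is hypothetical):

```python
import math

def target_partitions(record_count, records_per_partition=1_000_000):
    """Number of partitions to coalesce to so each holds roughly
    `records_per_partition` records (always at least 1 partition)."""
    return max(1, math.ceil(record_count / records_per_partition))

# ~100BN records across 100k partitions is already ~1MM records/partition;
# after filtering down to <1MM records, a single partition suffices.
assert target_partitions(100_000_000_000) == 100_000
assert target_partitions(900_000) == 1
```

The expensive part is not this arithmetic but obtaining `record_count`, which is why the RDD is cached before the `count` so the filter is not recomputed for the subsequent `coalesce`.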
[jira] [Commented] (SPARK-21667) ConsoleSink should not fail streaming query with checkpointLocation option
[ https://issues.apache.org/jira/browse/SPARK-21667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120423#comment-16120423 ] Shixiong Zhu commented on SPARK-21667: -- Do you mind submitting a PR to fix it? > ConsoleSink should not fail streaming query with checkpointLocation option > -- > > Key: SPARK-21667 > URL: https://issues.apache.org/jira/browse/SPARK-21667 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski >Priority: Minor > > As agreed on the Spark users mailing list in the thread "\[SS] Console sink > not supporting recovering from checkpoint location? Why?" in which > [~marmbrus] said: > {quote} > I think there is really no good reason for this limitation. > {quote} > Using {{ConsoleSink}} should therefore not fail a streaming query when used > with {{checkpointLocation}} option. > {code} > // today's build from the master > scala> spark.version > res8: String = 2.3.0-SNAPSHOT > scala> val q = records. > | writeStream. > | format("console"). > | option("truncate", false). > | option("checkpointLocation", "/tmp/checkpoint"). // <-- > checkpoint directory > | trigger(Trigger.ProcessingTime(10.seconds)). > | outputMode(OutputMode.Update). > | start > org.apache.spark.sql.AnalysisException: This query does not support > recovering from checkpoint location. Delete /tmp/checkpoint/offsets to start > over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284) > ... 61 elided > {code} > The "trigger" is SPARK-16116 and [this > line|https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277] > in particular. > This also relates to SPARK-19768 that was resolved as not a bug.
[jira] [Created] (SPARK-21684) df.write double escaping all the already escaped characters except the first one
Taran Saini created SPARK-21684: --- Summary: df.write double escaping all the already escaped characters except the first one Key: SPARK-21684 URL: https://issues.apache.org/jira/browse/SPARK-21684 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Taran Saini Hi, If we have a dataframe with the column value as {noformat} ab\,cd\,ef\,gh {noformat} then while writing it is written as {noformat} "ab\,cd\\,ef\\,gh" {noformat} i.e. it double-escapes all the already-escaped commas/delimiters except the first one. This is inconsistent behaviour: it should either escape all of them or none. If I set df.option("escape", "") to empty, that solves this problem, but then any double quotes inside the value are preceded by a special char, i.e. '\u00'. Why does it do so when the escape character is set as "" (empty)?
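For reference, a uniformly escaping CSV writer escapes every delimiter and escape character exactly once, treating no field position differently from the others. Python's csv module (used here as a stand-in for illustration, not Spark's CSV writer) demonstrates the expected behaviour:

```python
import csv
import io

# The reported value, with commas already backslash-escaped: ab\,cd\,ef\,gh
value = "ab\\,cd\\,ef\\,gh"

buf = io.StringIO()
# QUOTE_NONE + escapechar: escape delimiters and the escape character
# itself uniformly, instead of quoting the field.
writer = csv.writer(buf, escapechar="\\", quoting=csv.QUOTE_NONE)
writer.writerow([value])
written = buf.getvalue().strip()

# Every backslash and every comma is escaped exactly once, including the
# first occurrence: ab\\\,cd\\\,ef\\\,gh
assert written == r"ab\\\,cd\\\,ef\\\,gh"

# Reading it back with the same settings recovers the original value.
row = next(csv.reader(io.StringIO(written),
                      escapechar="\\", quoting=csv.QUOTE_NONE))
assert row == [value]
```

The bug reported here is precisely the loss of this uniformity: the first escaped delimiter is left as-is while the rest are escaped again.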
[jira] [Commented] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120415#comment-16120415 ] Shixiong Zhu commented on SPARK-21682: -- I agree that driver is a bottleneck. I already saw several issues about the driver scalability. But do you really need to create so many partitions? Could you use `coalesce` to reduce the number of partitions? > Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?) > > > Key: SPARK-21682 > URL: https://issues.apache.org/jira/browse/SPARK-21682 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.2, 2.1.1, 2.2.0 >Reporter: Ryan Williams > > h3. Summary > * {{sc.parallelize(1 to 10, 10).cache.count}} causes a driver GC > stall midway through on every configuration and version I've tried in 2.x. > * It runs fine with no Full GCs as of 1.6.3 > * I think that {{internal.metrics.updatedBlockStatuses}} is the culprit, and > breaks a contract about what big-O sizes accumulators' values can be: > ** they are each of size O(P), where P is the number of partitions in a > cached RDD > ** ⇒ the driver must process O(P²) data from {{TaskEnd}} events, instead of > O(P) > ** ⇒ the driver also must process O(P*E) work every 10s from > {{ExecutorMetricsUpdates}} (where E is the number of executors; cf. > {{spark.executor.heartbeatInterval}}) > * when operating on a 100k-partition cached RDD, the driver enters a GC loop > due to all the allocations it must do to process {{ExecutorMetricsUpdate}} > and {{TaskEnd}} events with {{updatedBlockStatuses}} attached > * this metric should be disabled, or some ability to blacklist it from the > command-line should be added. > * [SPARK-20084|https://issues.apache.org/jira/browse/SPARK-20084] addressed > one part of this - the event-log size had exploded - but the root problem > still exists / is worse > h3. 
{{count}} a 100k-partition RDD: works fine without {{.cache}} > In Spark 2.2.0 or 2.1.1: > {code} > spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -verbose:gc" > scala> val rdd = sc.parallelize(1 to 10, 10) > scala> rdd.count > {code} > In YARN and local modes, this finishes in ~20s seconds with ~20 partial GCs > logged, all taking under 0.1s ([example > output|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-not-cached-works-fine]); > all is well! > h3. {{count}} a 100k-partition cached RDD: GC-dies > If we {{cache}} the RDD first, the same {{count}} job quickly sends the > driver into a GC death spiral: full GC's start after a few thousand tasks and > increase in frequency and length until they last minutes / become continuous > (and, in YARN, the driver loses contact with any executors). > Example outputs: > [local|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-cached-crashes], > > [YARN|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-yarn-mode-cached-rdd-dies]. > The YARN example removes any confusion about whether the storing of the > blocks is causing memory pressure on the driver; the driver is basically > doing no work except receiving executor updates and events, and yet it > becomes overloaded. > h3. Can't effectively throw driver heap at the problem > I've tested with 1GB, 10GB, and 20GB heaps, and the larger heaps do what we'd > expect: delay the first Full GC, and make Full GCs longer when they happen. > I don't have a clear sense on whether the onset is linear or quadratic (i.e. > do I get twice as far into the job before the first Full GC with a 20GB as > with a 10GB heap, or only sqrt(2) times as far?). > h3. Mostly memory pressure, not OOMs > An interesting note is that I'm rarely seeing OOMs as a result of this, even > on small heaps. 
> I think this is consistent with the idea that all this data is being > immediately discarded by the driver, as opposed to kept around to serve web > UIs or somesuch. > h3. Eliminating {{ExecutorMetricsUpdate}}'s doesn't seem to help > Interestingly, setting large values of {{spark.executor.heartbeatInterval}} > doesn't seem to mitigate the problem; GC-stall sets in at about the same > point in the {{count}} job. > This implies that, in this example, the {{TaskEnd}} events are doing most or > all of the damage. > h3. CMS helps but doesn't solve the problem > In some rough testing, I saw the {{count}} get about twice as far before > dying when using the CMS collector. > h3. What bandwidth do we expect the driver to process events at? > IIUC, every 10s the driver gets O(T) (~100k?) block updates from each of ~500 > executors, and allocating objects for these updates is pushing it over a > tipping
[jira] [Updated] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Williams updated SPARK-21682: -- Description: h3. Summary * {{sc.parallelize(1 to 10, 10).cache.count}} causes a driver GC stall midway through on every configuration and version I've tried in 2.x. * It runs fine with no Full GCs as of 1.6.3 * I think that {{internal.metrics.updatedBlockStatuses}} is the culprit, and breaks a contract about what big-O sizes accumulators' values can be: ** they are each of size O(P), where P is the number of partitions in a cached RDD ** ⇒ the driver must process O(P²) data from {{TaskEnd}} events, instead of O(P) ** ⇒ the driver also must process O(P*E) work every 10s from {{ExecutorMetricsUpdates}} (where E is the number of executors; cf. {{spark.executor.heartbeatInterval}}) * when operating on a 100k-partition cached RDD, the driver enters a GC loop due to all the allocations it must do to process {{ExecutorMetricsUpdate}} and {{TaskEnd}} events with {{updatedBlockStatuses}} attached * this metric should be disabled, or some ability to blacklist it from the command line should be added. * [SPARK-20084|https://issues.apache.org/jira/browse/SPARK-20084] addressed one part of this - the event-log size had exploded - but the root problem still exists / is worse h3. {{count}} a 100k-partition RDD: works fine without {{.cache}} In Spark 2.2.0 or 2.1.1: {code} spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc" scala> val rdd = sc.parallelize(1 to 10, 10) scala> rdd.count {code} In YARN and local modes, this finishes in ~20 seconds with ~20 partial GCs logged, all taking under 0.1s ([example output|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-not-cached-works-fine]); all is well! h3. {{count}} a 100k-partition cached RDD: GC-dies If we {{cache}} the RDD first, the same {{count}} job quickly sends the driver into a GC death spiral: full GCs start after a few thousand tasks and increase in frequency and length until they last minutes / become continuous (and, in YARN, the driver loses contact with any executors). Example outputs: [local|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-cached-crashes], [YARN|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-yarn-mode-cached-rdd-dies]. The YARN example removes any confusion about whether the storing of the blocks is causing memory pressure on the driver; the driver is basically doing no work except receiving executor updates and events, and yet it becomes overloaded. h3. Can't effectively throw driver heap at the problem I've tested with 1GB, 10GB, and 20GB heaps, and the larger heaps do what we'd expect: delay the first Full GC, and make Full GCs longer when they happen. I don't have a clear sense of whether the onset is linear or quadratic (i.e. do I get twice as far into the job before the first Full GC with a 20GB heap as with a 10GB heap, or only sqrt(2) times as far?). h3. Mostly memory pressure, not OOMs An interesting note is that I'm rarely seeing OOMs as a result of this, even on small heaps. I think this is consistent with the idea that all this data is being immediately discarded by the driver, as opposed to kept around to serve web UIs or some such. h3. Eliminating {{ExecutorMetricsUpdate}}s doesn't seem to help Interestingly, setting large values of {{spark.executor.heartbeatInterval}} doesn't seem to mitigate the problem; GC stall sets in at about the same point in the {{count}} job. This implies that, in this example, the {{TaskEnd}} events are doing most or all of the damage. h3. CMS helps but doesn't solve the problem In some rough testing, I saw the {{count}} get about twice as far before dying when using the CMS collector. h3. What bandwidth do we expect the driver to process events at? IIUC, every 10s the driver gets O(T) (~100k?) block updates from each of ~500 executors, and allocating objects for these updates is pushing it over a tipping point where it can't keep up. I don't know how to get good numbers on how much data the driver is processing; does anyone? There should be monitoring/tests in place to catch a regression where the driver begins writing 1000x the data to the event log, or having to process 1000x the data over the event bus. h3. Should this accumulator be disabled altogether? Seems like yes, to me. Making the driver churn through all this useless data seems unreasonable (short of a major refactoring of the driver to... offload things to threads?).
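The accumulator-size argument in the description can be made concrete with a back-of-envelope estimate. A minimal, self-contained Scala sketch follows; the per-entry byte size and executor count are illustrative assumptions, not measurements of Spark's actual serialized {{BlockStatus}} format:

```scala
// Back-of-envelope estimate of driver-side data volume when every TaskEnd
// event carries an updatedBlockStatuses list of size up to P.
// All sizes below are assumptions for illustration.
object UpdatedBlockStatusesCost {

  // Each of the P TaskEnd events can carry up to P block statuses => O(P^2).
  def taskEndBytes(p: Long, bytesPerStatus: Long): Long =
    p * p * bytesPerStatus

  // Each of the E executor heartbeats (every 10s by default) can carry up to
  // P block statuses => O(P*E) per heartbeat interval.
  def heartbeatBytesPer10s(p: Long, e: Long, bytesPerStatus: Long): Long =
    p * e * bytesPerStatus

  def main(args: Array[String]): Unit = {
    val p = 100000L // partitions/tasks in the cached RDD (per the report)
    val e = 500L    // assumed executor count
    val b = 100L    // assumed bytes per serialized block-status entry
    println(s"TaskEnd total:            ${taskEndBytes(p, b) / 1e9} GB")      // ~1 TB
    println(s"Heartbeat volume per 10s: ${heartbeatBytesPer10s(p, e, b) / 1e9} GB")
  }
}
```

Even with a conservative 100 bytes per entry, the quadratic term is on the order of a terabyte of short-lived allocations over the job, which is consistent with the driver spending its time in GC rather than running out of heap outright.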
[jira] [Comment Edited] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120409#comment-16120409 ] Ryan Williams edited comment on SPARK-21682 at 8/9/17 6:28 PM: --- Interestingly, I thought the {{updatedBlockStatuses}} issue started in 2.1.x, but I can't {{sc.parallelize(1 to 10, 10).cache.count}} on 2.0.2 either. Maybe the issue started earlier than I thought, or something else is causing the driver GC stall? I can {{sc.parallelize(1 to 10, 10).cache.count}} with no Full GCs / no problem on 1.6.3 in local mode with a 1GB driver. I've updated the issue title and description above to reflect this a bit.
[jira] [Updated] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Williams updated SPARK-21682: -- Summary: Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?) (was: Caching 100k-task RDD GC-kills driver due to updatedBlockStatuses)
[jira] [Updated] (SPARK-21682) Caching 100k-task RDD GC-kills driver due to updatedBlockStatuses
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Williams updated SPARK-21682: -- Affects Version/s: 2.0.2
[jira] [Updated] (SPARK-21682) Caching 100k-task RDD GC-kills driver (due to updatedBlockStatuses?)
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Williams updated SPARK-21682: -- Description:
[jira] [Commented] (SPARK-21682) Caching 100k-task RDD GC-kills driver due to updatedBlockStatuses
[ https://issues.apache.org/jira/browse/SPARK-21682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120409#comment-16120409 ] Ryan Williams commented on SPARK-21682: --- Interestingly, I thought the {{updatedBlockStatuses}} issue started in 2.1.x, but I can't {{sc.parallelize(1 to 10, 10).cache.count}} on 2.0.2 either. Maybe the issue started earlier than I thought, or something else is causing the driver GC stall? I can {{sc.parallelize(1 to 10, 10).cache.count}} with no Full GCs / no problem on 1.6.3 in local mode with a 1GB driver.
[jira] [Commented] (SPARK-20642) Use key-value store to keep History Server application listing
[ https://issues.apache.org/jira/browse/SPARK-20642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120398#comment-16120398 ] Marcelo Vanzin commented on SPARK-20642: PR: https://github.com/apache/spark/pull/18887 > Use key-value store to keep History Server application listing > -- > > Key: SPARK-20642 > URL: https://issues.apache.org/jira/browse/SPARK-20642 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Marcelo Vanzin > > See spec in parent issue (SPARK-18085) for more details. > This task tracks using the new key-value store added in SPARK-20641 to store > the SHS application listing, allowing it to be quickly reloaded when the SHS > restarts. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
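The shape of the SPARK-20642 change, roughly, is that the History Server persists its application listing in a key-value store and reloads it on restart instead of re-parsing every event log. The toy Scala sketch below illustrates that shape only; the names ({{ToyKVStore}}, {{reloadListing}}) are invented for illustration and are not the interface added in SPARK-20641:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the disk-backed key-value store from SPARK-20641;
// here just an in-memory map keyed by application id.
class ToyKVStore {
  private val data = mutable.Map.empty[String, String]
  def write(key: String, value: String): Unit = data(key) = value
  def read(key: String): Option[String] = data.get(key)
  def keys: Iterable[String] = data.keys
}

object HistoryListingSketch {
  // On a (simulated) SHS restart, rebuild the listing from the store
  // instead of re-parsing all event logs from scratch.
  def reloadListing(store: ToyKVStore): Map[String, String] =
    store.keys.map(k => k -> store.read(k).get).toMap
}
```

The design win is that restart cost becomes proportional to the size of the stored listing rather than to the total size of the event logs.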
[jira] [Created] (SPARK-21683) "TaskKilled (another attempt succeeded)" log message should be INFO level, not WARN
Ryan Williams created SPARK-21683: - Summary: "TaskKilled (another attempt succeeded)" log message should be INFO level, not WARN Key: SPARK-21683 URL: https://issues.apache.org/jira/browse/SPARK-21683 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0 Reporter: Ryan Williams Priority: Minor Under other circumstances, it makes sense for {{TaskKilled}} to prompt a WARN log, but in speculative mode, when another attempt finishes first, that's not worth warning about and should be logged at INFO level. This matters in part because, when running with root log level WARN and following progress with the console progress bar, these messages make up a large share of the log spam that disrupts the console progress display.
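Until the level is changed in Spark itself, one possible workaround is to raise the threshold of the logger that emits the message in {{conf/log4j.properties}}. This assumes the message originates in {{TaskSetManager}} (verify against your Spark version's log output first), and note it also hides that class's other, potentially useful, warnings:

```properties
# Assumed workaround: silence per-task WARNs from the scheduler.
# Check your own logs to confirm TaskSetManager is the logger emitting
# "TaskKilled (another attempt succeeded)" before applying this.
log4j.logger.org.apache.spark.scheduler.TaskSetManager=ERROR
```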
[jira] [Comment Edited] (SPARK-21453) Cached Kafka consumer may be closed too early
[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120393#comment-16120393 ] Shixiong Zhu edited comment on SPARK-21453 at 8/9/17 6:13 PM: -- The error message looks like the Kafka broker storing the record for offset 7591351 is offline/standby. If the broker can recover fast, you can just restart your query and it will start from the failed point. > Cached Kafka consumer may be closed too early > - > > Key: SPARK-21453 > URL: https://issues.apache.org/jira/browse/SPARK-21453 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 > Environment: Spark 2.2.0 and kafka 0.10.2.0 >Reporter: Pablo Panero >Priority: Minor > > On a streaming job using the built-in kafka source and sink (over SSL), I > am getting the following exception: > Config of the source: > {code:java} > val df = spark.readStream > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("failOnDataLoss", value = false) > .option("kafka.connections.max.idle.ms", 360) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. 
> .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .option("subscribe", config.topicConfigList.keys.mkString(",")) > .load() > {code} > Config of the sink: > {code:java} > .writeStream > .option("checkpointLocation", > s"${config.checkpointDir}/${topicConfig._1}/") > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("kafka.connections.max.idle.ms", 360) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. > .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .start() > {code} > {code:java} > 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731) > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54) > at org.apache.kafka.common.network.Selector.doClose(Selector.java:540) > at org.apache.kafka.common.network.Selector.close(Selector.java:531) > at > 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378) > at org.apache.kafka.common.network.Selector.poll(Selector.java:303) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106) > at >
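Shixiong's suggestion ("just restart your query and it will start from the failed point") amounts to wrapping the query in a restart-on-transient-failure loop, with the {{checkpointLocation}} letting the restarted query resume from the failed offsets. A minimal, Spark-free Scala sketch of that pattern, where {{body}} is a stand-in for starting the query and calling {{awaitTermination()}}:

```scala
import scala.util.control.NonFatal

object RestartOnFailure {
  /** Runs `body` until it succeeds or `maxRestarts` is exhausted, then
    * rethrows the last failure. In a real Structured Streaming job, `body`
    * would start the query and block in awaitTermination(); the checkpoint
    * directory is what makes the restart resume where the failure occurred. */
  def retry[A](maxRestarts: Int)(body: => A): A = {
    var lastError: Throwable = null
    var attempt = 0
    while (attempt <= maxRestarts) {
      try {
        return body
      } catch {
        case NonFatal(e) =>
          lastError = e
          attempt += 1
          // In production: log the error, back off, and verify it is
          // actually transient (e.g. a briefly offline broker).
      }
    }
    throw lastError
  }
}
```

This is a sketch of the operational pattern only; it does not address the underlying consumer-caching bug the issue tracks.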
[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early
[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120393#comment-16120393 ] Shixiong Zhu commented on SPARK-21453: -- The error message looks like the Kafka broker storing the record for offset 7591351 is offline/standby. If it can recover fast, you can just restart your query and it will start from the failed point. > Cached Kafka consumer may be closed too early > - > > Key: SPARK-21453 > URL: https://issues.apache.org/jira/browse/SPARK-21453 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 > Environment: Spark 2.2.0 and kafka 0.10.2.0 >Reporter: Pablo Panero >Priority: Minor > > On a streaming job using built-in kafka source and sink (over SSL), I am > getting the following exception: > Config of the source: > {code:java} > val df = spark.readStream > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("failOnDataLoss", value = false) > .option("kafka.connections.max.idle.ms", 360) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. 
> .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .option("subscribe", config.topicConfigList.keys.mkString(",")) > .load() > {code} > Config of the sink: > {code:java} > .writeStream > .option("checkpointLocation", > s"${config.checkpointDir}/${topicConfig._1}/") > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("kafka.connections.max.idle.ms", 360) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. > .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .start() > {code} > {code:java} > 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731) > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54) > at org.apache.kafka.common.network.Selector.doClose(Selector.java:540) > at org.apache.kafka.common.network.Selector.close(Selector.java:531) > at > 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378) > at org.apache.kafka.common.network.Selector.poll(Selector.java:303) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106) > at > org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106) >
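Shixiong Zhu's suggestion works because a Structured Streaming query commits its progress to the checkpoint location, so a restart resumes from the failed offset rather than from the beginning. A toy model of that behavior (plain Python, nothing Spark- or Kafka-specific; all names here are illustrative):

```python
# Illustrative sketch (not Spark code): why restarting a checkpointed
# streaming query resumes from the failed offset instead of replaying
# everything from offset 0.

class Checkpoint:
    """In-memory stand-in for the checkpointLocation directory."""
    def __init__(self):
        self.committed_offset = 0

def run_query(records, checkpoint, fail_at=None):
    """Process records from the last committed offset; optionally fail
    partway through, as when a broker goes offline."""
    processed = []
    for offset in range(checkpoint.committed_offset, len(records)):
        if fail_at is not None and offset == fail_at:
            raise IOError("broker for offset %d is offline" % offset)
        processed.append(records[offset])
        checkpoint.committed_offset = offset + 1  # commit progress
    return processed

records = list(range(10))
cp = Checkpoint()
try:
    run_query(records, cp, fail_at=6)   # first run dies at offset 6
except IOError:
    pass
resumed = run_query(records, cp)        # restart: picks up at offset 6
```

If the broker recovers quickly, the restarted query reprocesses nothing before the failure point, which is exactly the advice given in the comment.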
[jira] [Created] (SPARK-21682) Caching 100k-task RDD GC-kills driver due to updatedBlockStatuses
Ryan Williams created SPARK-21682: - Summary: Caching 100k-task RDD GC-kills driver due to updatedBlockStatuses Key: SPARK-21682 URL: https://issues.apache.org/jira/browse/SPARK-21682 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0, 2.1.1 Reporter: Ryan Williams h3. Summary * {{internal.metrics.updatedBlockStatuses}} breaks a contract about what big-O sizes accumulators' values can be: ** they are each of size O(P), where P is the number of partitions in a cached RDD ** ⇒ the driver must process O(P²) data from {{TaskEnd}} events, instead of O(P) ** ⇒ the driver also must process O(P*E) work every 10s from {{ExecutorMetricsUpdates}} (where E is the number of executors; cf. {{spark.executor.heartbeatInterval}}) * when operating on a 100k-partition cached RDD, the driver enters a GC loop due to all the allocations it must do to process {{ExecutorMetricsUpdate}} and {{TaskEnd}} events with {{updatedBlockStatuses}} attached * this metric should be disabled, or some ability to blacklist it from the command-line should be added. * [SPARK-20084|https://issues.apache.org/jira/browse/SPARK-20084] addressed one part of this - the event-log size had exploded - but the root problem still exists / is worse h3. {{count}} a 100k-partition RDD: works fine without {{.cache}} In Spark 2.2.0 or 2.1.1: {code} spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc" scala> val rdd = sc.parallelize(1 to 100000, 100000) scala> rdd.count {code} In YARN and local modes, this finishes in ~20 seconds with ~20 partial GCs logged, all taking under 0.1s ([example output|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-not-cached-works-fine]); all is well! h3. 
{{count}} a 100k-partition cached RDD: GC-dies If we {{cache}} the RDD first, the same {{count}} job quickly sends the driver into a GC death spiral: full GCs start after a few thousand tasks and increase in frequency and length until they last minutes / become continuous (and, in YARN, the driver loses contact with any executors). Example outputs: [local|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-local-mode-rdd-cached-crashes], [YARN|https://gist.github.com/ryan-williams/a3d02ea49d2b5021b53e1680f8fedc37#file-yarn-mode-cached-rdd-dies]. The YARN example removes any confusion about whether the storing of the blocks is causing memory pressure on the driver; the driver is basically doing no work except receiving executor updates and events, and yet it becomes overloaded. h3. Can't effectively throw driver heap at the problem I've tested with 1GB, 10GB, and 20GB heaps, and the larger heaps do what we'd expect: delay the first Full GC, and make Full GCs longer when they happen. I don't have a clear sense of whether the onset is linear or quadratic (i.e. do I get twice as far into the job before the first Full GC with a 20GB as with a 10GB heap, or only sqrt(2) times as far?). h3. Mostly memory pressure, not OOMs An interesting note is that I'm rarely seeing OOMs as a result of this, even on small heaps. I think this is consistent with the idea that all this data is being immediately discarded by the driver, as opposed to kept around to serve web UIs or some such. h3. Eliminating {{ExecutorMetricsUpdate}}s doesn't seem to help Interestingly, setting large values of {{spark.executor.heartbeatInterval}} doesn't seem to mitigate the problem; GC-stall sets in at about the same point in the {{count}} job. This implies that, in this example, the {{TaskEnd}} events are doing most or all of the damage. h3. 
CMS helps but doesn't solve the problem In some rough testing, I saw the {{count}} get about twice as far before dying when using the CMS collector. h3. What bandwidth do we expect the driver to process events at? IIUC, every 10s the driver gets O(T) (~100k?) block updates from each of ~500 executors, and allocating objects for these updates is pushing it over a tipping point where it can't keep up. I don't know how to get good numbers on how much data the driver is processing; does anyone? There should be monitoring/tests in place to catch a regression where the driver begins writing 1000x the data to the event-log, or having to process 1000x the data over the event bus h3. Should this accumulator be disabled altogether? Seems like yes, to me. Making the driver churn through all this useless data seems unreasonable (short of a major refactoring of the driver to... offload things to threads?). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail:
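The O(P²) claim in this report is easy to make concrete with arithmetic. A back-of-the-envelope calculation (plain Python; the bytes-per-status figure is an assumption, not a measured Spark number) shows the volumes the driver is asked to absorb:

```python
# Back-of-the-envelope check of the sizes claimed in the report: each
# accumulator value is O(P) (one block status per partition of the cached
# RDD), so P completed tasks imply O(P^2) entries delivered via TaskEnd
# events, and E executors imply O(E*P) entries per heartbeat round.
# BYTES_PER_STATUS is an assumed rough size, not a measured Spark figure.

P = 100_000              # partitions in the cached RDD (as in the repro)
E = 500                  # executors (the report's rough figure)
BYTES_PER_STATUS = 100   # assumed serialized size of one block status

task_end_entries = P * P           # entries across all TaskEnd events
heartbeat_entries = E * P          # entries per ExecutorMetricsUpdate round
task_end_bytes = task_end_entries * BYTES_PER_STATUS
```

Under these assumptions the TaskEnd path alone churns through on the order of 10^10 status objects (roughly a terabyte of transient allocations), which is consistent with the driver GC-stalling no matter how much heap it is given.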
[jira] [Created] (SPARK-21681) MLOR do not work correctly when featureStd contains zero
Weichen Xu created SPARK-21681: -- Summary: MLOR do not work correctly when featureStd contains zero Key: SPARK-21681 URL: https://issues.apache.org/jira/browse/SPARK-21681 Project: Spark Issue Type: Bug Components: ML Affects Versions: 2.2.0 Reporter: Weichen Xu MLOR do not work correctly when featureStd contains zero. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17557) SQL query on parquet table java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary
[ https://issues.apache.org/jira/browse/SPARK-17557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120181#comment-16120181 ] Steve Drew commented on SPARK-17557: Hi, This issue is still happening (spark 2.1.1). Hopefully this provides better repro steps. We create a folder daily in an S3 bucket under meta-data/ec where we store event counts at a grain level, to be aggregated up as needed. Somewhere over the past 6 months, the dev made a code change that changed the data type of one of the columns stored there. However, when I try to read one day using a where clause, the error referenced above occurs. spark.read.parquet("s3://my-bucket/meta-data/ec/").*where("td='07-31-2017'")*.groupBy("dataClass","dt","event_type","clientVersion").agg(sum("crow").as("crow")).coalesce(1).write.option("header","true").csv("s3://mya-disco-out/csv/PRR731/") There are two work-arounds that seem to work. #1 - filter by the specific folder if you only need one within your path: spark.read.parquet("s3://my-bucket/meta-data/ec*/td=07-31-2017/*").groupBy("dataClass","dt","event_type","clientVersion").agg(sum("crow").as("crow")).coalesce(1).write.option("header","true").csv("s3://mya-disco-out/csv/PRR731/") #2 - apply a schema manually: val schema = StructType(scala.Array( StructField("folder",StringType,false), StructField("dataClass",StringType,false), StructField("dt",StringType,false), StructField("parentEvent",StringType,false), StructField("eventSuperClass",StringType,false), StructField("event_type",StringType,false), StructField("piifield0", StringType, false), StructField("piifield1",StringType,false), StructField("piifield2",StringType,false), StructField("piifield3",StringType,false), StructField("piifield4",StringType,false), StructField("crow",LongType,false) )) (where crow WAS an "int" for some days and became a "long" along the way.) 
spark.read*.schema(schema)*.parquet("s3://my-bucket/meta-data/ec/").where("td='07-31-2017'").groupBy("dataClass","dt","event_type","clientVersion").agg(sum("crow").as("crow")).coalesce(1).write.option("header","true").csv("s3://output-path/") Steve > SQL query on parquet table java.lang.UnsupportedOperationException: > org.apache.parquet.column.values.dictionary.PlainValuesDictionary > - > > Key: SPARK-17557 > URL: https://issues.apache.org/jira/browse/SPARK-17557 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Egor Pahomov > > Working on 1.6.2, broken on 2.0 > {code} > select * from logs.a where year=2016 and month=9 and day=14 limit 100 > {code} > {code} > java.lang.UnsupportedOperationException: > org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary > at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48) > at > org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
> at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
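Work-around #2 above amounts to deciding the widened schema up front, instead of letting each daily folder's physical type (`int` vs `long` for `crow`) leak through to the vectorized reader. The idea can be sketched in a few lines; this is a plain Python stand-in for the schema-unification step, not parquet-aware code, and the column/type names simply mirror the report:

```python
# Sketch of the idea behind work-around #2: when partitions disagree on a
# column's physical type, fold their schemas into one widened schema up
# front. Plain Python stand-in; not parquet-aware code.

WIDEN = {("int", "long"): "long", ("long", "int"): "long"}

def merge_type(a, b):
    """Return the widened type for two physical types, or None if they
    cannot be reconciled."""
    if a == b:
        return a
    return WIDEN.get((a, b)) or WIDEN.get((b, a))

def merge_schemas(per_day_schemas):
    """Fold per-partition {column: type} dicts into one unified schema."""
    merged = {}
    for schema in per_day_schemas:
        for col, typ in schema.items():
            merged[col] = typ if col not in merged else merge_type(merged[col], typ)
    return merged

days = [{"dt": "string", "crow": "int"},    # older partitions
        {"dt": "string", "crow": "long"}]   # after the code change
unified = merge_schemas(days)
```

Passing the widened schema explicitly (as the `StructType` in the comment does with `LongType` for `crow`) sidesteps per-file type inference entirely, which is why the work-around avoids the dictionary-decoding error.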
[jira] [Commented] (SPARK-21680) ML/MLLIB Vector compressed optimization
[ https://issues.apache.org/jira/browse/SPARK-21680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120136#comment-16120136 ] Peng Meng commented on SPARK-21680: --- Ok, thanks, I will submit a PR. > ML/MLLIB Vector compressed optimization > --- > > Key: SPARK-21680 > URL: https://issues.apache.org/jira/browse/SPARK-21680 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.3.0 >Reporter: Peng Meng > > When use Vector.compressed to change a Vector to SparseVector, the > performance is very low comparing with Vector.toSparse. > This is because you have to scan the value three times using > Vector.compressed, but you just need two times when use Vector.toSparse. > When the length of the vector is large, there is significant performance > difference between this two method. > Code of Vector compressed: > {code:java} > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > toSparse > } else { > toDense > } > } > {code} > I propose to change it to: > {code:java} > // Some comments here > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > val ii = new Array[Int](nnz) > val vv = new Array[Double](nnz) > var k = 0 > foreachActive { (i, v) => > if (v != 0) { > ii(k) = i > vv(k) = v > k += 1 > } > } > new SparseVector(size, ii, vv) > } else { > toDense > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21680) ML/MLLIB Vector compressed optimization
[ https://issues.apache.org/jira/browse/SPARK-21680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120128#comment-16120128 ] Sean Owen commented on SPARK-21680: --- Yes, the latter should be private and the former calls it too, I suppose. Something like that. > ML/MLLIB Vector compressed optimization > --- > > Key: SPARK-21680 > URL: https://issues.apache.org/jira/browse/SPARK-21680 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.3.0 >Reporter: Peng Meng > > When use Vector.compressed to change a Vector to SparseVector, the > performance is very low comparing with Vector.toSparse. > This is because you have to scan the value three times using > Vector.compressed, but you just need two times when use Vector.toSparse. > When the length of the vector is large, there is significant performance > difference between this two method. > Code of Vector compressed: > {code:java} > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > toSparse > } else { > toDense > } > } > {code} > I propose to change it to: > {code:java} > // Some comments here > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > val ii = new Array[Int](nnz) > val vv = new Array[Double](nnz) > var k = 0 > foreachActive { (i, v) => > if (v != 0) { > ii(k) = i > vv(k) = v > k += 1 > } > } > new SparseVector(size, ii, vv) > } else { > toDense > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21680) ML/MLLIB Vector compressed optimization
[ https://issues.apache.org/jira/browse/SPARK-21680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120115#comment-16120115 ] Peng Meng commented on SPARK-21680: --- Then we will have two toSparse: toSparse and toSparse(size) Do you mean that? Thanks. > ML/MLLIB Vector compressed optimization > --- > > Key: SPARK-21680 > URL: https://issues.apache.org/jira/browse/SPARK-21680 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.3.0 >Reporter: Peng Meng > > When use Vector.compressed to change a Vector to SparseVector, the > performance is very low comparing with Vector.toSparse. > This is because you have to scan the value three times using > Vector.compressed, but you just need two times when use Vector.toSparse. > When the length of the vector is large, there is significant performance > difference between this two method. > Code of Vector compressed: > {code:java} > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > toSparse > } else { > toDense > } > } > {code} > I propose to change it to: > {code:java} > // Some comments here > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > val ii = new Array[Int](nnz) > val vv = new Array[Double](nnz) > var k = 0 > foreachActive { (i, v) => > if (v != 0) { > ii(k) = i > vv(k) = v > k += 1 > } > } > new SparseVector(size, ii, vv) > } else { > toDense > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21680) ML/MLLIB Vector compressed optimization
[ https://issues.apache.org/jira/browse/SPARK-21680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120109#comment-16120109 ] Sean Owen commented on SPARK-21680: --- You definitely want to avoid duplicating the code, but could change toSparse to accept nnz if it's already known. > ML/MLLIB Vector compressed optimization > --- > > Key: SPARK-21680 > URL: https://issues.apache.org/jira/browse/SPARK-21680 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.3.0 >Reporter: Peng Meng > > When use Vector.compressed to change a Vector to SparseVector, the > performance is very low comparing with Vector.toSparse. > This is because you have to scan the value three times using > Vector.compressed, but you just need two times when use Vector.toSparse. > When the length of the vector is large, there is significant performance > difference between this two method. > Code of Vector compressed: > {code:java} > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > toSparse > } else { > toDense > } > } > {code} > I propose to change it to: > {code:java} > // Some comments here > def compressed: Vector = { > val nnz = numNonzeros > // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs > 12 * nnz + 20 bytes. > if (1.5 * (nnz + 1.0) < size) { > val ii = new Array[Int](nnz) > val vv = new Array[Double](nnz) > var k = 0 > foreachActive { (i, v) => > if (v != 0) { > ii(k) = i > vv(k) = v > k += 1 > } > } > new SparseVector(size, ii, vv) > } else { > toDense > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21504) Add spark version info in table metadata
[ https://issues.apache.org/jira/browse/SPARK-21504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-21504. - Resolution: Fixed Fix Version/s: 2.3.0 > Add spark version info in table metadata > > > Key: SPARK-21504 > URL: https://issues.apache.org/jira/browse/SPARK-21504 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.1, 2.2.0 >Reporter: Xiao Li >Assignee: Xiao Li > Fix For: 2.3.0 > > > Add spark version info in the table metadata. When creating the table, this > value is assigned. It can help users find which version of Spark is used to > create the table. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21624) Optimize communication cost of RF/GBT/DT
[ https://issues.apache.org/jira/browse/SPARK-21624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120105#comment-16120105 ] Peng Meng commented on SPARK-21624: --- Hi [~mlnick], what do you think about this: https://issues.apache.org/jira/browse/SPARK-21680 Thanks. > Optimize communication cost of RF/GBT/DT > > > Key: SPARK-21624 > URL: https://issues.apache.org/jira/browse/SPARK-21624 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.3.0 >Reporter: Peng Meng > > {quote}The implementation of RF is bound by either the cost of statistics > computation on workers or by communicating the sufficient statistics.{quote} > The statistics are stored in allStats: > {code:java} > /** >* Flat array of elements. >* Index for start of stats for a (feature, bin) is: >* index = featureOffsets(featureIndex) + binIndex * statsSize >*/ > private var allStats: Array[Double] = new Array[Double](allStatsSize) > {code} > The size of allStats may be very large, and it can be very sparse, especially > on the nodes near the leaves of the tree. > I have changed allStats from Array to SparseVector, and my tests show the > communication is down by about 50%. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
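The allStats change rests on simple size arithmetic: a dense `Array[Double]` ships 8 bytes per slot, while a sparse encoding ships roughly 12 bytes per non-zero entry (4-byte index plus 8-byte value). A quick sketch of when sparse wins (the constants are the usual JVM sizes; the 50% figure in the report is empirical and not reproduced here):

```python
# Why a sparse allStats can roughly halve communication: dense ships
# 8 bytes per slot, sparse ships ~12 bytes per non-zero entry (4-byte
# index + 8-byte double). Near the leaves of the tree few (feature, bin)
# slots are touched, so nnz << n and sparse wins.

def dense_bytes(n):
    return 8 * n                      # n doubles

def sparse_bytes(nnz):
    return 12 * nnz                   # (index, value) pairs

def sparse_wins(n, nnz):
    return sparse_bytes(nnz) < dense_bytes(n)

near_root = sparse_wins(n=4096, nnz=4000)   # most bins touched: dense is fine
near_leaf = sparse_wins(n=4096, nnz=100)    # few bins touched: sparse wins
```

The crossover is at nnz = 2n/3, which matches the intuition in the report: statistics near the root are dense, but nodes deep in the tree touch only a small fraction of the (feature, bin) slots.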
[jira] [Created] (SPARK-21680) ML/MLLIB Vector compressed optimization
Peng Meng created SPARK-21680: - Summary: ML/MLLIB Vector compressed optimization Key: SPARK-21680 URL: https://issues.apache.org/jira/browse/SPARK-21680 Project: Spark Issue Type: Improvement Components: ML, MLlib Affects Versions: 2.3.0 Reporter: Peng Meng When using Vector.compressed to change a Vector to a SparseVector, the performance is very low compared with Vector.toSparse. This is because you have to scan the values three times when using Vector.compressed, but only two times when using Vector.toSparse. When the length of the vector is large, there is a significant performance difference between these two methods. Code of Vector compressed: {code:java} def compressed: Vector = { val nnz = numNonzeros // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs 12 * nnz + 20 bytes. if (1.5 * (nnz + 1.0) < size) { toSparse } else { toDense } } {code} I propose to change it to: {code:java} // Some comments here def compressed: Vector = { val nnz = numNonzeros // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs 12 * nnz + 20 bytes. if (1.5 * (nnz + 1.0) < size) { val ii = new Array[Int](nnz) val vv = new Array[Double](nnz) var k = 0 foreachActive { (i, v) => if (v != 0) { ii(k) = i vv(k) = v k += 1 } } new SparseVector(size, ii, vv) } else { toDense } } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
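The three-scans-versus-two claim can be checked mechanically. Below is a plain Python model of the Scala snippets above (the names and the 1.5 factor mirror them; this is a stand-in, not the MLlib implementation) that counts how many passes over the values each variant makes. It also reflects Sean Owen's suggestion from the comments: a private single-scan builder that the public `toSparse` and `compressed` both call, so an already-computed nnz is reused:

```python
# Counting value scans in a Python model of Vector.compressed:
# the original path does numNonzeros + (toSparse's numNonzeros + copy)
# = 3 scans; the proposed path reuses the nnz it already computed = 2.

class CountingVector:
    def __init__(self, values):
        self.values, self.size, self.scans = values, len(values), 0

    def num_nonzeros(self):
        self.scans += 1
        return sum(1 for v in self.values if v != 0)

    def _copy_actives(self, nnz):
        """Single scan that fills pre-sized index/value arrays (the
        private toSparse(nnz) suggested in the comments)."""
        self.scans += 1
        ii, vv, k = [0] * nnz, [0.0] * nnz, 0
        for i, v in enumerate(self.values):
            if v != 0:
                ii[k], vv[k] = i, v
                k += 1
        return ii, vv

    def to_sparse(self):                  # like Vector.toSparse: 2 scans
        return self._copy_actives(self.num_nonzeros())

    def compressed_original(self):        # nnz check + toSparse: 3 scans
        nnz = self.num_nonzeros()
        return self.to_sparse() if 1.5 * (nnz + 1.0) < self.size else self.values

    def compressed_proposed(self):        # reuses nnz: 2 scans
        nnz = self.num_nonzeros()
        return self._copy_actives(nnz) if 1.5 * (nnz + 1.0) < self.size else self.values

a = CountingVector([0.0] * 9 + [7.0])
a.compressed_original()
b = CountingVector([0.0] * 9 + [7.0])
b.compressed_proposed()
```

Both variants produce the same sparse result; only the number of passes over the data differs, which is where the reported performance gap on long vectors comes from.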
[jira] [Resolved] (SPARK-21276) Update lz4-java to remove custom LZ4BlockInputStream
[ https://issues.apache.org/jira/browse/SPARK-21276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21276. --- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 18883 [https://github.com/apache/spark/pull/18883] > Update lz4-java to remove custom LZ4BlockInputStream > - > > Key: SPARK-21276 > URL: https://issues.apache.org/jira/browse/SPARK-21276 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.1, 2.2.0 >Reporter: Takeshi Yamamuro >Priority: Trivial > Fix For: 2.3.0 > > > We currently use a custom LZ4BlockInputStream to read concatenated byte streams > in shuffle > (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java#L38). > In a recent PR (https://github.com/lz4/lz4-java/pull/105), this > functionality was implemented in lz4-java upstream. So, we might update > the lz4-java package that will be released in the near future. > Issue about the next lz4-java release > https://github.com/lz4/lz4-java/issues/98 > Diff between the latest release and the master in lz4-java > https://github.com/lz4/lz4-java/compare/62f7547abb0819d1ca1e669645ee1a9d26cd60b0...6480bd9e06f92471bf400c16d4d5f3fd2afa3b3d > * fixed NPE in XXHashFactory similarly > * Don't place resources in default package to support shading > * Fixes ByteBuffer methods failing to apply arrayOffset() for array-backed > * Try to load lz4-java from java.library.path, then fallback to bundled > * Add ppc64le binary > * Add s390x JNI binding > * Add basic LZ4 Frame v1.5.0 support > * enable aarch64 support for lz4-java > * Allow unsafeInstance() for ppc64le architecture > * Add unsafeInstance support for AArch64 > * Support 64-bit JNI build on Solaris > * Avoid over-allocating a buffer > * Allow EndMark to be incompressible for LZ4FrameInputStream. 
> * Concat byte stream -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-21276) Update lz4-java to remove custom LZ4BlockInputStream
[ https://issues.apache.org/jira/browse/SPARK-21276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-21276: - Assignee: Takeshi Yamamuro > Update lz4-java to remove custom LZ4BlockInputStream > - > > Key: SPARK-21276 > URL: https://issues.apache.org/jira/browse/SPARK-21276 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.1, 2.2.0 >Reporter: Takeshi Yamamuro >Assignee: Takeshi Yamamuro >Priority: Trivial > Fix For: 2.3.0 > > > We currently use a custom LZ4BlockInputStream to read concatenated byte streams > in shuffle > (https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java#L38). > In a recent PR (https://github.com/lz4/lz4-java/pull/105), this > functionality was implemented in lz4-java upstream. So, we might update > the lz4-java package that will be released in the near future. > Issue about the next lz4-java release > https://github.com/lz4/lz4-java/issues/98 > Diff between the latest release and the master in lz4-java > https://github.com/lz4/lz4-java/compare/62f7547abb0819d1ca1e669645ee1a9d26cd60b0...6480bd9e06f92471bf400c16d4d5f3fd2afa3b3d > * fixed NPE in XXHashFactory similarly > * Don't place resources in default package to support shading > * Fixes ByteBuffer methods failing to apply arrayOffset() for array-backed > * Try to load lz4-java from java.library.path, then fallback to bundled > * Add ppc64le binary > * Add s390x JNI binding > * Add basic LZ4 Frame v1.5.0 support > * enable aarch64 support for lz4-java > * Allow unsafeInstance() for ppc64le architecture > * Add unsafeInstance support for AArch64 > * Support 64-bit JNI build on Solaris > * Avoid over-allocating a buffer > * Allow EndMark to be incompressible for LZ4FrameInputStream. 
> * Concat byte stream -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-21656: -- Target Version/s: (was: 2.1.1) Fix Version/s: (was: 2.1.1) > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for 60s (or a > configurable time). I have seen spark let them go when they were idle but > still really needed. I have seen this issue when the scheduler was waiting to > get node locality, but that takes longer than the default idle timeout. In > these jobs the number of executors drops really low (less than 10) while > there are still around 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
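The behavior requested in SPARK-21656 reduces to one extra condition in the release decision: an executor that has been idle past the timeout is released only if no pending tasks remain for it. A minimal sketch of that predicate (plain Python; the real logic lives in Spark's ExecutorAllocationManager and is considerably more involved, and the 60s threshold mirrors the `spark.dynamicAllocation.executorIdleTimeout` default):

```python
# Sketch of the fix this report asks for: do not release an executor on
# idle timeout alone; also require that there is no backlog of pending
# tasks that could run on it. Simplified stand-in for Spark's dynamic
# allocation logic.

IDLE_TIMEOUT_S = 60  # spark.dynamicAllocation.executorIdleTimeout default

def should_release(idle_seconds, pending_tasks):
    timed_out = idle_seconds >= IDLE_TIMEOUT_S
    still_needed = pending_tasks > 0      # the proposed extra check
    return timed_out and not still_needed

# The report's scenario: executors idle past 60s while 80,000 tasks wait.
# Current behavior would release them; the proposed check keeps them.
keep_busy_cluster = should_release(idle_seconds=75, pending_tasks=80_000)
```

In the reported jobs the idleness is an artifact of locality-wait scheduling, not of a shrinking workload, which is why gating the release on the pending-task count is the natural fix.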
[jira] [Resolved] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21678. --- Resolution: Fixed [~taransaini43] I read this and can tell you this is not what JIRA is for. I'm a committer. Please don't reopen this. http://spark.apache.org/contributing.html > Disabling quotes while writing a dataframe > -- > > Key: SPARK-21678 > URL: https://issues.apache.org/jira/browse/SPARK-21678 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Taran Saini > > Hi, > My dataframe column values can contain commas, double quotes, > etc. > I am transforming the dataframes in order to ensure that all the required > values are escaped. > However, on doing df.write.format("csv") > it again wraps the values in double quotes. How do I disable this? > And even if the double quotes are there to stay, why does it do the following: > {noformat} > L"\, p' Y a\, C G > {noformat} > is written as > {noformat} > "L\"\\, p' Y a\\, C G\\, H" > {noformat} > i.e. it double-escapes the already escaped values. > And if I escape it myself, like: > {noformat} > L\"\, p' Y a\, C G > {noformat} > then that is written as > {noformat} > "L\\"\\, p' Y a\\, C G\\, H" > {noformat} > How do we disable this automatic escaping of characters? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21678. --- Resolution: Invalid
[jira] [Reopened] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reopened SPARK-21678: ---
[jira] [Closed] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen closed SPARK-21678. -
[jira] [Commented] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120037#comment-16120037 ] Takeshi Yamamuro commented on SPARK-21678: -- I think that if Spark set `setCharToEscapeQuoteEscaping("\0")` in the CsvWriterSettings below, the output would be close to what you want. But I'm not sure we should keep adding these settings option-by-option there. cc: [~hyukjin.kwon] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala#L151
[jira] [Updated] (SPARK-21679) KMeans Clustering is Not Deterministic
[ https://issues.apache.org/jira/browse/SPARK-21679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christoph Brücke updated SPARK-21679: - Description: I’m trying to figure out how to use KMeans in order to achieve reproducible results. I have found that running the same kmeans instance on the same data, with different partitioning will produce different clusterings. Given a simple KMeans run with fixed seed returns different results on the same training data, if the training data is partitioned differently. Consider the following example. The same KMeans clustering set up is run on identical data. The only difference is the partitioning of the training data (one partition vs. four partitions). {noformat} import org.apache.spark.sql.DataFrame import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.features.VectorAssembler // generate random data for clustering val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321)) val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features") val data = vecAssembler.transform(randomData) // instantiate KMeans with fixed seed val kmeans = new KMeans().setK(10).setSeed(9876L) // train the model with different partitioning val dataWith1Partition = data.repartition(1) println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition)) val dataWith4Partition = data.repartition(4) println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition)) {noformat} I get the following related cost {noformat} 1 Partition: 16.028212597888057 4 Partition: 16.14758460544976 {noformat} What I want to achieve is that repeated computations of the KMeans Clustering should yield identical result on identical training data, regardless of the partitioning. 
Looking through the Spark source code, I guess the cause is the initialization method of KMeans which in turn uses the `takeSample` method, which does not seem to be partition agnostic. Is this behaviour expected? Is there anything I could do to achieve reproducible results? was: I’m trying to figure out how to use KMeans in order to achieve reproducible results. I have found that running the same kmeans instance on the same data, with different partitioning will produce different clusterings. Given a simple KMeans run with fixed seed returns different results on the same training data, if the training data is partitioned differently. Consider the following example. The same KMeans clustering set up is run on identical data. The only difference is the partitioning of the training data (one partition vs. four partitions). {noformat} import org.apache.spark.sql.DataFrame import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.features.VectorAssembler // generate random data for clustering val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321)) val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features") val data = vecAssembler.transform(randomData) // instantiate KMeans with fixed seed val kmeans = new KMeans().setK(10).setSeed(9876L) // train the model with different partitioning val dataWith1Partition = data.repartition(1) println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition)) val dataWith4Partition = data.repartition(4) println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition)) {noformat} I get the following related cost {{1 Partition: 16.028212597888057 4 Partition: 16.14758460544976}} What I want to achieve is that repeated computations of the KMeans Clustering should yield identical result on identical training data, regardless of the partitioning. 
Looking through the Spark source code, I guess the cause is the initialization method of KMeans which in turn uses the `takeSample` method, which does not seem to be partition agnostic. Is this behaviour expected? Is there anything I could do to achieve reproducible results?
[jira] [Updated] (SPARK-21679) KMeans Clustering is Not Deterministic
[ https://issues.apache.org/jira/browse/SPARK-21679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christoph Brücke updated SPARK-21679: - Description: I’m trying to figure out how to use KMeans in order to achieve reproducible results. I have found that running the same kmeans instance on the same data, with different partitioning will produce different clusterings. Given a simple KMeans run with fixed seed returns different results on the same training data, if the training data is partitioned differently. Consider the following example. The same KMeans clustering set up is run on identical data. The only difference is the partitioning of the training data (one partition vs. four partitions). {noformat} import org.apache.spark.sql.DataFrame import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.features.VectorAssembler // generate random data for clustering val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321)) val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features") val data = vecAssembler.transform(randomData) // instantiate KMeans with fixed seed val kmeans = new KMeans().setK(10).setSeed(9876L) // train the model with different partitioning val dataWith1Partition = data.repartition(1) println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition)) val dataWith4Partition = data.repartition(4) println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition)) {noformat} I get the following related cost {{1 Partition: 16.028212597888057 4 Partition: 16.14758460544976}} What I want to achieve is that repeated computations of the KMeans Clustering should yield identical result on identical training data, regardless of the partitioning. 
Looking through the Spark source code, I guess the cause is the initialization method of KMeans which in turn uses the `takeSample` method, which does not seem to be partition agnostic. Is this behaviour expected? Is there anything I could do to achieve reproducible results? was: I’m trying to figure out how to use KMeans in order to achieve reproducible results. I have found that running the same kmeans instance on the same data, with different partitioning will produce different clusterings. Given a simple KMeans run with fixed seed returns different results on the same training data, if the training data is partitioned differently. Consider the following example. The same KMeans clustering set up is run on identical data. The only difference is the partitioning of the training data (one partition vs. four partitions). {{import org.apache.spark.sql.DataFrame import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.features.VectorAssembler // generate random data for clustering val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321)) val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features") val data = vecAssembler.transform(randomData) // instantiate KMeans with fixed seed val kmeans = new KMeans().setK(10).setSeed(9876L) // train the model with different partitioning val dataWith1Partition = data.repartition(1) println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition)) val dataWith4Partition = data.repartition(4) println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))}} I get the following related cost {{1 Partition: 16.028212597888057 4 Partition: 16.14758460544976}} What I want to achieve is that repeated computations of the KMeans Clustering should yield identical result on identical training data, regardless of the partitioning. 
Looking through the Spark source code, I guess the cause is the initialization method of KMeans which in turn uses the `takeSample` method, which does not seem to be partition agnostic. Is this behaviour expected? Is there anything I could do to achieve reproducible results?
[jira] [Updated] (SPARK-21679) KMeans Clustering is Not Deterministic
[ https://issues.apache.org/jira/browse/SPARK-21679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christoph Brücke updated SPARK-21679: - Description: I’m trying to figure out how to use KMeans in order to achieve reproducible results. I have found that running the same kmeans instance on the same data, with different partitioning will produce different clusterings. Given a simple KMeans run with fixed seed returns different results on the same training data, if the training data is partitioned differently. Consider the following example. The same KMeans clustering set up is run on identical data. The only difference is the partitioning of the training data (one partition vs. four partitions). {{import org.apache.spark.sql.DataFrame import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.features.VectorAssembler // generate random data for clustering val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321)) val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features") val data = vecAssembler.transform(randomData) // instantiate KMeans with fixed seed val kmeans = new KMeans().setK(10).setSeed(9876L) // train the model with different partitioning val dataWith1Partition = data.repartition(1) println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition)) val dataWith4Partition = data.repartition(4) println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))}} I get the following related cost {{1 Partition: 16.028212597888057 4 Partition: 16.14758460544976}} What I want to achieve is that repeated computations of the KMeans Clustering should yield identical result on identical training data, regardless of the partitioning. Looking through the Spark source code, I guess the cause is the initialization method of KMeans which in turn uses the `takeSample` method, which does not seem to be partition agnostic. 
Is this behaviour expected? Is there anything I could do to achieve reproducible results? was: I’m trying to figure out how to use KMeans in order to achieve reproducible results. I have found that running the same kmeans instance on the same data, with different partitioning will produce different clusterings. Given a simple KMeans run with fixed seed returns different results on the same training data, if the training data is partitioned differently. Consider the following example. The same KMeans clustering set up is run on identical data. The only difference is the partitioning of the training data (one partition vs. four partitions). {{ import org.apache.spark.sql.DataFrame import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.features.VectorAssembler // generate random data for clustering val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321)) val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features") val data = vecAssembler.transform(randomData) // instantiate KMeans with fixed seed val kmeans = new KMeans().setK(10).setSeed(9876L) // train the model with different partitioning val dataWith1Partition = data.repartition(1) println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition)) val dataWith4Partition = data.repartition(4) println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition)) }} I get the following related cost {{ 1 Partition: 16.028212597888057 4 Partition: 16.14758460544976 }} What I want to achieve is that repeated computations of the KMeans Clustering should yield identical result on identical training data, regardless of the partitioning. Looking through the Spark source code, I guess the cause is the initialization method of KMeans which in turn uses the `takeSample` method, which does not seem to be partition agnostic. Is this behaviour expected? 
Is there anything I could do to achieve reproducible results?
[jira] [Created] (SPARK-21679) KMeans Clustering is Not Deterministic
Christoph Brücke created SPARK-21679: Summary: KMeans Clustering is Not Deterministic Key: SPARK-21679 URL: https://issues.apache.org/jira/browse/SPARK-21679 Project: Spark Issue Type: Bug Components: ML Affects Versions: 2.2.0, 2.1.0 Reporter: Christoph Brücke I’m trying to figure out how to use KMeans in order to achieve reproducible results. I have found that running the same KMeans instance on the same data, with different partitioning, will produce different clusterings. A simple KMeans run with a fixed seed returns different results on the same training data if the training data is partitioned differently. Consider the following example. The same KMeans clustering setup is run on identical data. The only difference is the partitioning of the training data (one partition vs. four partitions). {{ import org.apache.spark.sql.DataFrame import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.feature.VectorAssembler // generate random data for clustering val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321)) val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features") val data = vecAssembler.transform(randomData) // instantiate KMeans with fixed seed val kmeans = new KMeans().setK(10).setSeed(9876L) // train the model with different partitioning val dataWith1Partition = data.repartition(1) println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition)) val dataWith4Partition = data.repartition(4) println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition)) }} I get the following related cost {{ 1 Partition: 16.028212597888057 4 Partition: 16.14758460544976 }} What I want to achieve is that repeated computations of the KMeans clustering should yield identical results on identical training data, regardless of the partitioning.
Looking through the Spark source code, I guess the cause is the initialization method of KMeans which in turn uses the `takeSample` method, which does not seem to be partition agnostic. Is this behaviour expected? Is there anything I could do to achieve reproducible results? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
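The reporter's hypothesis — that per-partition sampling makes the seeded draw depend on partition boundaries — can be illustrated with a toy model. This is a deliberately simplified stand-in (the index-pick rule below is not how `takeSample` actually works); it only shows how the same seed over the same data can select different elements once the data is split differently:

```python
# Toy model: the "sample" is drawn per partition, so the partition
# boundaries enter the computation even though the seed is fixed.
# The "pick index seed % partition_size" rule is a hypothetical
# stand-in for a seeded per-partition draw.

def take_sample(data, num_partitions, k, seed):
    # Round-robin split, like a simple repartition.
    parts = [data[i::num_partitions] for i in range(num_partitions)]
    # Each partition contributes one seeded pick; the pick depends on
    # the partition's own contents and size.
    candidates = [part[seed % len(part)] for part in parts]
    return sorted(candidates)[:k]

data = list(range(1000))
one = take_sample(data, 1, 1, 9876)   # drawn from a single partition
four = take_sample(data, 4, 1, 9876)  # same data and seed, four partitions
# `one` and `four` differ even though data and seed are identical;
# for each fixed partitioning the result is fully deterministic.
```

This is why repartitioning to a fixed number of partitions before fitting is one practical way to get repeatable runs.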
[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early
[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119988#comment-16119988 ] Pablo Panero commented on SPARK-21453: -- [~zsxwing] Concerning the cached consumer failure all I could get is: {code} 17/08/09 03:55:49 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error 17/08/09 03:55:49 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error 17/08/09 03:55:49 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error 17/08/09 07:36:19 ERROR Executor: Exception in task 0.0 in stage 18285.0 (TID 54855) org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. 17/08/09 07:36:19 ERROR Executor: Exception in task 0.0 in stage 18285.0 (TID 54855) org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. 17/08/09 07:36:19 ERROR Executor: Exception in task 0.0 in stage 18285.0 (TID 54855) org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. 
17/08/09 07:38:27 ERROR Executor: Exception in task 0.1 in stage 18285.0 (TID 54858) java.util.concurrent.TimeoutException: Cannot fetch record for offset 7591351 in 12 milliseconds at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219) at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117) at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91) at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
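The TimeoutException above ("Cannot fetch record for offset ... in N milliseconds") is raised when the consumer cannot return the requested offset within the poll timeout. Independent of the caching issue discussed here, the usual shape of such a fetch is a deadline-bounded retry loop rather than a single poll. A minimal, hypothetical sketch — the `poll` callable and `FetchTimeout` are illustrative, not Spark or Kafka API:

```python
import time

class FetchTimeout(Exception):
    """Raised when the requested offset never arrives before the deadline."""

def fetch_record(poll, offset, timeout_ms, poll_ms=100):
    """Keep polling until the requested offset shows up or an overall
    deadline passes, instead of giving up after one short poll.
    `poll` is a hypothetical callable returning {offset: record} batches."""
    deadline = time.monotonic() + timeout_ms / 1000.0
    while time.monotonic() < deadline:
        batch = poll(poll_ms)
        if offset in batch:
            return batch[offset]
    raise FetchTimeout(f"Cannot fetch record for offset {offset} "
                       f"in {timeout_ms} milliseconds")
```

Raising once the overall deadline passes (rather than on the first empty poll) is what makes transient broker disconnects like the NetworkException above survivable.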
[jira] [Commented] (SPARK-21677) json_tuple throws NullPointException when column is null as string type.
[ https://issues.apache.org/jira/browse/SPARK-21677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119928#comment-16119928 ] Liang-Chi Hsieh commented on SPARK-21677: - [~hyukjin.kwon] Thanks! Definitely we are interested in this. We will work on this. > json_tuple throws NullPointException when column is null as string type. > > > Key: SPARK-21677 > URL: https://issues.apache.org/jira/browse/SPARK-21677 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Hyukjin Kwon >Priority: Minor > Labels: Starter > > I was testing {{json_tuple}} before using this to extract values from JSONs > in my testing cluster but I found it could actually throw > {{NullPointException}} as below sometimes: > {code} > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(' Hyukjin'))").show() > +---+ > | c0| > +---+ > |224| > +---+ > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(' Jackson'))").show() > ++ > | c0| > ++ > |null| > ++ > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(null))").show() > ... 
> java.lang.NullPointerException > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$foldableFieldNames$1.apply(jsonExpressions.scala:367) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$foldableFieldNames$1.apply(jsonExpressions.scala:366) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.foldableFieldNames$lzycompute(jsonExpressions.scala:366) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.foldableFieldNames(jsonExpressions.scala:365) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.constantFields$lzycompute(jsonExpressions.scala:373) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.constantFields(jsonExpressions.scala:373) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.org$apache$spark$sql$catalyst$expressions$JsonTuple$$parseRow(jsonExpressions.scala:417) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$eval$4.apply(jsonExpressions.scala:401) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$eval$4.apply(jsonExpressions.scala:400) > at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2559) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.eval(jsonExpressions.scala:400) > {code} > It sounds we should show explicit error messages or return {{NULL}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
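The trace shows the NPE arising while folding the constant field-name expressions (`foldableFieldNames`) without a null check. The null-tolerant behavior suggested at the end ("return NULL") can be sketched as follows — a hypothetical Python stand-in for `json_tuple`, not the Spark implementation:

```python
import json

def json_tuple(json_str, *fields):
    """Null-safe sketch of json_tuple semantics: a None (NULL) field
    name yields None for that column instead of raising, and missing
    keys or unparseable input also yield None."""
    if json_str is None:
        return tuple(None for _ in fields)
    try:
        obj = json.loads(json_str)
    except ValueError:
        return tuple(None for _ in fields)
    return tuple(
        None if field is None else obj.get(field)
        for field in fields
    )
```

The key point is the `None if field is None` branch: the field-name list is inspected for NULLs before any lookup, which is exactly the check missing at jsonExpressions.scala:367.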
[jira] [Reopened] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Taran Saini reopened SPARK-21678: -
[jira] [Comment Edited] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119898#comment-16119898 ] Taran Saini edited comment on SPARK-21678 at 8/9/17 1:30 PM: - this is not a question. This is a bug! Only if somebody reads this and lets me know whether it is a bug or a question. was (Author: taransaini43): this is not a question. This is a bug! Only if somebody reads this and let me know whether it is a bug or a question. > Disabling quotes while writing a dataframe > -- > > Key: SPARK-21678 > URL: https://issues.apache.org/jira/browse/SPARK-21678 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Taran Saini > > Hi, > I have the my dataframe cloumn values which can contain commas, double quotes > etc. > I am transforming the dataframes in order to ensure that all the required > values are escaped. > However, on doing df.write.format("csv") > It again wraps the values in double quotes. How do I disable the same? > And even if the double quotes are there to stay why does it do the following : > {noformat} > L"\, p' Y a\, C G > {noformat} > is written as > {noformat} > "L\"\\, p' Y a\\, C G\\, H" > {noformat} > i.e double escapes the next already escaped values. > and if i myself escape like : > {noformat} > L\"\, p' Y a\, C G > {noformat} > then that is written as > {noformat} > "L\\"\\, p' Y a\\, C G\\, H" > {noformat} > How do we just disable this automatic escaping of characters? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119898#comment-16119898 ] Taran Saini commented on SPARK-21678: - this is not a question. This is a bug! Only if somebody reads this and let me know whether it is a bug or a question. > Disabling quotes while writing a dataframe > -- > > Key: SPARK-21678 > URL: https://issues.apache.org/jira/browse/SPARK-21678 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Taran Saini > > Hi, > I have the my dataframe cloumn values which can contain commas, double quotes > etc. > I am transforming the dataframes in order to ensure that all the required > values are escaped. > However, on doing df.write.format("csv") > It again wraps the values in double quotes. How do I disable the same? > And even if the double quotes are there to stay why does it do the following : > {noformat} > L"\, p' Y a\, C G > {noformat} > is written as > {noformat} > "L\"\\, p' Y a\\, C G\\, H" > {noformat} > i.e double escapes the next already escaped values. > and if i myself escape like : > {noformat} > L\"\, p' Y a\, C G > {noformat} > then that is written as > {noformat} > "L\\"\\, p' Y a\\, C G\\, H" > {noformat} > How do we just disable this automatic escaping of characters? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Taran Saini updated SPARK-21678: Description: Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : {noformat} L"\, p' Y a\, C G {noformat} is written as {noformat} "L\"\\, p' Y a\\, C G\\, H" {noformat} i.e double escapes the next already escaped values. and if i myself escape like : {noformat} L\"\, p' Y a\, C G {noformat} then that is written as {noformat} "L\\"\\, p' Y a\\, C G\\, H" {noformat} How do we just disable this automatic escaping of characters? was: Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : {quote} L"\, p' Y a\, C G {quote} is written as {quote} "L\"\\, p' Y a\\, C G\\, H" {quote} i.e double escapes the next already escaped values. and if i myself escape like : {quote} L\"\, p' Y a\, C G {quote} then that is written as {quote} "L\\"\\, p' Y a\\, C G\\, H" {quote} How do we just disable this automatic escaping of characters? > Disabling quotes while writing a dataframe > -- > > Key: SPARK-21678 > URL: https://issues.apache.org/jira/browse/SPARK-21678 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Taran Saini > > Hi, > I have the my dataframe cloumn values which can contain commas, double quotes > etc. 
> I am transforming the dataframes in order to ensure that all the required > values are escaped. > However, on doing df.write.format("csv") > It again wraps the values in double quotes. How do I disable the same? > And even if the double quotes are there to stay why does it do the following : > {noformat} > L"\, p' Y a\, C G > {noformat} > is written as > {noformat} > "L\"\\, p' Y a\\, C G\\, H" > {noformat} > i.e double escapes the next already escaped values. > and if i myself escape like : > {noformat} > L\"\, p' Y a\, C G > {noformat} > then that is written as > {noformat} > "L\\"\\, p' Y a\\, C G\\, H" > {noformat} > How do we just disable this automatic escaping of characters? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21678. --- Resolution: Invalid Questions belong on the mailing list or SO. > Disabling quotes while writing a dataframe > -- > > Key: SPARK-21678 > URL: https://issues.apache.org/jira/browse/SPARK-21678 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Taran Saini > > Hi, > I have the my dataframe cloumn values which can contain commas, double quotes > etc. > I am transforming the dataframes in order to ensure that all the required > values are escaped. > However, on doing df.write.format("csv") > It again wraps the values in double quotes. How do I disable the same? > And even if the double quotes are there to stay why does it do the following : > L"\, p' Y a\, C G is written as "L\"\\, p' Y a\\, C G\\, H" i.e double > escapes the next already escaped values. > and if i myself escape like : > L\"\, p' Y a\, C G then that is written as "L\\"\\, p' Y a\\, C G\\, H" > How do we just disable this automatic escaping of characters? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
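Editorial note: the "double escaping" the reporter observes is standard CSV writer behavior, not specific to Spark: when a field contains the delimiter or the quote character, the writer wraps it in quotes and escapes the embedded quotes, so input that was already hand-escaped gets escaped again. A minimal sketch with Python's stdlib `csv` module (an analogy only; Spark's CSV writer is a different implementation with its own `quote`/`escape` options):

```python
import csv
import io

value = 'a,"b"'  # field containing both the delimiter and the quote character

# Default writer (QUOTE_MINIMAL): wraps the field in quotes and doubles
# embedded quotes, so pre-escaped input would get escaped a second time.
buf = io.StringIO()
csv.writer(buf).writerow([value])
quoted = buf.getvalue().strip()

# QUOTE_NONE with an explicit escapechar: no wrapping quotes at all; the
# delimiter and quote character are backslash-escaped instead.
buf = io.StringIO()
csv.writer(buf, quoting=csv.QUOTE_NONE, escapechar="\\").writerow([value])
unquoted = buf.getvalue().strip()

print(quoted)    # "a,""b"""
print(unquoted)  # a\,\"b\"
```

The second mode is what the reporter seems to want: disable wrapping quotes entirely and rely on escaping alone, which in any CSV library is a writer configuration choice rather than something to work around by pre-escaping the data.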
[jira] [Updated] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Taran Saini updated SPARK-21678: Description: Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : {quote} L"\, p' Y a\, C G {quote} is written as {quote} "L\"\\, p' Y a\\, C G\\, H" {quote} i.e double escapes the next already escaped values. and if i myself escape like : {quote} L\"\, p' Y a\, C G {quote} then that is written as {quote} "L\\"\\, p' Y a\\, C G\\, H" {quote} How do we just disable this automatic escaping of characters? was: Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : L"\, p' Y a\, C G is written as "L\"\\, p' Y a\\, C G\\, H" i.e double escapes the next already escaped values. and if i myself escape like : L\"\, p' Y a\, C G then that is written as "L\\"\\, p' Y a\\, C G\\, H" How do we just disable this automatic escaping of characters? > Disabling quotes while writing a dataframe > -- > > Key: SPARK-21678 > URL: https://issues.apache.org/jira/browse/SPARK-21678 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Taran Saini > > Hi, > I have the my dataframe cloumn values which can contain commas, double quotes > etc. > I am transforming the dataframes in order to ensure that all the required > values are escaped. 
> However, on doing df.write.format("csv") > It again wraps the values in double quotes. How do I disable the same? > And even if the double quotes are there to stay why does it do the following : > {quote} > L"\, p' Y a\, C G > {quote} > is written as > {quote} > "L\"\\, p' Y a\\, C G\\, H" > {quote} > i.e double escapes the next already escaped values. > and if i myself escape like : > {quote} > L\"\, p' Y a\, C G > {quote} > then that is written as > {quote} > "L\\"\\, p' Y a\\, C G\\, H" > {quote} > How do we just disable this automatic escaping of characters? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Taran Saini updated SPARK-21678: Description: Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : L"\, p' Y a\, C G is written as "L\"\\, p' Y a\\, C G\\, H" i.e double escapes the next already escaped values. and if i myself escape like : L\"\, p' Y a\, C G then that is written as "L\\"\\, p' Y a\\, C G\\, H" How do we just disable this automatic escaping of characters? was: Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : L"\, p' Y a\, C G is written as "L\"\\, p' Y a\\, C G\\, H" i.e double escapes the next already escaped values. and if i myself escape like : L\"\, p' Y a\, C G then that is written as "L\\"\\, p' Y a\\, C G\\, H" How do we just disable this automatic escaping of characters? > Disabling quotes while writing a dataframe > -- > > Key: SPARK-21678 > URL: https://issues.apache.org/jira/browse/SPARK-21678 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Taran Saini > > Hi, > I have the my dataframe cloumn values which can contain commas, double quotes > etc. > I am transforming the dataframes in order to ensure that all the required > values are escaped. > However, on doing df.write.format("csv") > It again wraps the values in double quotes. 
How do I disable the same? > And even if the double quotes are there to stay why does it do the following : > L"\, p' Y a\, C G is written as "L\"\\, p' Y a\\, C G\\, H" i.e double > escapes the next already escaped values. > and if i myself escape like : > L\"\, p' Y a\, C G then that is written as "L\\"\\, p' Y a\\, C G\\, H" > How do we just disable this automatic escaping of characters?
[jira] [Comment Edited] (SPARK-21677) json_tuple throws NullPointerException when column is null as string type.
[ https://issues.apache.org/jira/browse/SPARK-21677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119891#comment-16119891 ] Hyukjin Kwon edited comment on SPARK-21677 at 8/9/17 1:26 PM: -- cc [~viirya], I remember your mentee was checking through JSON related code paths. Does this make sense to you and would you be interested in this? I don't have time to work on this and am currently fighting with AppVeyor time limit issue. was (Author: hyukjin.kwon): cc [~viirya], I remember your mentee was checking through JSON related code paths. Does this make sense to you and would you be interested in this? > json_tuple throws NullPointException when column is null as string type. > > > Key: SPARK-21677 > URL: https://issues.apache.org/jira/browse/SPARK-21677 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Hyukjin Kwon >Priority: Minor > Labels: Starter > > I was testing {{json_tuple}} before using this to extract values from JSONs > in my testing cluster but I found it could actually throw > {{NullPointException}} as below sometimes: > {code} > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(' Hyukjin'))").show() > +---+ > | c0| > +---+ > |224| > +---+ > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(' Jackson'))").show() > ++ > | c0| > ++ > |null| > ++ > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(null))").show() > ... 
> java.lang.NullPointerException > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$foldableFieldNames$1.apply(jsonExpressions.scala:367) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$foldableFieldNames$1.apply(jsonExpressions.scala:366) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.foldableFieldNames$lzycompute(jsonExpressions.scala:366) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.foldableFieldNames(jsonExpressions.scala:365) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.constantFields$lzycompute(jsonExpressions.scala:373) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.constantFields(jsonExpressions.scala:373) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.org$apache$spark$sql$catalyst$expressions$JsonTuple$$parseRow(jsonExpressions.scala:417) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$eval$4.apply(jsonExpressions.scala:401) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$eval$4.apply(jsonExpressions.scala:400) > at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2559) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.eval(jsonExpressions.scala:400) > {code} > It sounds we should show explicit error messages or return {{NULL}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21678) Disabling quotes while writing a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Taran Saini updated SPARK-21678: Description: Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : L"\, p' Y a\, C G is written as "L\"\\, p' Y a\\, C G\\, H" i.e double escapes the next already escaped values. and if i myself escape like : L\"\, p' Y a\, C G then that is written as "L\\"\\, p' Y a\\, C G\\, H" How do we just disable this automatic escaping of characters? was: Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : L"\, p' Y a\, C G is written as "L\"\\, p' Y a\\, C G\\, H" i.e double escapes the next already escaped values. I and if i myself escape like : L\"\, p' Y a\, C G then that is written as "L\\"\\, p' Y a\\, C G\\, H" How do we just disable this automatic escaping of characters? > Disabling quotes while writing a dataframe > -- > > Key: SPARK-21678 > URL: https://issues.apache.org/jira/browse/SPARK-21678 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Taran Saini > > Hi, > I have the my dataframe cloumn values which can contain commas, double quotes > etc. > I am transforming the dataframes in order to ensure that all the required > values are escaped. > However, on doing df.write.format("csv") > It again wraps the values in double quotes. 
How do I disable the same? > And even if the double quotes are there to stay why does it do the following : > L"\, p' Y a\, C G is written > as "L\"\\, p' Y a\\, C G\\, H" i.e double escapes the next already escaped > values. > and > if i myself escape like : > L\"\, p' Y a\, C G then that is written as > "L\\"\\, p' Y a\\, C G\\, H" > How do we just disable this automatic escaping of characters?
[jira] [Commented] (SPARK-21677) json_tuple throws NullPointerException when column is null as string type.
[ https://issues.apache.org/jira/browse/SPARK-21677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119891#comment-16119891 ] Hyukjin Kwon commented on SPARK-21677: -- cc [~viirya], I remember your mentee was checking through JSON related code paths. Does this make sense to you and would you be interested in this? > json_tuple throws NullPointException when column is null as string type. > > > Key: SPARK-21677 > URL: https://issues.apache.org/jira/browse/SPARK-21677 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Hyukjin Kwon >Priority: Minor > Labels: Starter > > I was testing {{json_tuple}} before using this to extract values from JSONs > in my testing cluster but I found it could actually throw > {{NullPointException}} as below sometimes: > {code} > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(' Hyukjin'))").show() > +---+ > | c0| > +---+ > |224| > +---+ > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(' Jackson'))").show() > ++ > | c0| > ++ > |null| > ++ > scala> Seq(("""{"Hyukjin": 224, "John": > 1225}""")).toDS.selectExpr("json_tuple(value, trim(null))").show() > ... 
> java.lang.NullPointerException > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$foldableFieldNames$1.apply(jsonExpressions.scala:367) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$foldableFieldNames$1.apply(jsonExpressions.scala:366) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.foldableFieldNames$lzycompute(jsonExpressions.scala:366) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.foldableFieldNames(jsonExpressions.scala:365) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.constantFields$lzycompute(jsonExpressions.scala:373) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.constantFields(jsonExpressions.scala:373) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.org$apache$spark$sql$catalyst$expressions$JsonTuple$$parseRow(jsonExpressions.scala:417) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$eval$4.apply(jsonExpressions.scala:401) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$eval$4.apply(jsonExpressions.scala:400) > at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2559) > at > org.apache.spark.sql.catalyst.expressions.JsonTuple.eval(jsonExpressions.scala:400) > {code} > It sounds we should show explicit error messages or return {{NULL}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
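Editorial note: the stack trace shows the field-name expressions being mapped over without handling a null literal, which is where the `NullPointerException` escapes. The "return NULL" option the report suggests can be sketched with a tiny stdlib analogue (`json_tuple` here is a hypothetical helper for illustration, not Spark's implementation):

```python
import json

def json_tuple(json_str, *fields):
    """Minimal analogue of json_tuple: one extracted value per field name.

    A None (NULL) field name yields None instead of raising, mirroring the
    'return NULL' behavior the report proposes.
    """
    try:
        obj = json.loads(json_str)
    except (TypeError, ValueError):
        return [None] * len(fields)
    return [obj.get(f) if f is not None else None for f in fields]

row = '{"Hyukjin": 224, "John": 1225}'
print(json_tuple(row, "Hyukjin"))  # [224]
print(json_tuple(row, "Jackson"))  # [None]  (missing key)
print(json_tuple(row, None))       # [None]  rather than an exception
```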
[jira] [Created] (SPARK-21678) Disabling quotes while writing a dataframe
Taran Saini created SPARK-21678: --- Summary: Disabling quotes while writing a dataframe Key: SPARK-21678 URL: https://issues.apache.org/jira/browse/SPARK-21678 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Taran Saini Hi, I have the my dataframe cloumn values which can contain commas, double quotes etc. I am transforming the dataframes in order to ensure that all the required values are escaped. However, on doing df.write.format("csv") It again wraps the values in double quotes. How do I disable the same? And even if the double quotes are there to stay why does it do the following : L"\, p' Y a\, C G is written as "L\"\\, p' Y a\\, C G\\, H" i.e double escapes the next already escaped values. I and if i myself escape like : L\"\, p' Y a\, C G then that is written as "L\\"\\, p' Y a\\, C G\\, H" How do we just disable this automatic escaping of characters? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21677) json_tuple throws NullPointerException when column is null as string type.
Hyukjin Kwon created SPARK-21677: Summary: json_tuple throws NullPointException when column is null as string type. Key: SPARK-21677 URL: https://issues.apache.org/jira/browse/SPARK-21677 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Reporter: Hyukjin Kwon Priority: Minor I was testing {{json_tuple}} before using this to extract values from JSONs in my testing cluster but I found it could actually throw {{NullPointException}} as below sometimes: {code} scala> Seq(("""{"Hyukjin": 224, "John": 1225}""")).toDS.selectExpr("json_tuple(value, trim(' Hyukjin'))").show() +---+ | c0| +---+ |224| +---+ scala> Seq(("""{"Hyukjin": 224, "John": 1225}""")).toDS.selectExpr("json_tuple(value, trim(' Jackson'))").show() ++ | c0| ++ |null| ++ scala> Seq(("""{"Hyukjin": 224, "John": 1225}""")).toDS.selectExpr("json_tuple(value, trim(null))").show() ... java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$foldableFieldNames$1.apply(jsonExpressions.scala:367) at org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$foldableFieldNames$1.apply(jsonExpressions.scala:366) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.catalyst.expressions.JsonTuple.foldableFieldNames$lzycompute(jsonExpressions.scala:366) at org.apache.spark.sql.catalyst.expressions.JsonTuple.foldableFieldNames(jsonExpressions.scala:365) at org.apache.spark.sql.catalyst.expressions.JsonTuple.constantFields$lzycompute(jsonExpressions.scala:373) at 
org.apache.spark.sql.catalyst.expressions.JsonTuple.constantFields(jsonExpressions.scala:373) at org.apache.spark.sql.catalyst.expressions.JsonTuple.org$apache$spark$sql$catalyst$expressions$JsonTuple$$parseRow(jsonExpressions.scala:417) at org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$eval$4.apply(jsonExpressions.scala:401) at org.apache.spark.sql.catalyst.expressions.JsonTuple$$anonfun$eval$4.apply(jsonExpressions.scala:400) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2559) at org.apache.spark.sql.catalyst.expressions.JsonTuple.eval(jsonExpressions.scala:400) {code} It sounds we should show explicit error messages or return {{NULL}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21665) Need to close resources after use
[ https://issues.apache.org/jira/browse/SPARK-21665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21665. --- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 18880 [https://github.com/apache/spark/pull/18880] > Need to close resources after use > -- > > Key: SPARK-21665 > URL: https://issues.apache.org/jira/browse/SPARK-21665 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 2.2.0 >Reporter: Vinod KC >Assignee: Vinod KC >Priority: Trivial > Fix For: 2.3.0 > > > Some methods in Core - SparkSubmitArguments.scala, Spark-launcher - > AbstractCommandBuilder.java, resource-managers- YARN - Client.scala fail to > clean resources -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
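Editorial note: the pattern behind this fix is guaranteeing `close()` on the exceptional path, which Spark does with `Utils.tryWithResource` (visible in the json_tuple stack traces above). A rough Python sketch of the same guarantee, using a stand-in resource class invented for the demo:

```python
from contextlib import contextmanager

@contextmanager
def try_with_resource(resource):
    """Rough analogue of Spark's Utils.tryWithResource: the resource is
    closed even when the body raises (illustrative only)."""
    try:
        yield resource
    finally:
        resource.close()

class Recorder:
    """Stand-in resource that records whether close() was called."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

r = Recorder()
try:
    with try_with_resource(r):
        raise RuntimeError("failure inside the body")
except RuntimeError:
    pass
print(r.closed)  # True: closed despite the exception
```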
[jira] [Assigned] (SPARK-21665) Need to close resources after use
[ https://issues.apache.org/jira/browse/SPARK-21665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-21665: - Assignee: Vinod KC Priority: Trivial (was: Minor) > Need to close resources after use > -- > > Key: SPARK-21665 > URL: https://issues.apache.org/jira/browse/SPARK-21665 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 2.2.0 >Reporter: Vinod KC >Assignee: Vinod KC >Priority: Trivial > > Some methods in Core - SparkSubmitArguments.scala, Spark-launcher - > AbstractCommandBuilder.java, resource-managers- YARN - Client.scala fail to > clean resources -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21673) Spark local directory is not set correctly
[ https://issues.apache.org/jira/browse/SPARK-21673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119874#comment-16119874 ] Jake Charland commented on SPARK-21673: --- https://github.com/apache/spark/pull/18894 > Spark local directory is not set correctly > -- > > Key: SPARK-21673 > URL: https://issues.apache.org/jira/browse/SPARK-21673 > Project: Spark > Issue Type: Bug > Components: Block Manager, Mesos >Affects Versions: 2.2.0 >Reporter: Jake Charland > > Currently the way Spark discovers the Mesos sandbox is wrong. As seen here > https://github.com/apache/spark/blob/e26dac5feb02033f980b1e69c9b0ff50869b6f9e/core/src/main/scala/org/apache/spark/util/Utils.scala#L806 > it is checking the env variable called MESOS_DIRECTORY however this env > variable was depricated (see > https://www.mail-archive.com/dev@mesos.apache.org/msg36621.html) in favor of > using MESOS_SANDBOX env variable. This should be updated in the spark code to > reflect this change in mesos. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
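Editorial note: the change requested here is an environment-variable lookup that prefers the new name and falls back to the deprecated one. A sketch of that lookup shape (`mesos_local_dir` is a hypothetical helper, not the function in Spark's `Utils.scala`):

```python
import os

def mesos_local_dir(environ=os.environ):
    """Prefer MESOS_SANDBOX, fall back to the deprecated MESOS_DIRECTORY,
    and return None when neither is set (illustrative helper)."""
    return environ.get("MESOS_SANDBOX") or environ.get("MESOS_DIRECTORY")

print(mesos_local_dir({"MESOS_SANDBOX": "/sandbox"}))  # /sandbox
print(mesos_local_dir({"MESOS_DIRECTORY": "/old"}))    # /old (fallback)
```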
[jira] [Resolved] (SPARK-21676) cannot compile on hadoop 2.2.0 and hive
[ https://issues.apache.org/jira/browse/SPARK-21676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21676. --- Resolution: Invalid Hadoop 2.2 is not supported > cannot compile on hadoop 2.2.0 and hive > --- > > Key: SPARK-21676 > URL: https://issues.apache.org/jira/browse/SPARK-21676 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.2.0 > Environment: centos6, java8, maven3.5.0, hadoop2.2, hive-0.12.0 >Reporter: Qinghe Jin > > Using following command to compile: > “./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -Phive > -Dhadoop.version=2.2.0” > Then get the following output: > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:149: > >>> value getThreadStatistics is not a member of > >>>org.apache.hadoop.fs.FileSystem.Statistics > >>>[error] val f = () => > >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum > >>>[error] ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:149: > >>> ambiguous implicit values: > >>>[error] both object BigIntIsIntegral in object Numeric of type > >>>scala.math.Numeric.BigIntIsIntegral.type > >>>[error] and object ShortIsIntegral in object Numeric of type > >>>scala.math.Numeric.ShortIsIntegral.type > >>>[error] match expected type Numeric[B] > >>>[error] val f = () => > >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum > >>>[error] > >>> ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:166: > >>> could not find implicit value for parameter num: Numeric[(Nothing, > >>>Nothing)] > >>>[error] }.sum > >>>[error] ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:180: > >>> value getThreadStatistics is not a member of > >>>org.apache.hadoop.fs.FileSystem.Statistics > >>>[error] val 
threadStats = > >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics) > >>>[error] ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:181: > >>> value getBytesWritten is not a member of Nothing > >>>[error] val f = () => threadStats.map(_.getBytesWritten).sum > >>>[error] ^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:181: > >>> ambiguous implicit values: > >>>[error] both object BigIntIsIntegral in object Numeric of type > >>>scala.math.Numeric.BigIntIsIntegral.type > >>>[error] and object ShortIsIntegral in object Numeric of type > >>>scala.math.Numeric.ShortIsIntegral.type > >>>[error] match expected type Numeric[B] > >>>[error] val f = () => threadStats.map(_.getBytesWritten).sum > >>>[error] ^ > >>>[WARNING] unused-1.0.0.jar, spark-network-common_2.11-2.2.0.jar, > >>>spark-network-shuffle_2.11-2.2.0.jar, spark-tags_2.11-2.2.0.jar define 1 > >>>overlapping classes: > >>>[WARNING] - org.apache.spark.unused.UnusedStubClass > >>>[WARNING] maven-shade-plugin has detected that some class files are > >>>[WARNING] present in two or more JARs. When this happens, only one > >>>[WARNING] single version of the class is copied to the uber jar. > >>>[WARNING] Usually this is not harmful and you can skip these warnings, > >>>[WARNING] otherwise try to manually exclude artifacts based on > >>>[WARNING] mvn dependency:tree -Ddetail=true and the above output. 
> >>>[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/ > >>>[INFO] > >>>[INFO] --- maven-source-plugin:3.0.1:jar-no-fork (create-source-jar) @ > >>>spark-network-yarn_2.11 --- > >>>[INFO] Building jar: > >>>SPARK_DIR/spark-2.2.0/common/network-yarn/target/spark-network-yarn_2.11-2.2.0-sources.jar > >>>[INFO] > >>>[INFO] --- maven-source-plugin:3.0.1:test-jar-no-fork (create-source-jar) > >>>@ spark-network-yarn_2.11 --- > >>>[INFO] Building jar: > >>>SPARK_DIR/spark-2.2.0/common/network-yarn/target/spark-network-yarn_2.11-2.2.0-test-sources.jar > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala:324: > >>> not found: type InputSplitWithLocationInfo > >>>[error] case lsplit: InputSplitWithLocationInfo => > >>>[error]^ > >>>[error] > >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala:403: > >>> not found: type
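The resolution above notes that Hadoop 2.2 is not supported by Spark 2.2.0, whose build ships only hadoop-2.6 and hadoop-2.7 profiles. A hypothetical invocation against a supported Hadoop version might look like the following (note that in Spark 2.x the script lives at dev/make-distribution.sh; the exact hadoop.version shown here is an assumption):

```shell
# Sketch only: the profile and -Dhadoop.version must match one of the
# Hadoop lines Spark 2.2.0 actually supports (2.6.x or 2.7.x).
./dev/make-distribution.sh --tgz -Phadoop-2.7 -Pyarn -Phive -Dhadoop.version=2.7.3
```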
[jira] [Created] (SPARK-21676) cannot compile on hadoop 2.2.0 and hive
Qinghe Jin created SPARK-21676: -- Summary: cannot compile on hadoop 2.2.0 and hive Key: SPARK-21676 URL: https://issues.apache.org/jira/browse/SPARK-21676 Project: Spark Issue Type: Bug Components: Build Affects Versions: 2.2.0 Environment: centos6, java8, maven3.5.0, hadoop2.2, hive-0.12.0 Reporter: Qinghe Jin Using following command to compile: “./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -Phive -Dhadoop.version=2.2.0” Then get the following output: >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:149: >>> value getThreadStatistics is not a member of >>>org.apache.hadoop.fs.FileSystem.Statistics >>>[error] val f = () => >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum >>>[error] ^ >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:149: >>> ambiguous implicit values: >>>[error] both object BigIntIsIntegral in object Numeric of type >>>scala.math.Numeric.BigIntIsIntegral.type >>>[error] and object ShortIsIntegral in object Numeric of type >>>scala.math.Numeric.ShortIsIntegral.type >>>[error] match expected type Numeric[B] >>>[error] val f = () => >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum >>>[error] >>> ^ >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:166: >>> could not find implicit value for parameter num: Numeric[(Nothing, Nothing)] >>>[error] }.sum >>>[error] ^ >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:180: >>> value getThreadStatistics is not a member of >>>org.apache.hadoop.fs.FileSystem.Statistics >>>[error] val threadStats = >>>FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics) >>>[error] ^ >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:181: >>> value getBytesWritten is not a member of Nothing 
>>>[error] val f = () => threadStats.map(_.getBytesWritten).sum >>>[error] ^ >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:181: >>> ambiguous implicit values: >>>[error] both object BigIntIsIntegral in object Numeric of type >>>scala.math.Numeric.BigIntIsIntegral.type >>>[error] and object ShortIsIntegral in object Numeric of type >>>scala.math.Numeric.ShortIsIntegral.type >>>[error] match expected type Numeric[B] >>>[error] val f = () => threadStats.map(_.getBytesWritten).sum >>>[error] ^ >>>[WARNING] unused-1.0.0.jar, spark-network-common_2.11-2.2.0.jar, >>>spark-network-shuffle_2.11-2.2.0.jar, spark-tags_2.11-2.2.0.jar define 1 >>>overlapping classes: >>>[WARNING] - org.apache.spark.unused.UnusedStubClass >>>[WARNING] maven-shade-plugin has detected that some class files are >>>[WARNING] present in two or more JARs. When this happens, only one >>>[WARNING] single version of the class is copied to the uber jar. >>>[WARNING] Usually this is not harmful and you can skip these warnings, >>>[WARNING] otherwise try to manually exclude artifacts based on >>>[WARNING] mvn dependency:tree -Ddetail=true and the above output. 
>>>[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/ >>>[INFO] >>>[INFO] --- maven-source-plugin:3.0.1:jar-no-fork (create-source-jar) @ >>>spark-network-yarn_2.11 --- >>>[INFO] Building jar: >>>SPARK_DIR/spark-2.2.0/common/network-yarn/target/spark-network-yarn_2.11-2.2.0-sources.jar >>>[INFO] >>>[INFO] --- maven-source-plugin:3.0.1:test-jar-no-fork (create-source-jar) @ >>>spark-network-yarn_2.11 --- >>>[INFO] Building jar: >>>SPARK_DIR/spark-2.2.0/common/network-yarn/target/spark-network-yarn_2.11-2.2.0-test-sources.jar >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala:324: >>> not found: type InputSplitWithLocationInfo >>>[error] case lsplit: InputSplitWithLocationInfo => >>>[error]^ >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala:403: >>> not found: type SplitLocationInfo >>>[error]infos: Array[SplitLocationInfo]): Option[Seq[String]] = { >>>[error] ^ >>>[error] >>>SPARK_DIR/spark-2.2.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala:325: >>> value getLocationInfo is not a member of org.apache.hadoop.mapred.InputSplit >>>[error] HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
[jira] [Assigned] (SPARK-21663) MapOutputTrackerSuite case test("remote fetch below max RPC message size") should call stop
[ https://issues.apache.org/jira/browse/SPARK-21663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-21663: --- Assignee: wangjiaochun > MapOutputTrackerSuite case test("remote fetch below max RPC message size") > should call stop > --- > > Key: SPARK-21663 > URL: https://issues.apache.org/jira/browse/SPARK-21663 > Project: Spark > Issue Type: Test > Components: Tests >Affects Versions: 2.2.0 >Reporter: wangjiaochun >Assignee: wangjiaochun >Priority: Minor > Fix For: 2.2.1, 2.3.0 > > > MapOutputTrackerSuite case test("remote fetch below max RPC message size") > should call masterTracker.stop() to free resources after the test finishes. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21663) MapOutputTrackerSuite case test("remote fetch below max RPC message size") should call stop
[ https://issues.apache.org/jira/browse/SPARK-21663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-21663. - Resolution: Fixed Fix Version/s: 2.3.0 2.2.1 Issue resolved by pull request 18867 [https://github.com/apache/spark/pull/18867] > MapOutputTrackerSuite case test("remote fetch below max RPC message size") > should call stop > --- > > Key: SPARK-21663 > URL: https://issues.apache.org/jira/browse/SPARK-21663 > Project: Spark > Issue Type: Test > Components: Tests >Affects Versions: 2.2.0 >Reporter: wangjiaochun >Priority: Minor > Fix For: 2.2.1, 2.3.0 > > > MapOutputTrackerSuite case test("remote fetch below max RPC message size") > should call masterTracker.stop() to free resources after the test finishes.
[jira] [Commented] (SPARK-21520) Improvement a special case for non-deterministic projects and filters in optimizer
[ https://issues.apache.org/jira/browse/SPARK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119617#comment-16119617 ] caoxuewen commented on SPARK-21520: --- User 'heary-cao' has created a pull request for this issue: https://github.com/apache/spark/pull/18892 > Improvement a special case for non-deterministic projects and filters in > optimizer > -- > > Key: SPARK-21520 > URL: https://issues.apache.org/jira/browse/SPARK-21520 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: caoxuewen > > Currently, the optimizer does a lot of special handling for non-deterministic > projects and filters, but it is not good enough. This patch adds a new special > case for non-deterministic projects and filters, so that only the fields the > user actually needs are read.
[jira] [Updated] (SPARK-21520) Improvement a special case for non-deterministic projects and filters in optimizer
[ https://issues.apache.org/jira/browse/SPARK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caoxuewen updated SPARK-21520: -- Description: Currently, Did a lot of special handling for non-deterministic projects and filters in optimizer. but not good enough. this patch add a new special case for non-deterministic projects and filters. Deal with that we only need to read user needs fields for non-deterministic projects and filters in optimizer. was: Currently, when the rand function is present in the SQL statement, hivetable searches all columns in the table. e.g: select k,k,sum(id) from (select d004 as id, floor(rand() * 1) as k, ceil(c010) as cceila from XXX_table) a group by k,k; generate WholeStageCodegen subtrees: == Subtree 1 / 2 == *HashAggregate(keys=[k#403L], functions=[partial_sum(cast(id#402 as bigint))], output=[k#403L, sum#800L]) +- Project [d004#607 AS id#402, FLOOR((rand(8828525941469309371) * 1.0)) AS k#403L] +- HiveTableScan [c030#606L, d004#607, d005#608, d025#609, c002#610, d023#611, d024#612, c005#613L, c008#614, c009#615, c010#616, d021#617, d022#618, c017#619, c018#620, c019#621, c020#622, c021#623, c022#624, c023#625, c024#626, c025#627, c026#628, c027#629, ... 169 more fields], MetastoreRelation XXX_database, XXX_table == Subtree 2 / 2 == *HashAggregate(keys=[k#403L], functions=[sum(cast(id#402 as bigint))], output=[k#403L, k#403L, sum(id)#797L]) +- Exchange hashpartitioning(k#403L, 200) +- *HashAggregate(keys=[k#403L], functions=[partial_sum(cast(id#402 as bigint))], output=[k#403L, sum#800L]) +- Project [d004#607 AS id#402, FLOOR((rand(8828525941469309371) * 1.0)) AS k#403L] +- HiveTableScan [c030#606L, d004#607, d005#608, d025#609, c002#610, d023#611, d024#612, c005#613L, c008#614, c009#615, c010#616, d021#617, d022#618, c017#619, c018#620, c019#621, c020#622, c021#623, c022#624, c023#625, c024#626, c025#627, c026#628, c027#629, ... 
169 more fields], MetastoreRelation XXX_database, XXX_table All columns will be searched in HiveTableScans , Consequently, All column data is read to a ORC table. e.g: INFO ReaderImpl: Reading ORC rows from hdfs://opena:8020/.../XXX_table/.../p_date=2017-05-25/p_hour=10/part-9 with {include: [true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true], offset: 0, length: 9223372036854775807} so, The execution of the SQL statement will become very slow. > Improvement a special case for non-deterministic projects and filters in > optimizer > -- > > Key: SPARK-21520 > URL: https://issues.apache.org/jira/browse/SPARK-21520 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: caoxuewen > > Currently, Did a lot of special handling for non-deterministic projects and > filters in optimizer. but not good enough. this patch add a new special case > for non-deterministic projects and filters. Deal with that we only need to > read user needs fields for non-deterministic projects and filters in > optimizer. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21520) Improvement a special case for non-deterministic projects and filters in optimizer
[ https://issues.apache.org/jira/browse/SPARK-21520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caoxuewen updated SPARK-21520: -- Summary: Improvement a special case for non-deterministic projects and filters in optimizer (was: Hivetable scan for all the columns the SQL statement contains the 'rand') > Improvement a special case for non-deterministic projects and filters in > optimizer > -- > > Key: SPARK-21520 > URL: https://issues.apache.org/jira/browse/SPARK-21520 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: caoxuewen > > Currently, when the rand function is present in the SQL statement, hivetable > searches all columns in the table. > e.g: > select k,k,sum(id) from (select d004 as id, floor(rand() * 1) as k, > ceil(c010) as cceila from XXX_table) a > group by k,k; > generate WholeStageCodegen subtrees: > == Subtree 1 / 2 == > *HashAggregate(keys=[k#403L], functions=[partial_sum(cast(id#402 as > bigint))], output=[k#403L, sum#800L]) > +- Project [d004#607 AS id#402, FLOOR((rand(8828525941469309371) * 1.0)) > AS k#403L] >+- HiveTableScan [c030#606L, d004#607, d005#608, d025#609, c002#610, > d023#611, d024#612, c005#613L, c008#614, c009#615, c010#616, d021#617, > d022#618, c017#619, c018#620, c019#621, c020#622, c021#623, c022#624, > c023#625, c024#626, c025#627, c026#628, c027#629, ... 
169 more fields], > MetastoreRelation XXX_database, XXX_table > == Subtree 2 / 2 == > *HashAggregate(keys=[k#403L], functions=[sum(cast(id#402 as bigint))], > output=[k#403L, k#403L, sum(id)#797L]) > +- Exchange hashpartitioning(k#403L, 200) >+- *HashAggregate(keys=[k#403L], functions=[partial_sum(cast(id#402 as > bigint))], output=[k#403L, sum#800L]) > +- Project [d004#607 AS id#402, FLOOR((rand(8828525941469309371) * > 1.0)) AS k#403L] > +- HiveTableScan [c030#606L, d004#607, d005#608, d025#609, c002#610, > d023#611, d024#612, c005#613L, c008#614, c009#615, c010#616, d021#617, > d022#618, c017#619, c018#620, c019#621, c020#622, c021#623, c022#624, > c023#625, c024#626, c025#627, c026#628, c027#629, ... 169 more fields], > MetastoreRelation XXX_database, XXX_table > > All columns will be searched in HiveTableScans , Consequently, All column > data is read to a ORC table. > e.g: > INFO ReaderImpl: Reading ORC rows from > hdfs://opena:8020/.../XXX_table/.../p_date=2017-05-25/p_hour=10/part-9 > with {include: [true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true, true, true, true, true, true, true, > true, true, true, true, true, true, true], offset: 0, length: > 9223372036854775807} > so, The execution of the SQL statement will become very slow. 
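The `include` array in the ORC reader log above is the mechanism at issue: one boolean per column, and here every entry is true. A minimal pure-Python sketch (hypothetical column subset, borrowing column names from the plans above) of deriving that mask from only the referenced fields, which is what the proposed pruning amounts to:

```python
# Hypothetical sketch: build the ORC "include" mask from the columns the
# query actually references, instead of marking every column true.
all_columns = ["d004", "c010", "d005", "c030", "c002"]  # table schema order
referenced = {"d004", "c010"}  # columns used by the Project above

include = [col in referenced for col in all_columns]
print(include)  # -> [True, True, False, False, False]
```

With the mask narrowed this way, the reader would deserialize two columns instead of all 193, which is why the all-true mask makes the query so slow.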
[jira] [Created] (SPARK-21675) Add a navigation bar at the bottom of the Details for Stage Page
Kent Yao created SPARK-21675: Summary: Add a navigation bar at the bottom of the Details for Stage Page Key: SPARK-21675 URL: https://issues.apache.org/jira/browse/SPARK-21675 Project: Spark Issue Type: Improvement Components: Web UI Affects Versions: 2.2.0, 2.1.1 Reporter: Kent Yao Priority: Minor 1. In the Spark Web UI, the Details for Stage page has no navigation bar at the bottom. After scrolling all the way down, it would be convenient to have a navigation bar right there to jump wherever we want. 2. Executor ID is not the same as Host; we may want to separate them.
[jira] [Updated] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join
[ https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Abhijit Bhole updated SPARK-21034: -- Description: If the column is involved in aggregation / join then pushing down filter should not change the results. Here is a sample code - {code:java} from pyspark.sql import functions as F df = spark.createDataFrame([{ "a": 1, "b" : 2, "c":7}, { "a": 3, "b" : 4, "c" : 8}, { "a": 1, "b" : 5, "c":7}, { "a": 1, "b" : 6, "c":7} ]) df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain() df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain() == Physical Plan == *HashAggregate(keys=[a#15L], functions=[sum(b#16L)]) +- Exchange hashpartitioning(a#15L, 4) +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)]) +- *Project [a#15L, b#16L] +- *Filter (isnotnull(a#15L) && (a#15L = 1)) +- Scan ExistingRDD[a#15L,b#16L,c#17L] >>> >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain() == Physical Plan == *Filter (isnotnull(a#15L) && (a#15L = 1)) +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)]) +- Exchange hashpartitioning(a#15L, 4) +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)]) +- Scan ExistingRDD[a#15L,b#16L,c#17L] {code} As you can see, the filter is not pushed down when F.first aggregate function is used. 
was: Here is a sample code - {code:java} from pyspark.sql import functions as F df = spark.createDataFrame([{ "a": 1, "b" : 2, "c":7}, { "a": 3, "b" : 4, "c" : 8}, { "a": 1, "b" : 5, "c":7}, { "a": 1, "b" : 6, "c":7} ]) df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain() df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain() == Physical Plan == *HashAggregate(keys=[a#15L], functions=[sum(b#16L)]) +- Exchange hashpartitioning(a#15L, 4) +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)]) +- *Project [a#15L, b#16L] +- *Filter (isnotnull(a#15L) && (a#15L = 1)) +- Scan ExistingRDD[a#15L,b#16L,c#17L] >>> >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain() == Physical Plan == *Filter (isnotnull(a#15L) && (a#15L = 1)) +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)]) +- Exchange hashpartitioning(a#15L, 4) +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), partial_first(c#17L, false)]) +- Scan ExistingRDD[a#15L,b#16L,c#17L] {code} As you can see, the filter is not pushed down when F.first aggregate function is used. > Allow filter pushdown filters through non deterministic functions for columns > involved in groupby / join > > > Key: SPARK-21034 > URL: https://issues.apache.org/jira/browse/SPARK-21034 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.1.1, 2.2.0 >Reporter: Abhijit Bhole > > If the column is involved in aggregation / join then pushing down filter > should not change the results. 
> Here is a sample code - > {code:java} > from pyspark.sql import functions as F > df = spark.createDataFrame([{ "a": 1, "b" : 2, "c":7}, { "a": 3, "b" : 4, "c" > : 8}, >{ "a": 1, "b" : 5, "c":7}, { "a": 1, "b" : 6, > "c":7} ]) > df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain() > df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain() > == Physical Plan == > *HashAggregate(keys=[a#15L], functions=[sum(b#16L)]) > +- Exchange hashpartitioning(a#15L, 4) >+- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)]) > +- *Project [a#15L, b#16L] > +- *Filter (isnotnull(a#15L) && (a#15L = 1)) > +- Scan ExistingRDD[a#15L,b#16L,c#17L] > >>> > >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain() > == Physical Plan == > *Filter (isnotnull(a#15L) && (a#15L = 1)) > +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)]) >+- Exchange hashpartitioning(a#15L, 4) > +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), > partial_first(c#17L, false)]) > +- Scan ExistingRDD[a#15L,b#16L,c#17L] > {code} > As you can see, the filter is not pushed down when F.first aggregate function > is used. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
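The claim that the pushdown is safe rests on the predicate `a = 1` referring only to the grouping key: filtering rows before the aggregation selects exactly the same groups as filtering the aggregated output. A plain-Python sketch of that equivalence (not Spark code), using the sample rows from the report above:

```python
from collections import defaultdict

rows = [{"a": 1, "b": 2, "c": 7}, {"a": 3, "b": 4, "c": 8},
        {"a": 1, "b": 5, "c": 7}, {"a": 1, "b": 6, "c": 7}]

def sum_b_by_a(rows):
    # groupBy("a").agg(sum("b")) in miniature
    out = defaultdict(int)
    for r in rows:
        out[r["a"]] += r["b"]
    return dict(out)

# Filter applied after aggregation (what Spark does once first()/last()
# blocks the pushdown):
filtered_after = {k: v for k, v in sum_b_by_a(rows).items() if k == 1}

# Filter pushed below the aggregation (the requested behavior):
filtered_before = sum_b_by_a([r for r in rows if r["a"] == 1])

print(filtered_after == filtered_before)  # -> True: predicate is on the grouping key
```

The two results are identical, so pushing the filter down only changes how much data the aggregation has to scan, not the answer.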
[jira] [Updated] (SPARK-21034) Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join
[ https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Abhijit Bhole updated SPARK-21034: -- Summary: Allow filter pushdown filters through non deterministic functions for columns involved in groupby / join (was: Filter not getting pushed down the groupBy clause when first() or last() aggregate function is used) > Allow filter pushdown filters through non deterministic functions for columns > involved in groupby / join > > > Key: SPARK-21034 > URL: https://issues.apache.org/jira/browse/SPARK-21034 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.1.1, 2.2.0 >Reporter: Abhijit Bhole > > Here is a sample code - > {code:java} > from pyspark.sql import functions as F > df = spark.createDataFrame([{ "a": 1, "b" : 2, "c":7}, { "a": 3, "b" : 4, "c" > : 8}, >{ "a": 1, "b" : 5, "c":7}, { "a": 1, "b" : 6, > "c":7} ]) > df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain() > df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain() > == Physical Plan == > *HashAggregate(keys=[a#15L], functions=[sum(b#16L)]) > +- Exchange hashpartitioning(a#15L, 4) >+- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)]) > +- *Project [a#15L, b#16L] > +- *Filter (isnotnull(a#15L) && (a#15L = 1)) > +- Scan ExistingRDD[a#15L,b#16L,c#17L] > >>> > >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain() > == Physical Plan == > *Filter (isnotnull(a#15L) && (a#15L = 1)) > +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)]) >+- Exchange hashpartitioning(a#15L, 4) > +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), > partial_first(c#17L, false)]) > +- Scan ExistingRDD[a#15L,b#16L,c#17L] > {code} > As you can see, the filter is not pushed down when F.first aggregate function > is used. 
[jira] [Issue Comment Deleted] (SPARK-19019) PySpark does not work with Python 3.6.0
[ https://issues.apache.org/jira/browse/SPARK-19019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sydt updated SPARK-19019: - Comment: was deleted (was: I meet this problem and resolved .I re-complie source code of hive-exec-1.2.1-spark2.jar of spark-2.1.0. First download sourcecode of hive-exec-1.2.1-spark2.jar and the website is: https://github.com/JoshRosen Second: download the patch and put into ReaderImpl.java https://issues.apache.org/jira/secure/attachment/12750949/HIVE-11592.1.patch Third:recompile and package the hive-exec-1.2.1-spark2.jar Last replace origin jar in spark-2.1.0/jars) > PySpark does not work with Python 3.6.0 > --- > > Key: SPARK-19019 > URL: https://issues.apache.org/jira/browse/SPARK-19019 > Project: Spark > Issue Type: Bug > Components: PySpark >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Critical > Fix For: 1.6.4, 2.0.3, 2.1.1, 2.2.0 > > > Currently, PySpark does not work with Python 3.6.0. > Running {{./bin/pyspark}} simply throws the error as below: > {code} > Traceback (most recent call last): > File ".../spark/python/pyspark/shell.py", line 30, in > import pyspark > File ".../spark/python/pyspark/__init__.py", line 46, in > from pyspark.context import SparkContext > File ".../spark/python/pyspark/context.py", line 36, in > from pyspark.java_gateway import launch_gateway > File ".../spark/python/pyspark/java_gateway.py", line 31, in > from py4j.java_gateway import java_import, JavaGateway, GatewayClient > File "", line 961, in _find_and_load > File "", line 950, in _find_and_load_unlocked > File "", line 646, in _load_unlocked > File "", line 616, in _load_backward_compatible > File ".../spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line > 18, in > File > "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pydoc.py", > line 62, in > import pkgutil > File > 
"/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pkgutil.py", > line 22, in > ModuleInfo = namedtuple('ModuleInfo', 'module_finder name ispkg') > File ".../spark/python/pyspark/serializers.py", line 394, in namedtuple > cls = _old_namedtuple(*args, **kwargs) > TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', > 'rename', and 'module' > {code} > The problem is in > https://github.com/apache/spark/blob/3c68944b229aaaeeaee3efcbae3e3be9a2914855/python/pyspark/serializers.py#L386-L394 > as the error says, and the cause seems to be that the arguments of > {{namedtuple}} are now completely keyword-only arguments from Python 3.6.0 > (see https://bugs.python.org/issue25628). > We currently copy this function via {{types.FunctionType}}, which does not set > the default values of keyword-only arguments (meaning > {{namedtuple.__kwdefaults__}}), and this seems to cause internally missing > values in the function (non-bound arguments). > This ends up as below: > {code} > import types > import collections > def _copy_func(f): > return types.FunctionType(f.__code__, f.__globals__, f.__name__, > f.__defaults__, f.__closure__) > _old_namedtuple = _copy_func(collections.namedtuple) > {code} > If we call as below: > {code} > >>> _old_namedtuple("a", "b") > Traceback (most recent call last): > File "", line 1, in > TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', > 'rename', and 'module' > {code} > It throws an exception as above because {{__kwdefaults__}} for required > keyword arguments seems unset in the copied function. So, if we give explicit > values for these, > {code} > >>> _old_namedtuple("a", "b", verbose=False, rename=False, module=None) > > {code} > It works fine. > It seems now we should properly set these into the hijacked one.
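The fix the description points at can be sketched in a few lines: copy `__kwdefaults__` along with the other function attributes, so the keyword-only parameters of `namedtuple` keep their default values after the copy. A standalone sketch (illustrative only, not PySpark's actual patch; the helper names are made up):

```python
import types
import collections

def copy_func_naive(f):
    # What serializers.py did: code/globals/defaults/closure are copied,
    # but __kwdefaults__ is not, so keyword-only defaults are lost.
    return types.FunctionType(f.__code__, f.__globals__, f.__name__,
                              f.__defaults__, f.__closure__)

def copy_func_fixed(f):
    g = copy_func_naive(f)
    g.__kwdefaults__ = f.__kwdefaults__  # restore keyword-only defaults
    return g

naive = copy_func_naive(collections.namedtuple)
fixed = copy_func_fixed(collections.namedtuple)

try:
    naive("Point", ["x", "y"])  # keyword-only defaults missing -> TypeError
    naive_raised = False
except TypeError:
    naive_raised = True

Point = fixed("Point", ["x", "y"])  # works: the defaults survived the copy
print(naive_raised, Point(1, 2))
```

On Python 3.6+ the naive copy raises the same `TypeError: namedtuple() missing ... required keyword-only arguments` seen in the traceback above, while the fixed copy behaves like `collections.namedtuple`.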
[jira] [Commented] (SPARK-20901) Feature parity for ORC with Parquet
[ https://issues.apache.org/jira/browse/SPARK-20901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119552#comment-16119552 ] sydt commented on SPARK-20901: -- Regarding SPARK-19019: I hit this problem and resolved it by recompiling the source of the hive-exec-1.2.1-spark2.jar that ships with spark-2.1.0. First, download the source code of hive-exec-1.2.1-spark2.jar (website: https://github.com/JoshRosen). Second, download the patch https://issues.apache.org/jira/secure/attachment/12750949/HIVE-11592.1.patch and apply it to ReaderImpl.java. Third, recompile and repackage hive-exec-1.2.1-spark2.jar. Finally, replace the original jar in spark-2.1.0/jars. > Feature parity for ORC with Parquet > --- > > Key: SPARK-20901 > URL: https://issues.apache.org/jira/browse/SPARK-20901 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.1, 2.2.0 >Reporter: Dongjoon Hyun > > This issue aims to track the feature parity for ORC with Parquet.
[jira] [Commented] (SPARK-19019) PySpark does not work with Python 3.6.0
[ https://issues.apache.org/jira/browse/SPARK-19019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119553#comment-16119553 ] sydt commented on SPARK-19019: -- I meet this problem and resolved .I re-complie source code of hive-exec-1.2.1-spark2.jar of spark-2.1.0. First download sourcecode of hive-exec-1.2.1-spark2.jar and the website is: https://github.com/JoshRosen Second: download the patch and put into ReaderImpl.java https://issues.apache.org/jira/secure/attachment/12750949/HIVE-11592.1.patch Third:recompile and package the hive-exec-1.2.1-spark2.jar Last replace origin jar in spark-2.1.0/jars > PySpark does not work with Python 3.6.0 > --- > > Key: SPARK-19019 > URL: https://issues.apache.org/jira/browse/SPARK-19019 > Project: Spark > Issue Type: Bug > Components: PySpark >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Critical > Fix For: 1.6.4, 2.0.3, 2.1.1, 2.2.0 > > > Currently, PySpark does not work with Python 3.6.0. > Running {{./bin/pyspark}} simply throws the error as below: > {code} > Traceback (most recent call last): > File ".../spark/python/pyspark/shell.py", line 30, in > import pyspark > File ".../spark/python/pyspark/__init__.py", line 46, in > from pyspark.context import SparkContext > File ".../spark/python/pyspark/context.py", line 36, in > from pyspark.java_gateway import launch_gateway > File ".../spark/python/pyspark/java_gateway.py", line 31, in > from py4j.java_gateway import java_import, JavaGateway, GatewayClient > File "", line 961, in _find_and_load > File "", line 950, in _find_and_load_unlocked > File "", line 646, in _load_unlocked > File "", line 616, in _load_backward_compatible > File ".../spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line > 18, in > File > "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pydoc.py", > line 62, in > import pkgutil > File > 
"/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pkgutil.py", > line 22, in > ModuleInfo = namedtuple('ModuleInfo', 'module_finder name ispkg') > File ".../spark/python/pyspark/serializers.py", line 394, in namedtuple > cls = _old_namedtuple(*args, **kwargs) > TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', > 'rename', and 'module' > {code} > The problem is in > https://github.com/apache/spark/blob/3c68944b229aaaeeaee3efcbae3e3be9a2914855/python/pyspark/serializers.py#L386-L394 > as the error says, and the cause seems to be that the arguments of > {{namedtuple}} are now completely keyword-only arguments from Python 3.6.0 > (see https://bugs.python.org/issue25628). > We currently copy this function via {{types.FunctionType}}, which does not set > the default values of keyword-only arguments (meaning > {{namedtuple.__kwdefaults__}}), and this seems to cause internally missing > values in the function (non-bound arguments). > This ends up as below: > {code} > import types > import collections > def _copy_func(f): > return types.FunctionType(f.__code__, f.__globals__, f.__name__, > f.__defaults__, f.__closure__) > _old_namedtuple = _copy_func(collections.namedtuple) > {code} > If we call as below: > {code} > >>> _old_namedtuple("a", "b") > Traceback (most recent call last): > File "", line 1, in > TypeError: namedtuple() missing 3 required keyword-only arguments: 'verbose', > 'rename', and 'module' > {code} > It throws an exception as above because {{__kwdefaults__}} for required > keyword arguments seems unset in the copied function. So, if we give explicit > values for these, > {code} > >>> _old_namedtuple("a", "b", verbose=False, rename=False, module=None) > > {code} > It works fine. > It seems now we should properly set these into the hijacked one.
-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-19109) ORC metadata section can sometimes exceed protobuf message size limit
[ https://issues.apache.org/jira/browse/SPARK-19109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119546#comment-16119546 ] sydt edited comment on SPARK-19109 at 8/9/17 8:05 AM: -- I met this problem and resolved it by recompiling the source code of hive-exec-1.2.1-spark2.jar from spark-2.1.0. First, download the source code of hive-exec-1.2.1-spark2.jar from: https://github.com/JoshRosen Second, download the patch and apply it to ReaderImpl.java: https://issues.apache.org/jira/secure/attachment/12750949/HIVE-11592.1.patch Third, recompile and package hive-exec-1.2.1-spark2.jar. Last, replace the original jar in spark-2.1.0/jars. was (Author: wangchao2017): I met this problem and resolved it by recompiling the source code of hive-exec-1.2.1-spark2.jar from spark-2.1.0. First, download the source code of hive-exec-1.2.1-spark2.jar from: https://github.com/JoshRosen Second, download the patch and apply it to ReaderImpl: https://issues.apache.org/jira/secure/attachment/12750949/HIVE-11592.1.patch Third, recompile and package hive-exec-1.2.1-spark2.jar. Last, replace the original jar in spark-2.1.0/jars. > ORC metadata section can sometimes exceed protobuf message size limit > - > > Key: SPARK-19109 > URL: https://issues.apache.org/jira/browse/SPARK-19109 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.3, 2.0.2, 2.1.0, 2.2.0 >Reporter: Nic Eggert > > Basically, Spark inherits HIVE-11592 from its Hive dependency. From that > issue: > If there are too many small stripes with many columns, the overhead for > storing metadata (column stats) can exceed the default protobuf message size > of 64MB. Reading such files will throw the following exception > {code} > Exception in thread "main" > com.google.protobuf.InvalidProtocolBufferException: Protocol message was too > large. May be malicious. Use CodedInputStream.setSizeLimit() to increase > the size limit. 
> at > com.google.protobuf.InvalidProtocolBufferException.sizeLimitExceeded(InvalidProtocolBufferException.java:110) > at > com.google.protobuf.CodedInputStream.refillBuffer(CodedInputStream.java:755) > at > com.google.protobuf.CodedInputStream.readRawBytes(CodedInputStream.java:811) > at > com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:329) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StringStatistics.(OrcProto.java:1331) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StringStatistics.(OrcProto.java:1281) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StringStatistics$1.parsePartialFrom(OrcProto.java:1374) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StringStatistics$1.parsePartialFrom(OrcProto.java:1369) > at > com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$ColumnStatistics.(OrcProto.java:4887) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$ColumnStatistics.(OrcProto.java:4803) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$ColumnStatistics$1.parsePartialFrom(OrcProto.java:4990) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$ColumnStatistics$1.parsePartialFrom(OrcProto.java:4985) > at > com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeStatistics.(OrcProto.java:12925) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeStatistics.(OrcProto.java:12872) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeStatistics$1.parsePartialFrom(OrcProto.java:12961) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeStatistics$1.parsePartialFrom(OrcProto.java:12956) > at > com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata.(OrcProto.java:13599) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata.(OrcProto.java:13546) > at > 
org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata$1.parsePartialFrom(OrcProto.java:13635) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata$1.parsePartialFrom(OrcProto.java:13630) > at > com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:200) > at > com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:217) > at > com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:223) > at > com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata.parseFrom(OrcProto.java:13746) > at >
[jira] [Commented] (SPARK-19109) ORC metadata section can sometimes exceed protobuf message size limit
[ https://issues.apache.org/jira/browse/SPARK-19109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119546#comment-16119546 ] sydt commented on SPARK-19109: -- I met this problem and resolved it by recompiling the source code of hive-exec-1.2.1-spark2.jar from spark-2.1.0. First, download the source code of hive-exec-1.2.1-spark2.jar from: https://github.com/JoshRosen Second, download the patch and apply it to ReaderImpl: https://issues.apache.org/jira/secure/attachment/12750949/HIVE-11592.1.patch Third, recompile and package hive-exec-1.2.1-spark2.jar. Last, replace the original jar in spark-2.1.0/jars. > ORC metadata section can sometimes exceed protobuf message size limit > - > > Key: SPARK-19109 > URL: https://issues.apache.org/jira/browse/SPARK-19109 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.3, 2.0.2, 2.1.0, 2.2.0 >Reporter: Nic Eggert > > Basically, Spark inherits HIVE-11592 from its Hive dependency. From that > issue: > If there are too many small stripes with many columns, the overhead for > storing metadata (column stats) can exceed the default protobuf message size > of 64MB. Reading such files will throw the following exception > {code} > Exception in thread "main" > com.google.protobuf.InvalidProtocolBufferException: Protocol message was too > large. May be malicious. Use CodedInputStream.setSizeLimit() to increase > the size limit. 
> at > com.google.protobuf.InvalidProtocolBufferException.sizeLimitExceeded(InvalidProtocolBufferException.java:110) > at > com.google.protobuf.CodedInputStream.refillBuffer(CodedInputStream.java:755) > at > com.google.protobuf.CodedInputStream.readRawBytes(CodedInputStream.java:811) > at > com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:329) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StringStatistics.(OrcProto.java:1331) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StringStatistics.(OrcProto.java:1281) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StringStatistics$1.parsePartialFrom(OrcProto.java:1374) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StringStatistics$1.parsePartialFrom(OrcProto.java:1369) > at > com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$ColumnStatistics.(OrcProto.java:4887) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$ColumnStatistics.(OrcProto.java:4803) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$ColumnStatistics$1.parsePartialFrom(OrcProto.java:4990) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$ColumnStatistics$1.parsePartialFrom(OrcProto.java:4985) > at > com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeStatistics.(OrcProto.java:12925) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeStatistics.(OrcProto.java:12872) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeStatistics$1.parsePartialFrom(OrcProto.java:12961) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeStatistics$1.parsePartialFrom(OrcProto.java:12956) > at > com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata.(OrcProto.java:13599) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata.(OrcProto.java:13546) > at > 
org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata$1.parsePartialFrom(OrcProto.java:13635) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata$1.parsePartialFrom(OrcProto.java:13630) > at > com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:200) > at > com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:217) > at > com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:223) > at > com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) > at > org.apache.hadoop.hive.ql.io.orc.OrcProto$Metadata.parseFrom(OrcProto.java:13746) > at > org.apache.hadoop.hive.ql.io.orc.ReaderImpl$MetaInfoObjExtractor.(ReaderImpl.java:468) > at > org.apache.hadoop.hive.ql.io.orc.ReaderImpl.(ReaderImpl.java:314) > at > org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:228) > at org.apache.hadoop.hive.ql.io.orc.FileDump.main(FileDump.java:67) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at >
[jira] [Resolved] (SPARK-21662) modify the appname to [SparkSQL::localHostName] instead of [SparkSQL::IP]
[ https://issues.apache.org/jira/browse/SPARK-21662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21662. --- Resolution: Not A Problem > modify the appname to [SparkSQL::localHostName] instead of [SparkSQL::IP] > - > > Key: SPARK-21662 > URL: https://issues.apache.org/jira/browse/SPARK-21662 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: liuzhaokun >Priority: Trivial > > As SparkSQLEnv.scala, line 38, says, "If user doesn't specify the appName, we want to get > [SparkSQL::localHostName]", so the appName should be > SparkSQL::localHostName, but in fact it is SparkSQL::IP. So I modified the > localHostName method to get the local host name with the getCanonicalHostName method.
[jira] [Updated] (SPARK-21662) modify the appname to [SparkSQL::localHostName] instead of [SparkSQL::IP]
[ https://issues.apache.org/jira/browse/SPARK-21662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-21662: -- Priority: Trivial (was: Critical) > modify the appname to [SparkSQL::localHostName] instead of [SparkSQL::IP] > - > > Key: SPARK-21662 > URL: https://issues.apache.org/jira/browse/SPARK-21662 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: liuzhaokun >Priority: Trivial > > As SparkSQLEnv.scala, line 38, says, "If user doesn't specify the appName, we want to get > [SparkSQL::localHostName]", so the appName should be > SparkSQL::localHostName, but in fact it is SparkSQL::IP. So I modified the > localHostName method to get the local host name with the getCanonicalHostName method.
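The distinction at issue above is between a raw IP address and a resolvable host name in the default app name. Spark's own code path goes through Java's InetAddress and getCanonicalHostName; the following Python sketch is only an illustrative analogue (the function name is made up, and socket.getfqdn() stands in for the canonical-host-name lookup):

```python
import socket


def default_app_name():
    # Prefer a resolvable host name over a raw IP when building a default
    # app name, mirroring the getCanonicalHostName idea from the report.
    return "SparkSQL::" + socket.getfqdn()


print(default_app_name())
```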
[jira] [Comment Edited] (SPARK-14540) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
[ https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119524#comment-16119524 ] Lukas Rytz edited comment on SPARK-14540 at 8/9/17 7:37 AM: [~joshrosen] the closure in your last example is serializable with 2.12.3. The anonymous class takes an outer parameter, but since https://github.com/scala/scala/pull/5099 the compiler implements an analysis to see that it's not used, and replaces the argument with {{null}}. The lambda body method then becomes static. Example code {code} class C { def foo(f: String => Object) = 0 def bar = { foo { x: Any => new Object{} } } } {code} {noformat} $> scalac -version Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc. $> scalac Test.scala -Xprint:cleanup,delambdafy [[syntax trees at end of cleanup]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ ((x: Object) => C.this.$anonfun|$1(x)) }); final private[this] def $anonfun|$1(x: Object): Object = new <$anon: Object>(C.this); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } [[syntax trees at end ofdelambdafy]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ $anonfun() }); final def $anonfun|$1(x: Object): Object = new <$anon: Object>(null); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } $> javap -v -cp . C ... public static final java.lang.Object $anonfun$bar$1(java.lang.Object); descriptor: (Ljava/lang/Object;)Ljava/lang/Object; flags: ACC_PUBLIC, ACC_STATIC, ACC_FINAL, ACC_SYNTHETIC Code: stack=3, locals=1, args_size=1 0: new #10 // class C$$anon$1 3: dup 4: aconst_null 5: invokespecial #51 // Method C$$anon$1."":(LC;)V 8: areturn ... 
{noformat} was (Author: lrytz): [~joshrosen] the closure in your last example is serializable with 2.12.3. The anonymous class takes an outer parameter, but since https://github.com/scala/scala/pull/5099 the compiler implements an analysis to see that it's not used, and replaces the argument with {{null}}. Example code {code} class C { def foo(f: String => Object) = 0 def bar = { foo { x: Any => new Object{} } } } {code} {noformat} $> scalac -version Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc. $> scalac Test.scala -Xprint:cleanup,delambdafy [[syntax trees at end of cleanup]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ ((x: Object) => C.this.$anonfun|$1(x)) }); final private[this] def $anonfun|$1(x: Object): Object = new <$anon: Object>(C.this); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } [[syntax trees at end ofdelambdafy]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ $anonfun() }); final def $anonfun|$1(x: Object): Object = new <$anon: Object>(null); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } $> javap -v -cp . C ... public static final java.lang.Object $anonfun$bar$1(java.lang.Object); descriptor: (Ljava/lang/Object;)Ljava/lang/Object; flags: ACC_PUBLIC, ACC_STATIC, ACC_FINAL, ACC_SYNTHETIC Code: stack=3, locals=1, args_size=1 0: new #10 // class C$$anon$1 3: dup 4: aconst_null 5: invokespecial #51 // Method C$$anon$1."":(LC;)V 8: areturn ... 
{noformat} > Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner > > > Key: SPARK-14540 > URL: https://issues.apache.org/jira/browse/SPARK-14540 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Josh Rosen > > Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running > ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures: > {code} > [info] - toplevel return statements in closures are identified at cleaning > time *** FAILED ***
[jira] [Comment Edited] (SPARK-14540) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
[ https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119524#comment-16119524 ] Lukas Rytz edited comment on SPARK-14540 at 8/9/17 7:34 AM: [~joshrosen] the closure in your last example is serializable with 2.12.3. The anonymous class takes an outer parameter, but since https://github.com/scala/scala/pull/5099 the compiler implements an analysis to see that it's not used, and replaces the argument with {{null}}. Example code {code} class C { def foo(f: String => Object) = 0 def bar = { foo { x: Any => new Object{} } } } {code} {noformat} $> scalac -version Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc. $> scalac Test.scala -Xprint:cleanup,delambdafy [[syntax trees at end of cleanup]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ ((x: Object) => C.this.$anonfun|$1(x)) }); final private[this] def $anonfun|$1(x: Object): Object = new <$anon: Object>(C.this); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } [[syntax trees at end ofdelambdafy]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ $anonfun() }); final def $anonfun|$1(x: Object): Object = new <$anon: Object>(null); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } $> javap -v -cp . C ... public static final java.lang.Object $anonfun$bar$1(java.lang.Object); descriptor: (Ljava/lang/Object;)Ljava/lang/Object; flags: ACC_PUBLIC, ACC_STATIC, ACC_FINAL, ACC_SYNTHETIC Code: stack=3, locals=1, args_size=1 0: new #10 // class C$$anon$1 3: dup 4: aconst_null 5: invokespecial #51 // Method C$$anon$1."":(LC;)V 8: areturn ... 
{noformat} was (Author: lrytz): [~joshrosen] the closure in your last example is serializable with 2.12.3. The anonymous class takes an outer parameter, but since https://github.com/scala/scala/pull/5099 the compiler implements an analysis to see that it's not used, and replaces the argument with {{null}}. Example code {code} class C { def foo(f: String => Object) = 0 def bar = { foo { x: Any => new Object{} } } } {code} {noformat} ➜ sandbox git:(backendRefactor) ✗ scalac -version Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc. ➜ sandbox git:(backendRefactor) ✗ scalac Test.scala -Xprint:cleanup,delambdafy [[syntax trees at end of cleanup]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ ((x: Object) => C.this.$anonfun|$1(x)) }); final private[this] def $anonfun|$1(x: Object): Object = new <$anon: Object>(C.this); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } [[syntax trees at end ofdelambdafy]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ $anonfun() }); final def $anonfun|$1(x: Object): Object = new <$anon: Object>(null); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } {noformat} > Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner > > > Key: SPARK-14540 > URL: https://issues.apache.org/jira/browse/SPARK-14540 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Josh Rosen > > Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running > ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures: > {code} > [info] - toplevel return statements in closures are identified at cleaning > time *** FAILED *** (32 milliseconds) > [info] Expected exception > 
org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no > exception was thrown. (ClosureCleanerSuite.scala:57) > {code} > and > {code} > [info] - user provided closures are actually cleaned *** FAILED *** (56 > milliseconds) > [info] Expected ReturnStatementInClosureException, but got > org.apache.spark.SparkException: Job aborted due to stage failure: Task not >
[jira] [Commented] (SPARK-14540) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
[ https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119524#comment-16119524 ] Lukas Rytz commented on SPARK-14540: [~joshrosen] the closure in your last example is serializable with 2.12.3. The anonymous class takes an outer parameter, but since https://github.com/scala/scala/pull/5099 the compiler implements an analysis to see that it's not used, and replaces the argument with {{null}}. Example code {code} class C { def foo(f: String => Object) = 0 def bar = { foo { x: Any => new Object{} } } } {code} {noformat} ➜ sandbox git:(backendRefactor) ✗ scalac -version Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc. ➜ sandbox git:(backendRefactor) ✗ scalac Test.scala -Xprint:cleanup,delambdafy [[syntax trees at end of cleanup]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ ((x: Object) => C.this.$anonfun|$1(x)) }); final private[this] def $anonfun|$1(x: Object): Object = new <$anon: Object>(C.this); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } [[syntax trees at end ofdelambdafy]] // Test.scala package { class C extends Object { def foo(f: Function1): Int = 0; def bar(): Int = C.this.foo({ $anonfun() }); final def $anonfun|$1(x: Object): Object = new <$anon: Object>(null); def (): C = { C.super.(); () } }; final class anon$1 extends Object { def ($outer: C): <$anon: Object> = { anon$1.super.(); () } } } {noformat} > Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner > > > Key: SPARK-14540 > URL: https://issues.apache.org/jira/browse/SPARK-14540 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Josh Rosen > > Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running > ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures: > {code} > [info] - 
toplevel return statements in closures are identified at cleaning > time *** FAILED *** (32 milliseconds) > [info] Expected exception > org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no > exception was thrown. (ClosureCleanerSuite.scala:57) > {code} > and > {code} > [info] - user provided closures are actually cleaned *** FAILED *** (56 > milliseconds) > [info] Expected ReturnStatementInClosureException, but got > org.apache.spark.SparkException: Job aborted due to stage failure: Task not > serializable: java.io.NotSerializableException: java.lang.Object > [info]- element of array (index: 0) > [info]- array (class "[Ljava.lang.Object;", size: 1) > [info]- field (class "java.lang.invoke.SerializedLambda", name: > "capturedArgs", type: "class [Ljava.lang.Object;") > [info]- object (class "java.lang.invoke.SerializedLambda", > SerializedLambda[capturingClass=class > org.apache.spark.util.TestUserClosuresActuallyCleaned$, > functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I, > implementation=invokeStatic > org/apache/spark/util/TestUserClosuresActuallyCleaned$.org$apache$spark$util$TestUserClosuresActuallyCleaned$$$anonfun$69:(Ljava/lang/Object;I)I, > instantiatedMethodType=(I)I, numCaptured=1]) > [info]- element of array (index: 0) > [info]- array (class "[Ljava.lang.Object;", size: 1) > [info]- field (class "java.lang.invoke.SerializedLambda", name: > "capturedArgs", type: "class [Ljava.lang.Object;") > [info]- object (class "java.lang.invoke.SerializedLambda", > SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, > functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, > implementation=invokeStatic > org/apache/spark/rdd/RDD.org$apache$spark$rdd$RDD$$$anonfun$20$adapted:(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, > > 
instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, > numCaptured=1]) > [info]- field (class "org.apache.spark.rdd.MapPartitionsRDD", name: > "f", type: "interface scala.Function3") > [info]- object (class "org.apache.spark.rdd.MapPartitionsRDD", > MapPartitionsRDD[2] at apply at Transformer.scala:22) > [info]- field (class "scala.Tuple2", name: "_1", type: "class > java.lang.Object") > [info]- root object (class "scala.Tuple2",
[jira] [Commented] (SPARK-21651) Detect MapType in Json InferSchema
[ https://issues.apache.org/jira/browse/SPARK-21651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119501#comment-16119501 ] Jochen Niebuhr commented on SPARK-21651: Specifying the schema myself would mean I'll have to change it every time a new field appears. With the current implementation, you could write some schema to JSON with Spark and it'll read back a different schema, or not be able to read it at all if you're using maps. We could add a flag which activates this feature, but I think this might be helpful for some people. > Detect MapType in Json InferSchema > -- > > Key: SPARK-21651 > URL: https://issues.apache.org/jira/browse/SPARK-21651 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0, 2.1.1, 2.2.0 >Reporter: Jochen Niebuhr >Priority: Minor > > When loading JSON files which include a map with highly variable keys, the > current schema inference logic might create a very large schema. This will lead > to long load times and possibly out-of-memory errors. > I've already submitted a pull request to the MongoDB Spark driver, which had the > same problem. Should I port this logic over to the JSON schema inference class? > The MongoDB Spark pull request mentioned is: > https://github.com/mongodb/mongo-spark/pull/24
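The heuristic being proposed can be sketched independently of Spark: if the objects under a field collectively use many distinct keys, infer a map rather than an ever-growing struct. The Python below is illustrative only; the threshold and function name are assumptions, not Spark's or the mongo-spark connector's actual API:

```python
import json


def infer_container_kind(records, field, max_struct_keys=10):
    # Collect every key seen under `field` across all records. A "struct"
    # whose key set keeps growing with the data is better modeled as a map.
    keys = set()
    for rec in records:
        value = rec.get(field)
        if isinstance(value, dict):
            keys.update(value)
    return "map" if len(keys) > max_struct_keys else "struct"


# Highly variable keys (e.g. one per user id): infer a map.
rows = [json.loads('{"relations": {"user-%d": {"x": 1}}}' % i)
        for i in range(50)]
print(infer_container_kind(rows, "relations"))

# A small, stable key set: keep a struct.
stable = [{"relations": {"r1": {"x": 1}, "r2": {"y": 2}}} for _ in range(50)]
print(infer_container_kind(stable, "relations"))
```

A real implementation would also have to unify the value types of the map entries, which is where most of the complexity in the mongo-spark pull request lives.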
[jira] [Resolved] (SPARK-21596) Audit the places calling HDFSMetadataLog.get
[ https://issues.apache.org/jira/browse/SPARK-21596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-21596. -- Resolution: Fixed Assignee: Shixiong Zhu Fix Version/s: 2.3.0 2.2.1 > Audit the places calling HDFSMetadataLog.get > > > Key: SPARK-21596 > URL: https://issues.apache.org/jira/browse/SPARK-21596 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.1 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.2.1, 2.3.0 > > > When I was investigating a flaky test, I realized that many places don't > check the return value of `HDFSMetadataLog.get(batchId: Long): Option[T]`. > When a batch is supposed to be there, the caller just ignores None rather > than throwing an error. If some bug causes a query not to generate a batch > metadata file, this behavior will hide it and allow the query to continue running.
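The audited pattern, failing loudly when a batch that must exist is missing instead of silently skipping it, can be sketched outside Spark. The class and names below are a toy stand-in for HDFSMetadataLog, not its real API:

```python
from typing import Dict, Optional


class MetadataLog:
    # Toy in-memory stand-in: batch id -> serialized batch metadata.
    def __init__(self) -> None:
        self._batches: Dict[int, str] = {}

    def add(self, batch_id: int, metadata: str) -> None:
        self._batches[batch_id] = metadata

    def get(self, batch_id: int) -> Optional[str]:
        # Like HDFSMetadataLog.get, a missing batch yields None/Option.empty.
        return self._batches.get(batch_id)


def get_required_batch(log: MetadataLog, batch_id: int) -> str:
    # The audited fix: when a batch is supposed to be there, treat a
    # missing metadata entry as an error rather than ignoring it.
    metadata = log.get(batch_id)
    if metadata is None:
        raise RuntimeError(f"batch {batch_id} metadata is missing")
    return metadata


log = MetadataLog()
log.add(0, "offsets-0")
print(get_required_batch(log, 0))
```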
[jira] [Resolved] (SPARK-21523) Fix bug of strong wolfe linesearch `init` parameter lose effectiveness
[ https://issues.apache.org/jira/browse/SPARK-21523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanbo Liang resolved SPARK-21523. - Resolution: Fixed Assignee: Weichen Xu Fix Version/s: 2.3.0 2.2.1 > Fix bug of strong wolfe linesearch `init` parameter lose effectiveness > -- > > Key: SPARK-21523 > URL: https://issues.apache.org/jira/browse/SPARK-21523 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 2.2.0 >Reporter: Weichen Xu >Assignee: Weichen Xu >Priority: Critical > Fix For: 2.2.1, 2.3.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > We need to merge this breeze bugfix into Spark because it influences a series of > algorithms in MLlib which use LBFGS. > https://github.com/scalanlp/breeze/pull/651
[jira] [Commented] (SPARK-21651) Detect MapType in Json InferSchema
[ https://issues.apache.org/jira/browse/SPARK-21651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119484#comment-16119484 ]

Takeshi Yamamuro commented on SPARK-21651:
------------------------------------------

Is specifying a schema by yourself not enough for your case?

{code}
json data:
{"id":"0001", "relations": {"r1": {"x": "7", "z": "1"}, "r2": {"y": "8"}}}

scala> spark.read.json("xxx").printSchema
root
 |-- id: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- a: struct (nullable = true)
 |    |    |-- x: string (nullable = true)
 |    |    |-- z: string (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- y: string (nullable = true)

scala> spark.read.schema("id STRING, relations MAP<STRING, STRUCT<x: STRING, y: STRING, z: STRING>>").json("xxx").printSchema
root
 |-- id: string (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- x: string (nullable = true)
 |    |    |-- y: string (nullable = true)
 |    |    |-- z: string (nullable = true)
{code}

I feel this optimization is a little domain-specific; is there any other runtime that supports this kind of optimization?

> Detect MapType in Json InferSchema
> ----------------------------------
>
>                 Key: SPARK-21651
>                 URL: https://issues.apache.org/jira/browse/SPARK-21651
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0, 2.1.1, 2.2.0
>            Reporter: Jochen Niebuhr
>            Priority: Minor
>
> When loading JSON files that include a map with highly variable keys, the
> current schema inference logic might create a very large schema. This leads
> to long load times and possibly out-of-memory errors.
> I've already submitted a pull request to the mongo spark driver, which had
> the same problem. Should I port this logic over to the JSON schema inference
> class?
> The MongoDB Spark pull request mentioned is:
> https://github.com/mongodb/mongo-spark/pull/24
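For readers without a Spark shell handy, the core of the problem that an explicit map schema sidesteps can be shown without Spark at all: struct-style inference keeps one field per distinct key, so the inferred schema grows with the data, while a user-supplied MAP schema stays constant-size. A minimal Python sketch of that inference behavior (toy code, not Spark's actual implementation; the record values are illustrative):

```python
def infer_struct_fields(records):
    """Toy struct-style inference: the union of all keys ever seen under
    'relations'. This mimics why Spark's JSON schema inference grows
    with the number of distinct map keys in the sample."""
    fields = set()
    for rec in records:
        fields.update(rec.get("relations", {}).keys())
    return sorted(fields)

records = [
    {"id": "0001", "relations": {"r1": {"x": "7"}, "r2": {"y": "8"}}},
    {"id": "0002", "relations": {"r3": {"x": "9"}}},
]

# Struct inference accumulates one field per distinct key seen:
infer_struct_fields(records)  # → ['r1', 'r2', 'r3']
```

With UUID keys, every document contributes fresh fields, so the struct keeps growing; a `MAP<STRING, ...>` schema describes all of them with a single key/value type.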
[jira] [Comment Edited] (SPARK-21651) Detect MapType in Json InferSchema
[ https://issues.apache.org/jira/browse/SPARK-21651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119450#comment-16119450 ]

Jochen Niebuhr edited comment on SPARK-21651 at 8/9/17 6:03 AM:
---------------------------------------------------------------

Ok, here's an example. If you have an entity in JSON objects which stores some relations to other entities, it might look like this:

{code}
{ "id": "06d32281-db4d-4d47-911a-c0b59cc0ed26",
  "relations": {
    "38401db2-1036-499f-b21e-e9be532cddb2": { /* ... some relation content ... */ },
    "1cbb297c-cec8-4288-9edc-9d4b5dad3eec": { /* ... */ }
  } }
{ "id": "38401db2-1036-499f-b21e-e9be532cddb2",
  "relations": {
    "06d32281-db4d-4d47-911a-c0b59cc0ed26": { /* ... */ },
    "1cbb297c-cec8-4288-9edc-9d4b5dad3eec": { /* ... */ }
  } }
{ "id": "1cbb297c-cec8-4288-9edc-9d4b5dad3eec",
  "relations": {
    "06d32281-db4d-4d47-911a-c0b59cc0ed26": { /* ... */ },
    "38401db2-1036-499f-b21e-e9be532cddb2": { /* ... */ }
  } }
{code}

If I put that JSON through the JSON schema inference step, it will generate a schema like this:

{code}
Struct<id: String, relations: Struct<
  38401db2-1036-499f-b21e-e9be532cddb2: Struct<...>,
  1cbb297c-cec8-4288-9edc-9d4b5dad3eec: Struct<...>,
  06d32281-db4d-4d47-911a-c0b59cc0ed26: Struct<...>>>
{code}

If I do this with a sample of 100,000 documents, the schema will become very large and probably crash my job, or at least take forever. But since everything in relations shares the same key and value types, I could just say relations is a MapType. My schema wouldn't grow as large and I could simply query it. So the expected schema would be:

{code}
Struct<id: String, relations: Map<String, Struct<...>>>
{code}

In the version I implemented in the MongoDB driver, this behavior has the following requirements:
* Over 250 keys in a single Struct
* All value types are compatible

> Detect MapType in Json InferSchema
> ----------------------------------
>
>                 Key: SPARK-21651
>                 URL: https://issues.apache.org/jira/browse/SPARK-21651
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0, 2.1.1, 2.2.0
>            Reporter: Jochen Niebuhr
>            Priority: Minor
>
> When loading JSON files that include a map with highly variable keys, the
> current schema inference logic might create a very large schema. This leads
> to long load times and possibly out-of-memory errors.
> I've already submitted a pull request to the mongo spark driver, which had
> the same problem. Should I port this logic over to the JSON schema inference
> class?
> The MongoDB Spark pull request mentioned is:
> https://github.com/mongodb/mongo-spark/pull/24
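The two requirements Jochen lists (over 250 keys in a single struct, all value types compatible) can be sketched as a standalone heuristic. This is a hedged Python sketch of the idea, not the MongoDB driver's actual code: the 250 threshold comes from the comment, and the "compatibility" check here is deliberately simplistic (identical field sets), whereas real inference also merges slightly differing structs.

```python
THRESHOLD = 250  # key-count cutoff proposed in the comment

def value_types_compatible(struct: dict) -> bool:
    # Toy compatibility check: every value exposes the same set of
    # field names. Real schema inference is more permissive.
    shapes = {tuple(sorted(v.keys())) for v in struct.values()}
    return len(shapes) <= 1

def should_infer_map(struct: dict, threshold: int = THRESHOLD) -> bool:
    # The two requirements from the comment:
    #  * over `threshold` keys in a single struct
    #  * all value types are compatible
    return len(struct) > threshold and value_types_compatible(struct)

# A 'relations' object with 300 UUID-like keys, all the same value shape,
# would collapse to MAP; a small or mixed-shape struct would not.
big_uniform = {f"key-{i:04d}": {"x": "1"} for i in range(300)}
small = {"r1": {"x": "7"}, "r2": {"y": "8"}}
```

The threshold keeps genuinely record-like objects (a handful of known fields) as structs, while the compatibility check ensures the collapsed map has a single well-defined value type.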