[jira] [Updated] (SPARK-18621) PySpark SQL Types (aka DataFrame Schema) have __repr__() with Scala and not Python representation
[ https://issues.apache.org/jira/browse/SPARK-18621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Romi Kuntsman updated SPARK-18621: -- Description: When using Python's repr() on an object, the expected result is a string that Python can evaluate to construct the object. See: https://docs.python.org/2/library/functions.html#func-repr However, when getting a DataFrame schema in PySpark, the code (in "__repr__()" overload methods) returns the string representation for Scala, rather than for Python. Relevant code in PySpark: https://github.com/apache/spark/blob/5f02d2e5b4d37f554629cbd0e488e856fffd7b6b/python/pyspark/sql/types.py#L442 Python Code: {code} # 1. define object struct1 = StructType([StructField("f1", StringType(), True)]) # 2. print representation, expected to be like above print(repr(struct1)) # 3. actual result: # StructType(List(StructField(f1,StringType,true))) # 4. try to use result in code struct2 = StructType(List(StructField(f1,StringType,true))) # 5. get a bunch of errors # Unresolved reference 'List' # Unresolved reference 'f1' # StringType is class, not constructed object # Unresolved reference 'true' {code} was: When using Python's repr() on an object, the expected result is a string that Python can evaluate to construct the object. See: https://docs.python.org/2/library/functions.html#func-repr However, when getting a DataFrame schema in PySpark, the code (in "__repr__()" overload methods) returns the string representation for Scala, rather than for Python. Relevant code in PySpark: https://github.com/apache/spark/blob/5f02d2e5b4d37f554629cbd0e488e856fffd7b6b/python/pyspark/sql/types.py#L442 Python Code: # 1. define object struct1 = StructType([StructField("f1", StringType(), True)]) # 2. print representation, expected to be like above print(repr(struct1)) # 3. actual result: # StructType(List(StructField(f1,StringType,true))) # 4. try to use result in code struct2 = StructType(List(StructField(f1,StringType,true))) # 5.
get a bunch of errors # Unresolved reference 'List' # Unresolved reference 'f1' # StringType is class, not constructed object # Unresolved reference 'true' > PySpark SQL Types (aka DataFrame Schema) have __repr__() with Scala and not > Python representation > --- > > Key: SPARK-18621 > URL: https://issues.apache.org/jira/browse/SPARK-18621 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.6.2, 2.0.2 >Reporter: Romi Kuntsman >Priority: Minor > > When using Python's repr() on an object, the expected result is a string that > Python can evaluate to construct the object. > See: https://docs.python.org/2/library/functions.html#func-repr > However, when getting a DataFrame schema in PySpark, the code (in > "__repr__()" overload methods) returns the string representation for Scala, > rather than for Python. > Relevant code in PySpark: > https://github.com/apache/spark/blob/5f02d2e5b4d37f554629cbd0e488e856fffd7b6b/python/pyspark/sql/types.py#L442 > Python Code: > {code} > # 1. define object > struct1 = StructType([StructField("f1", StringType(), True)]) > # 2. print representation, expected to be like above > print(repr(struct1)) > # 3. actual result: > # StructType(List(StructField(f1,StringType,true))) > # 4. try to use result in code > struct2 = StructType(List(StructField(f1,StringType,true))) > # 5. get a bunch of errors > # Unresolved reference 'List' > # Unresolved reference 'f1' > # StringType is class, not constructed object > # Unresolved reference 'true' > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18621) PySpark SQL Types (aka DataFrame Schema) have __repr__() with Scala and not Python representation
Romi Kuntsman created SPARK-18621: - Summary: PySpark SQL Types (aka DataFrame Schema) have __repr__() with Scala and not Python representation Key: SPARK-18621 URL: https://issues.apache.org/jira/browse/SPARK-18621 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.0.2, 1.6.2 Reporter: Romi Kuntsman Priority: Minor When using Python's repr() on an object, the expected result is a string that Python can evaluate to construct the object. See: https://docs.python.org/2/library/functions.html#func-repr However, when getting a DataFrame schema in PySpark, the code (in "__repr__()" overload methods) returns the string representation for Scala, rather than for Python. Relevant code in PySpark: https://github.com/apache/spark/blob/5f02d2e5b4d37f554629cbd0e488e856fffd7b6b/python/pyspark/sql/types.py#L442 Python Code: # 1. define object struct1 = StructType([StructField("f1", StringType(), True)]) # 2. print representation, expected to be like above print(repr(struct1)) # 3. actual result: # StructType(List(StructField(f1,StringType,true))) # 4. try to use result in code struct2 = StructType(List(StructField(f1,StringType,true))) # 5. get a bunch of errors # Unresolved reference 'List' # Unresolved reference 'f1' # StringType is class, not constructed object # Unresolved reference 'true'
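The repr() contract the report appeals to can be demonstrated with a minimal sketch. These are illustrative stand-in classes, NOT PySpark's actual implementation, showing a Python-evaluable __repr__ such that eval(repr(x)) reconstructs an equal object:

```python
# Illustrative stand-in classes (NOT PySpark's actual implementation) showing
# the repr() contract: repr(x) is valid Python, so eval(repr(x)) rebuilds x.

class StringType:
    def __repr__(self):
        return "StringType()"

    def __eq__(self, other):
        return type(self) is type(other)

class StructField:
    def __init__(self, name, dataType, nullable=True):
        self.name, self.dataType, self.nullable = name, dataType, nullable

    def __repr__(self):
        # Python-evaluable, unlike the Scala-style StructField(f1,StringType,true)
        return "StructField(%r, %r, %r)" % (self.name, self.dataType, self.nullable)

    def __eq__(self, other):
        return (self.name, self.dataType, self.nullable) == \
               (other.name, other.dataType, other.nullable)

class StructType:
    def __init__(self, fields):
        self.fields = fields

    def __repr__(self):
        return "StructType(%r)" % (self.fields,)

    def __eq__(self, other):
        return self.fields == other.fields

struct1 = StructType([StructField("f1", StringType(), True)])
print(repr(struct1))  # StructType([StructField('f1', StringType(), True)])
struct2 = eval(repr(struct1))  # round-trips because the repr is valid Python
assert struct1 == struct2
```

With a repr in this shape, step 4 of the reported scenario (pasting the printed representation back into code) would succeed instead of producing unresolved references.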
[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255658#comment-15255658 ] Romi Kuntsman commented on SPARK-4452: -- Hi, what's the reason this will only be available in Spark 2.0.0, and not 1.6.4 or 1.7.0? > Shuffle data structures can starve others on the same thread for memory > > > Key: SPARK-4452 > URL: https://issues.apache.org/jira/browse/SPARK-4452 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.1.0 >Reporter: Tianshuo Deng >Assignee: Tianshuo Deng > Fix For: 2.0.0 > > > When an Aggregator is used with ExternalSorter in a task, Spark will create > many small files and could cause a 'too many open files' error during merging. > Currently, ShuffleMemoryManager does not work well when there are 2 spillable > objects in a thread, which are ExternalSorter and ExternalAppendOnlyMap (used > by Aggregator) in this case. Here is an example: Due to the usage of map-side > aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may > ask for as much memory as it can, which is totalMem/numberOfThreads. Then later > on when ExternalSorter is created in the same thread, the > ShuffleMemoryManager could refuse to allocate more memory to it, since the > memory is already given to the previously requested > object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling > small files (due to the lack of memory). > I'm currently working on a PR to address these two issues. It will include > the following changes: > 1. The ShuffleMemoryManager should not only track the memory usage for each > thread, but also the object that holds the memory > 2. The ShuffleMemoryManager should be able to trigger the spilling of a > spillable object. In this way, if a new object in a thread is requesting > memory, the old occupant could be evicted/spilled. Previously the spillable > objects trigger spilling by themselves. 
So one may not trigger spilling even > if another object in the same thread needs more memory. After this change, the > ShuffleMemoryManager could trigger the spilling of an object if it needs to. > 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously > ExternalAppendOnlyMap returns a destructive iterator and cannot be spilled > after the iterator is returned. This should be changed so that even after the > iterator is returned, the ShuffleMemoryManager can still spill it. > Currently, I have a working branch in progress: > https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made > change 3 and have a prototype of change 1 and 2 to evict spillable from > memory manager, still in progress. I will send a PR when it's done. > Any feedback or thoughts on this change are highly appreciated!
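The starvation scenario and the first two proposed changes (per-object tracking, manager-triggered spilling) can be sketched as a toy model. This is NOT Spark's actual ShuffleMemoryManager API; the class and consumer names are borrowed from the ticket purely for illustration:

```python
# Toy model (NOT Spark's actual ShuffleMemoryManager) of the proposal: track
# memory per consumer object, and let the manager spill an existing consumer
# when a new one in the same thread requests memory.

class Spillable:
    def __init__(self, name):
        self.name = name
        self.held = 0          # bytes currently granted to this consumer
        self.spilled = False

    def spill(self):
        """Write buffered data to disk (elided here) and release the memory."""
        freed, self.held, self.spilled = self.held, 0, True
        return freed

class MemoryManager:
    def __init__(self, total):
        self.total = total
        self.used = 0
        self.consumers = []

    def acquire(self, consumer, amount):
        if consumer not in self.consumers:
            self.consumers.append(consumer)
        # Previously only a consumer could decide to spill itself; here the
        # manager evicts other consumers before refusing the request.
        for other in self.consumers:
            if self.total - self.used >= amount:
                break
            if other is not consumer and other.held > 0:
                self.used -= other.spill()
        granted = min(amount, self.total - self.used)
        consumer.held += granted
        self.used += granted
        return granted

mm = MemoryManager(total=100)
agg_map = Spillable("ExternalAppendOnlyMap")  # created first by map-side aggregation
sorter = Spillable("ExternalSorter")          # created later in the same task
mm.acquire(agg_map, 100)  # the map grabs all available memory first
mm.acquire(sorter, 60)    # manager spills the map instead of starving the sorter
assert agg_map.spilled and sorter.held == 60
```

Without the eviction loop, the second `acquire` would be granted nothing and the sorter would spill tiny files indefinitely, which is exactly the behavior the ticket describes.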
[jira] [Commented] (SPARK-11293) Spillable collections leak shuffle memory
[ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15098008#comment-15098008 ] Romi Kuntsman commented on SPARK-11293: --- so add 1.6.0 as affected version... > Spillable collections leak shuffle memory > - > > Key: SPARK-11293 > URL: https://issues.apache.org/jira/browse/SPARK-11293 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.3.1, 1.4.1, 1.5.1 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Critical > Fix For: 1.6.0 > > > I discovered multiple leaks of shuffle memory while working on my memory > manager consolidation patch, which added the ability to do strict memory leak > detection for the bookkeeping that used to be performed by the > ShuffleMemoryManager. This uncovered a handful of places where tasks can > acquire execution/shuffle memory but never release it, starving themselves of > memory. > Problems that I found: > * {{ExternalSorter.stop()}} should release the sorter's shuffle/execution > memory. > * BlockStoreShuffleReader should call {{ExternalSorter.stop()}} using a > {{CompletionIterator}}. > * {{ExternalAppendOnlyMap}} exposes no equivalent of {{stop()}} for freeing > its resources.
[jira] [Commented] (SPARK-11293) Spillable collections leak shuffle memory
[ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15096375#comment-15096375 ] Romi Kuntsman commented on SPARK-11293: --- So should it be reopened or not? Is there still a memory leak? Is there a new memory leak instead of the old one? > Spillable collections leak shuffle memory > - > > Key: SPARK-11293 > URL: https://issues.apache.org/jira/browse/SPARK-11293 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.3.1, 1.4.1, 1.5.1 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Critical > Fix For: 1.6.0 > > > I discovered multiple leaks of shuffle memory while working on my memory > manager consolidation patch, which added the ability to do strict memory leak > detection for the bookkeeping that used to be performed by the > ShuffleMemoryManager. This uncovered a handful of places where tasks can > acquire execution/shuffle memory but never release it, starving themselves of > memory. > Problems that I found: > * {{ExternalSorter.stop()}} should release the sorter's shuffle/execution > memory. > * BlockStoreShuffleReader should call {{ExternalSorter.stop()}} using a > {{CompletionIterator}}. > * {{ExternalAppendOnlyMap}} exposes no equivalent of {{stop()}} for freeing > its resources.
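The {{CompletionIterator}} mentioned in the issue's second bullet is a small wrapper pattern: run a cleanup callback exactly once when the wrapped iterator is exhausted. A hedged Python sketch of the idea (names illustrative; Spark's real class is Scala):

```python
# Sketch of the CompletionIterator pattern: run a cleanup callback (e.g.
# releasing a sorter's shuffle/execution memory) exactly once when the
# wrapped iterator is exhausted. Names are illustrative, not Spark's.

class CompletionIterator:
    def __init__(self, iterable, on_complete):
        self._it = iter(iterable)
        self._on_complete = on_complete
        self._done = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._it)
        except StopIteration:
            if not self._done:
                self._done = True
                self._on_complete()  # e.g. sorter.stop() in the Spark case
            raise

released = []
rows = CompletionIterator([1, 2, 3], lambda: released.append(True))
assert list(rows) == [1, 2, 3]
assert released == [True]  # cleanup fired exactly once, on exhaustion
```

Note the limitation implicit in the issue: if the caller abandons the iterator early, the callback never fires, which is why an explicit `stop()` that a reader is guaranteed to call is still needed.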
[jira] [Commented] (SPARK-3665) Java API for GraphX
[ https://issues.apache.org/jira/browse/SPARK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15085452#comment-15085452 ] Romi Kuntsman commented on SPARK-3665: -- So at what version of Spark is it expected to happen? > Java API for GraphX > --- > > Key: SPARK-3665 > URL: https://issues.apache.org/jira/browse/SPARK-3665 > Project: Spark > Issue Type: Improvement > Components: GraphX, Java API >Affects Versions: 1.0.0 >Reporter: Ankur Dave >Assignee: Ankur Dave > > The Java API will wrap the Scala API in a similar manner as JavaRDD. > Components will include: > # JavaGraph > #- removes optional param from persist, subgraph, mapReduceTriplets, > Graph.fromEdgeTuples, Graph.fromEdges, Graph.apply > #- removes implicit {{=:=}} param from mapVertices, outerJoinVertices > #- merges multiple parameters lists > #- incorporates GraphOps > # JavaVertexRDD > # JavaEdgeRDD
[jira] [Issue Comment Deleted] (SPARK-3665) Java API for GraphX
[ https://issues.apache.org/jira/browse/SPARK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Romi Kuntsman updated SPARK-3665: - Comment: was deleted (was: So at what version of Spark is it expected to happen?) > Java API for GraphX > --- > > Key: SPARK-3665 > URL: https://issues.apache.org/jira/browse/SPARK-3665 > Project: Spark > Issue Type: Improvement > Components: GraphX, Java API >Affects Versions: 1.0.0 >Reporter: Ankur Dave >Assignee: Ankur Dave > > The Java API will wrap the Scala API in a similar manner as JavaRDD. > Components will include: > # JavaGraph > #- removes optional param from persist, subgraph, mapReduceTriplets, > Graph.fromEdgeTuples, Graph.fromEdges, Graph.apply > #- removes implicit {{=:=}} param from mapVertices, outerJoinVertices > #- merges multiple parameters lists > #- incorporates GraphOps > # JavaVertexRDD > # JavaEdgeRDD
[jira] [Commented] (SPARK-3665) Java API for GraphX
[ https://issues.apache.org/jira/browse/SPARK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15085454#comment-15085454 ] Romi Kuntsman commented on SPARK-3665: -- So at what version of Spark is it expected to happen? > Java API for GraphX > --- > > Key: SPARK-3665 > URL: https://issues.apache.org/jira/browse/SPARK-3665 > Project: Spark > Issue Type: Improvement > Components: GraphX, Java API >Affects Versions: 1.0.0 >Reporter: Ankur Dave >Assignee: Ankur Dave > > The Java API will wrap the Scala API in a similar manner as JavaRDD. > Components will include: > # JavaGraph > #- removes optional param from persist, subgraph, mapReduceTriplets, > Graph.fromEdgeTuples, Graph.fromEdges, Graph.apply > #- removes implicit {{=:=}} param from mapVertices, outerJoinVertices > #- merges multiple parameters lists > #- incorporates GraphOps > # JavaVertexRDD > # JavaEdgeRDD
[jira] [Commented] (SPARK-11293) Spillable collections leak shuffle memory
[ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15008738#comment-15008738 ] Romi Kuntsman commented on SPARK-11293: --- The memory manager was rewritten there? Could it have introduced a memory leak in a different place or of a different kind? Is there a regression test to verify? > Spillable collections leak shuffle memory > - > > Key: SPARK-11293 > URL: https://issues.apache.org/jira/browse/SPARK-11293 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.3.1, 1.4.1, 1.5.1 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Critical > Fix For: 1.6.0 > > > I discovered multiple leaks of shuffle memory while working on my memory > manager consolidation patch, which added the ability to do strict memory leak > detection for the bookkeeping that used to be performed by the > ShuffleMemoryManager. This uncovered a handful of places where tasks can > acquire execution/shuffle memory but never release it, starving themselves of > memory. > Problems that I found: > * {{ExternalSorter.stop()}} should release the sorter's shuffle/execution > memory. > * BlockStoreShuffleReader should call {{ExternalSorter.stop()}} using a > {{CompletionIterator}}. > * {{ExternalAppendOnlyMap}} exposes no equivalent of {{stop()}} for freeing > its resources.
[jira] [Commented] (SPARK-6962) Netty BlockTransferService hangs in the middle of SQL query
[ https://issues.apache.org/jira/browse/SPARK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005851#comment-15005851 ] Romi Kuntsman commented on SPARK-6962: -- what's the status of this? something similar happens to me in 1.4.0 and also in 1.5.1: the job hangs forever on the largest shuffle; when increasing the number of partitions (as a function of the data size), the issue is fixed > Netty BlockTransferService hangs in the middle of SQL query > --- > > Key: SPARK-6962 > URL: https://issues.apache.org/jira/browse/SPARK-6962 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.2.0, 1.2.1, 1.3.0 >Reporter: Jon Chase > Attachments: jstacks.txt > > > Spark SQL queries (though this seems to be a Spark Core issue - I'm just > using queries in the REPL to surface this, so I mention Spark SQL) hang > indefinitely under certain (not totally understood) circumstances. > This is resolved by setting spark.shuffle.blockTransferService=nio, which > seems to point to netty as the issue. Netty was set as the default for the > block transport layer in 1.2.0, which is when this issue started. Setting > the service to nio allows queries to complete normally. > I do not see this problem when running queries over smaller (~20 5MB files) > datasets. When I increase the scope to include more data (several hundred > ~5MB files), the queries will get through several steps but eventually hang > indefinitely. > Here's the email chain regarding this issue, including stack traces: > http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/> For context, here's the announcement regarding the block transfer service > change: > http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/
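For anyone hitting this on an affected version, the workaround stated in the report is a single configuration change. Shown here as a spark-defaults.conf entry; the key and value come directly from the report above:

```properties
# Workaround from SPARK-6962: fall back from netty to the nio transport
spark.shuffle.blockTransferService  nio
```

The same setting can also be passed per job via `--conf` on spark-submit.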
[jira] [Commented] (SPARK-3767) Support wildcard in Spark properties
[ https://issues.apache.org/jira/browse/SPARK-3767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996389#comment-14996389 ] Romi Kuntsman commented on SPARK-3767: -- [~andrewor14] what's going on with this issue? I found a link to it here: http://mail-archives.us.apache.org/mod_mbox/spark-user/201410.mbox/%3ccamjob8kg3_jhh_8ibnx04wya-fi7aeghs+fh1nalmembac7...@mail.gmail.com%3E I want to connect to JMX in Executors, which means each one needs a different port number, is it possible? > Support wildcard in Spark properties > > > Key: SPARK-3767 > URL: https://issues.apache.org/jira/browse/SPARK-3767 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 1.1.0 >Reporter: Andrew Or > > If the user sets spark.executor.extraJavaOptions, he/she may want to express > the value in terms of the executor ID, for instance. In general it would be a > feature that many will find useful.
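One commonly used workaround for the per-executor JMX port question in the comment above (an assumption offered here, not something this ticket implements) is to set the JMX port to 0 so each executor JVM binds its own ephemeral port, avoiding the need for a wildcard in the property value:

```properties
# Assumed workaround: port=0 lets each executor JVM pick a free port
# (the actual port must then be discovered per process, e.g. via jconsole/jps)
spark.executor.extraJavaOptions  -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
```

Disabling authentication and SSL as above is only reasonable on a trusted network.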
[jira] [Commented] (SPARK-11229) NPE in JoinedRow.isNullAt when spark.shuffle.memoryFraction=0
[ https://issues.apache.org/jira/browse/SPARK-11229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14968662#comment-14968662 ] Romi Kuntsman commented on SPARK-11229: --- [~marmbrus] it's reproducible in 1.5.1 as [~xwu0226] confirmed, shouldn't it be marked as "fixed in 1.6.0" instead of "cannot reproduce"? > NPE in JoinedRow.isNullAt when spark.shuffle.memoryFraction=0 > - > > Key: SPARK-11229 > URL: https://issues.apache.org/jira/browse/SPARK-11229 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.1 > Environment: 14.04.1-Ubuntu SMP x86_64 GNU/Linux >Reporter: Romi Kuntsman > > Steps to reproduce: > 1. set spark.shuffle.memoryFraction=0 > 2. load dataframe from parquet file > 3. see it's read correctly by calling dataframe.show() > 4. call dataframe.count() > Expected behaviour: > get count of rows in dataframe > OR, if memoryFraction=0 is an invalid setting, get notified about it > Actual behaviour: > CatalystReadSupport doesn't read the schema (even though there is one) and > then there's a NullPointerException. 
> Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919) > at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) > at 
org.apache.spark.rdd.RDD.collect(RDD.scala:904) > at > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177) > at > org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) > at > org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) > at > org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903) > at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384) > at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1402) > ... 14 more > Caused by: java.lang.NullPointerException > at > org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:70) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown > Source) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator$$anonfun$generateProcessRow$1.apply(TungstenAggregationIterator.scala:194) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator$$anonfun$generateProcessRow$1.apply(TungstenAggregationIterator.scala:192) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:368) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) > at >
[jira] [Commented] (SPARK-7335) Submitting a query to Thrift Server causes error: java.lang.IllegalStateException: unread block data
[ https://issues.apache.org/jira/browse/SPARK-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14966490#comment-14966490 ] Romi Kuntsman commented on SPARK-7335: -- [~meiyoula] can you please reopen the issue? I got this error as well, and wasted a lot of time realizing it was actually a ClassNotFoundException due to a missing jar - it doesn't say that anywhere in the exception. When the class to be serialized/deserialized is not found, then the exception should explicitly say so. When the message says "unread block data" it's very confusing. Here's a reference from another project that got the same error, where many people spent time realizing it was a missing jar: https://github.com/tuplejump/calliope-release/issues/6 > Submitting a query to Thrift Server causes error: > java.lang.IllegalStateException: unread block data > > > Key: SPARK-7335 > URL: https://issues.apache.org/jira/browse/SPARK-7335 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: meiyoula >Priority: Critical > > java.lang.IllegalStateException: unread block data > at > java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:163) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745)
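The commenter's complaint is that a missing class surfaces as an opaque stream error instead of naming the class. The desired behavior can be illustrated in plain Python with pickle (a sketch of the principle only; Spark's JavaSerializer is JVM code): wrap deserialization so the missing class is reported explicitly.

```python
import io
import pickle

class StrictUnpickler(pickle.Unpickler):
    """Re-raise missing classes with an explicit, actionable message."""
    def find_class(self, module, name):
        try:
            return super().find_class(module, name)
        except (ImportError, AttributeError) as e:
            raise RuntimeError(
                "class %s.%s not found on this side; "
                "is a jar/module missing from the classpath?" % (module, name)
            ) from e

payload = pickle.dumps(ValueError("boom"))
# Simulate a class that exists only on the serializing side by renaming the
# class reference inside the payload (same byte length keeps the pickle valid).
corrupted = payload.replace(b"ValueError", b"NoSuchErro")

try:
    StrictUnpickler(io.BytesIO(corrupted)).load()
except RuntimeError as e:
    print(e)  # names the missing class instead of an opaque stream error
```

The analogous fix on the JVM side would be catching ClassNotFoundException inside the deserializer and rethrowing with the offending class name, rather than letting the stream fail later with "unread block data".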
[jira] [Created] (SPARK-11229) NPE in JoinedRow.isNullAt when spark.shuffle.memoryFraction=0
Romi Kuntsman created SPARK-11229: - Summary: NPE in JoinedRow.isNullAt when spark.shuffle.memoryFraction=0 Key: SPARK-11229 URL: https://issues.apache.org/jira/browse/SPARK-11229 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.5.1 Environment: 14.04.1-Ubuntu SMP x86_64 GNU/Linux Reporter: Romi Kuntsman Steps to reproduce: 1. set spark.shuffle.memoryFraction=0 2. load dataframe from parquet file 3. see it's read correctly by calling dataframe.show() 4. call dataframe.count() Expected behaviour: get count of rows in dataframe OR, if memoryFraction=0 is an invalid setting, get notified about it Actual behaviour: CatalystReadSupport doesn't read the schema (even though there is one) and then there's a NullPointerException. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) at org.apache.spark.rdd.RDD.collect(RDD.scala:904) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177) at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384) at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1402) ... 
14 more Caused by: java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:70) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator$$anonfun$generateProcessRow$1.apply(TungstenAggregationIterator.scala:194) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator$$anonfun$generateProcessRow$1.apply(TungstenAggregationIterator.scala:192) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:368) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) at
[jira] [Commented] (SPARK-11153) Turns off Parquet filter push-down for string and binary columns
[ https://issues.apache.org/jira/browse/SPARK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14966306#comment-14966306 ] Romi Kuntsman commented on SPARK-11153: --- Does this mean that all Spark 1.5.1 users are recommended to set spark.sql.parquet.filterPushdown to false? > Turns off Parquet filter push-down for string and binary columns > > > Key: SPARK-11153 > URL: https://issues.apache.org/jira/browse/SPARK-11153 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0, 1.5.1 >Reporter: Cheng Lian >Assignee: Cheng Lian >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > Due to PARQUET-251, {{BINARY}} columns in existing Parquet files may be > written with corrupted statistics information. This information is used by > filter push-down optimization. Since Spark 1.5 turns on Parquet filter > push-down by default, we may end up with wrong query results. PARQUET-251 has > been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0. > Note that this kind of corrupted Parquet files could be produced by any > Parquet data models. > This affects all Spark SQL data types that can be mapped to Parquet > {{BINARY}}, namely: > - {{StringType}} > - {{BinaryType}} > - {{DecimalType}} (but Spark SQL doesn't support pushing down {{DecimalType}} > columns for now.) > To avoid wrong query results, we should disable filter push-down for columns > of {{StringType}} and {{BinaryType}} until we upgrade to parquet-mr 1.8.
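For users who cannot upgrade past the affected versions, the mitigation implied by the ticket is to disable the push-down explicitly, e.g. in spark-defaults.conf (the configuration key is the one named in the comment above):

```properties
# Workaround for PARQUET-251 on Spark 1.5.0/1.5.1: avoid wrong query results
# from corrupted BINARY column statistics (default for this key is true)
spark.sql.parquet.filterPushdown  false
```

This trades some scan performance for correctness until the parquet-mr upgrade lands in 1.5.2/1.6.0.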
[jira] [Created] (SPARK-11228) Job stuck in Executor failure loop when NettyTransport failed to bind
Romi Kuntsman created SPARK-11228:
-------------------------------------

             Summary: Job stuck in Executor failure loop when NettyTransport failed to bind
                 Key: SPARK-11228
                 URL: https://issues.apache.org/jira/browse/SPARK-11228
             Project: Spark
          Issue Type: Bug
          Components: Scheduler
    Affects Versions: 1.5.1
         Environment: 14.04.1-Ubuntu SMP x86_64 GNU/Linux
            Reporter: Romi Kuntsman

I changed my network connection while a local spark cluster was running. On port 8080, I see the master and worker running. I'm running Spark in Java in client mode, so the driver is running inside my IDE.
When trying to start a job on the local spark cluster, I get an endless loop of the errors below at #1. It only stops when I kill the application manually. When looking at the worker log, I see an endless loop of the errors below at #2.
Expected behaviour would be failing the job after a few failed retries / timeout.
(IP anonymized to 1.2.3.4)

1. Errors seen on driver:

2015-10-21 11:20:54,793 INFO [org.apache.spark.scheduler.TaskSchedulerImpl] Adding task set 0.0 with 2 tasks
2015-10-21 11:20:55,847 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/1 is now EXITED (Command exited with code 1)
2015-10-21 11:20:55,847 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Executor app-20151021112052-0005/1 removed: Command exited with code 1
2015-10-21 11:20:55,848 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Asked to remove non-existent executor 1
2015-10-21 11:20:55,848 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor added: app-20151021112052-0005/2 on worker-20151021090623-1.2.3.4-57305 (1.2.3.4:57305) with 1 cores
2015-10-21 11:20:55,848 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Granted executor ID app-20151021112052-0005/2 on hostPort 1.2.3.4:57305 with 1 cores, 4.9 GB RAM
2015-10-21 11:20:55,849 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/2 is now LOADING
2015-10-21 11:20:55,852 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/2 is now RUNNING
2015-10-21 11:20:57,165 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/2 is now EXITED (Command exited with code 1)
2015-10-21 11:20:57,165 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Executor app-20151021112052-0005/2 removed: Command exited with code 1
2015-10-21 11:20:57,166 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Asked to remove non-existent executor 2
2015-10-21 11:20:57,166 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor added: app-20151021112052-0005/3 on worker-20151021090623-1.2.3.4-57305 (1.2.3.4:57305) with 1 cores
2015-10-21 11:20:57,167 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Granted executor ID app-20151021112052-0005/3 on hostPort 1.2.3.4:57305 with 1 cores, 4.9 GB RAM
2015-10-21 11:20:57,167 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/3 is now LOADING
2015-10-21 11:20:57,169 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/3 is now RUNNING
2015-10-21 11:20:58,531 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/3 is now EXITED (Command exited with code 1)
2015-10-21 11:20:58,531 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Executor app-20151021112052-0005/3 removed: Command exited with code 1
2015-10-21 11:20:58,532 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Asked to remove non-existent executor 3
2015-10-21 11:20:58,532 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor added: app-20151021112052-0005/4 on worker-20151021090623-1.2.3.4-57305 (1.2.3.4:57305) with 1 cores
2015-10-21 11:20:58,532 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Granted executor ID app-20151021112052-0005/4 on hostPort 1.2.3.4:57305 with 1 cores, 4.9 GB RAM
2015-10-21 11:20:58,533 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/4 is now LOADING
2015-10-21 11:20:58,535 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/4 is now RUNNING
2015-10-21 11:20:59,932 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/4 is now EXITED (Command exited with code 1)
2015-10-21 11:20:59,933 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Executor
[jira] [Commented] (SPARK-2563) Re-open sockets to handle connect timeouts
[ https://issues.apache.org/jira/browse/SPARK-2563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14955264#comment-14955264 ]

Romi Kuntsman commented on SPARK-2563:
--------------------------------------

I got a socket timeout in Spark 1.4.0. Is this still relevant for the latest version, or is this bug abandoned?

> Re-open sockets to handle connect timeouts
> ------------------------------------------
>
>                 Key: SPARK-2563
>                 URL: https://issues.apache.org/jira/browse/SPARK-2563
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Shivaram Venkataraman
>            Priority: Minor
>
> In a large EC2 cluster, I often see the first shuffle stage in a job fail due to connection timeout exceptions.
> If the connection attempt times out, the socket gets closed and from [1] we get a ClosedChannelException. We should check if the Socket was closed due to a timeout and open a new socket and try to connect.
> FWIW, I was able to work around my problems by increasing the number of SYN retries in Linux. (I ran echo 8 > /proc/sys/net/ipv4/tcp_syn_retries)
> [1] http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/sun/nio/ch/SocketChannelImpl.java?av=h#573
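The kernel-level workaround mentioned in the issue can also be expressed via sysctl. A sketch only: it is host-wide rather than Spark-specific, requires root, and masks rather than fixes the timeout behavior:

```shell
# Equivalent of the `echo 8 > /proc/sys/net/ipv4/tcp_syn_retries` quoted above:
# retry SYN packets more times before connect() gives up.
sysctl -w net.ipv4.tcp_syn_retries=8
```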
[jira] [Commented] (SPARK-5421) SparkSql throw OOM at shuffle
[ https://issues.apache.org/jira/browse/SPARK-5421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14734561#comment-14734561 ]

Romi Kuntsman commented on SPARK-5421:
--------------------------------------

Does this still happen on the latest version? I got some OOM with Spark 1.4.0.

> SparkSql throw OOM at shuffle
> -----------------------------
>
>                 Key: SPARK-5421
>                 URL: https://issues.apache.org/jira/browse/SPARK-5421
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.0
>            Reporter: Hong Shen
>
> ExternalAppendOnlyMap is only for spark jobs where the aggregator isDefined, but sparkSQL's shuffledRDD doesn't define an aggregator, so sparkSQL won't spill at shuffle, and it's very easy to throw OOM at shuffle. I think sparkSQL also needs to spill at shuffle.
> One of the executor's logs. Here is stderr:
> 15/01/27 07:02:19 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them
> 15/01/27 07:02:19 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@10.196.128.140:40952/user/MapOutputTracker#1435377484]
> 15/01/27 07:02:19 INFO spark.MapOutputTrackerWorker: Got the output locations
> 15/01/27 07:02:19 INFO storage.ShuffleBlockFetcherIterator: Getting 143 non-empty blocks out of 143 blocks
> 15/01/27 07:02:19 INFO storage.ShuffleBlockFetcherIterator: Started 4 remote fetches in 72 ms
> 15/01/27 07:47:29 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
> Here is stdout:
> 2015-01-27T07:44:43.487+0800: [Full GC 3961343K->3959868K(3961344K), 29.8959290 secs]
> 2015-01-27T07:45:13.460+0800: [Full GC 3961343K->3959992K(3961344K), 27.9218150 secs]
> 2015-01-27T07:45:41.407+0800: [GC 3960347K(3961344K), 3.0457450 secs]
> 2015-01-27T07:45:52.950+0800: [Full GC 3961343K->3960113K(3961344K), 29.3894670 secs]
> 2015-01-27T07:46:22.393+0800: [Full GC 3961118K->3960240K(3961344K), 28.9879600 secs]
> 2015-01-27T07:46:51.393+0800: [Full GC 3960240K->3960213K(3961344K), 34.1530900 secs]
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill %p"
> #   Executing /bin/sh -c "kill 9050"...
> 2015-01-27T07:47:25.921+0800: [GC 3960214K(3961344K), 3.3959300 secs]
[jira] [Created] (SPARK-10135) Percent of pruned partitions is shown wrong
Romi Kuntsman created SPARK-10135:
-------------------------------------

             Summary: Percent of pruned partitions is shown wrong
                 Key: SPARK-10135
                 URL: https://issues.apache.org/jira/browse/SPARK-10135
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.4.0
            Reporter: Romi Kuntsman
            Priority: Trivial

When reading partitioned Parquet in SparkSQL, an info message about the number of pruned partitions is displayed.

Actual: "Selected 15 partitions out of 181, pruned -1106.7% partitions."
Expected: "Selected 15 partitions out of 181, pruned 91.71270718232044% partitions."

Fix (I'm a newbie here, so please help make a patch, thanks!): in DataSourceStrategy.scala, in method apply(), instead of:
val percentPruned = (1 - total.toDouble / selected.toDouble) * 100
it should be:
val percentPruned = (1 - selected.toDouble / total.toDouble) * 100
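The arithmetic behind the proposed one-line fix can be checked directly. A plain-Python sketch (the function name is illustrative, not from the Spark source):

```python
def percent_pruned(selected: int, total: int) -> float:
    """Percentage of partitions eliminated by pruning (the proposed fix)."""
    return (1 - selected / total) * 100

# The reported bug swaps the operands of the division:
buggy = (1 - 181 / 15) * 100      # about -1106.7, matching the wrong log message
fixed = percent_pruned(15, 181)   # about 91.71, matching the expected message
print(round(buggy, 1), round(fixed, 2))
```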
[jira] [Commented] (SPARK-2579) Reading from S3 returns an inconsistent number of items with Spark 0.9.1
[ https://issues.apache.org/jira/browse/SPARK-2579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14324285#comment-14324285 ]

Romi Kuntsman commented on SPARK-2579:
--------------------------------------

Does this still happen with Spark 1.2.1?

> Reading from S3 returns an inconsistent number of items with Spark 0.9.1
> ------------------------------------------------------------------------
>
>                 Key: SPARK-2579
>                 URL: https://issues.apache.org/jira/browse/SPARK-2579
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 0.9.1
>            Reporter: Eemil Lagerspetz
>            Priority: Critical
>              Labels: hdfs, read, s3, skipping
>
> I have created a random matrix of 1M rows with 10K items on each row, semicolon-separated. While reading it with Spark 0.9.1 and doing a count, I consistently get less than 1M rows, and a different number every time at that ( !! ). Example below:
> head -n 1 tool-generate-random-matrix*log
> ==> tool-generate-random-matrix-999158.log <==
> Row item counts: 999158
> ==> tool-generate-random-matrix.log <==
> Row item counts: 997163
> The data is split into 1000 partitions. When I download it using s3cmd sync, and run the following AWK on it, I get the correct number of rows in each partition (1000x1000 = 1M). What is up?
{code:title=checkrows.sh|borderStyle=solid}
for k in part-0*
do
  echo $k
  awk -F ';' '
  NF != 1 { print "Wrong number of items:", NF }
  END { if (NR != 1000) { print "Wrong number of rows:", NR } }' $k
done
{code}

The matrix generation and counting code is below:

{code:title=Matrix.scala|borderStyle=solid}
package fi.helsinki.cs.nodes.matrix

import java.util.Random
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel._

object GenerateRandomMatrix {
  def NewGeMatrix(rSeed: Int, rdd: RDD[Int], features: Int) = {
    rdd.mapPartitions(part => part.map(xarr => {
      val rdm = new Random(rSeed + xarr)
      val arr = new Array[Double](features)
      for (i <- 0 until features) arr(i) = rdm.nextDouble()
      new Row(xarr, arr)
    }))
  }

  case class Row(id: Int, elements: Array[Double]) {}

  def rowFromText(line: String) = {
    val idarr = line.split(" ")
    val arr = idarr(1).split(";")
    // -1 to fix saved matrix indexing error
    new Row(idarr(0).toInt - 1, arr.map(_.toDouble))
  }

  def main(args: Array[String]) {
    val master = args(0)
    val tasks = args(1).toInt
    val savePath = args(2)
    val read = args.contains("read")
    val datapoints = 100
    val features = 1
    val sc = new SparkContext(master, "RandomMatrix")
    if (read) {
      val randomMatrix: RDD[Row] = sc.textFile(savePath, tasks).map(rowFromText).persist(MEMORY_AND_DISK)
      println("Row item counts: " + randomMatrix.count)
    } else {
      val rdd = sc.parallelize(0 until datapoints, tasks)
      val bcSeed = sc.broadcast(128)
      /* Generating a matrix of random Doubles */
      val randomMatrix = NewGeMatrix(bcSeed.value, rdd, features).persist(MEMORY_AND_DISK)
      randomMatrix.map(row => row.id + " " + row.elements.mkString(";")).saveAsTextFile(savePath)
    }
    sc.stop
  }
}
{code}

I run this with:
appassembler/bin/tool-generate-random-matrix master 1000 s3n://keys@path/to/data 1>matrix.log 2>matrix.err

Reading from HDFS gives the right count and right number of items on each row.
However, I had to run with the full path with the server name; just /matrix does not work (it thinks I want file://):
p=hdfs://ec2-54-188-6-77.us-west-2.compute.amazonaws.com:9000/matrix
appassembler/bin/tool-generate-random-matrix $( cat /root/spark-ec2/cluster-url ) 1000 $p read 1>readmatrix.log 2>readmatrix.err
[jira] [Commented] (SPARK-4879) Missing output partitions after job completes with speculative execution
[ https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14324278#comment-14324278 ]

Romi Kuntsman commented on SPARK-4879:
--------------------------------------

Could this happen very very rarely when not using speculative execution? Once in a long while, I have a situation where the OutputCommitter says it wrote the file successfully, but the output file doesn't appear there.

> Missing output partitions after job completes with speculative execution
> ------------------------------------------------------------------------
>
>                 Key: SPARK-4879
>                 URL: https://issues.apache.org/jira/browse/SPARK-4879
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.0.2, 1.1.1, 1.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>              Labels: backport-needed
>             Fix For: 1.3.0
>
>         Attachments: speculation.txt, speculation2.txt
>
> When speculative execution is enabled ({{spark.speculation=true}}), jobs that save output files may report that they have completed successfully even though some output partitions written by speculative tasks may be missing.
> h3. Reproduction
> This symptom was reported to me by a Spark user and I've been doing my own investigation to try to come up with an in-house reproduction. I'm still working on a reliable local reproduction for this issue, which is a little tricky because Spark won't schedule speculated tasks on the same host as the original task, so you need an actual (or containerized) multi-host cluster to test speculation.
Here's a simple reproduction of some of the symptoms on EC2, which can be run in {{spark-shell}} with {{--conf spark.speculation=true}}:

{code}
// Rig a job such that all but one of the tasks complete instantly
// and one task runs for 20 seconds on its first attempt and instantly
// on its second attempt:
val numTasks = 100
sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
  if (ctx.partitionId == 0) {  // If this is the one task that should run really slow
    if (ctx.attemptId == 0) {  // If this is the first attempt, run slow
      Thread.sleep(20 * 1000)
    }
  }
  iter
}.map(x => (x, x)).saveAsTextFile("/test4")
{code}

When I run this, I end up with a job that completes quickly (due to speculation) but reports failures from the speculated task:

{code}
[...]
14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal (100/100)
14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at <console>:22) finished in 0.856 s
14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:22, took 0.885438374 s
14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event for 70.1 in stage 3.0 because task 70 has already completed successfully
scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201412110141_0003_m_49_413
        org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
        org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
        org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
        org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
{code}

One interesting thing to note about this stack trace: if we look at {{FileOutputCommitter.java:160}} ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]), this point in the execution seems to correspond to a case where a task completes, attempts to commit its output, fails for some reason, then deletes the destination file, tries again, and fails:

{code}
if (fs.isFile(taskOutput)) {
152       Path
[jira] [Commented] (SPARK-2867) saveAsHadoopFile() in PairRDDFunction.scala should allow use other OutputCommiter class
[ https://issues.apache.org/jira/browse/SPARK-2867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14207997#comment-14207997 ]

Romi Kuntsman commented on SPARK-2867:
--------------------------------------

In the latest code, it seems to be resolved:

// Use configured output committer if already set
if (conf.getOutputCommitter == null) {
  hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L934

> saveAsHadoopFile() in PairRDDFunction.scala should allow use other OutputCommiter class
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-2867
>                 URL: https://issues.apache.org/jira/browse/SPARK-2867
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Joseph Su
>            Priority: Minor
>
> The saveAsHadoopFile() in PairRDDFunction.scala hard-coded the OutputCommitter class as FileOutputCommitter because of the following code in the source:
>   hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
> However, the OutputCommitter is a changeable option in a regular Hadoop MapReduce program. Users can specify mapred.output.committer.class to change the committer class used by other Hadoop programs. The saveAsHadoopFile() function should remove this hard-coded assignment and provide a way to specify the OutputCommitter used here.
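With the fix quoted in the comment above, a committer configured ahead of time is respected rather than overwritten. A hedged PySpark-style sketch of what that enables (the committer class name is hypothetical, and the exact Hadoop conf plumbing may vary by Spark/Hadoop version):

```python
# Ask the old mapred API to use a custom committer; after the quoted fix,
# saveAsHadoopFile() keeps this setting instead of forcing FileOutputCommitter.
conf = {"mapred.output.committer.class": "com.example.DirectOutputCommitter"}  # hypothetical class
rdd.saveAsHadoopFile(
    "hdfs:///tmp/out",
    "org.apache.hadoop.mapred.TextOutputFormat",
    conf=conf,
)
```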