[jira] [Created] (SPARK-26404) set spark.pyspark.python or PYSPARK_PYTHON doesn't work in k8s client-cluster mode.

2018-12-18 Thread Dongqing Liu (JIRA)
Dongqing  Liu created SPARK-26404:
-

 Summary: set spark.pyspark.python or PYSPARK_PYTHON doesn't work 
in k8s client-cluster mode.
 Key: SPARK-26404
 URL: https://issues.apache.org/jira/browse/SPARK-26404
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.4.0
Reporter: Dongqing  Liu


Neither

   conf.set("spark.executorEnv.PYSPARK_PYTHON", "/opt/pythonenvs/bin/python")

nor 

  conf.set("spark.pyspark.python", "/opt/pythonenvs/bin/python") 

works. 

It looks like the executor always picks up the Python binary from PATH instead of the configured one.
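
For reference, a minimal sketch of the configuration being described (the interpreter path is the one from this report; building the session this way is illustrative):

{code:python}
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Both settings from the report; per this issue, executors on k8s ignore them
# and fall back to whatever `python` is on the image's PATH.
conf = (SparkConf()
        .set("spark.pyspark.python", "/opt/pythonenvs/bin/python")
        .set("spark.executorEnv.PYSPARK_PYTHON", "/opt/pythonenvs/bin/python"))

spark = SparkSession.builder.config(conf=conf).getOrCreate()
print(spark.conf.get("spark.pyspark.python"))  # visible on the driver side
{code}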

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26393) Different behaviors of date_add when calling it inside expr

2018-12-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26393.
--
Resolution: Won't Fix

> Different behaviors of date_add when calling it inside expr
> ---
>
> Key: SPARK-26393
> URL: https://issues.apache.org/jira/browse/SPARK-26393
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2
>Reporter: Ahmed Kamal
>Priority: Minor
>
> When calling date_add from pyspark.sql.functions directly, without using expr, 
> like this:
> {code:java}
> df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), 
> F.col('days'))).toPandas(){code}
> it raises `TypeError: Column is not iterable`, because date_add only accepts 
> a number, not a column.
> But when I use it inside an expr, like this:
> {code:java}
> df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), 
> days)")).toPandas(){code}
> it works fine.
> Shouldn't it behave the same way? I think it's logical to accept a column 
> here as well.
> A Python notebook to demonstrate:
> [https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb]






[jira] [Commented] (SPARK-26393) Different behaviors of date_add when calling it inside expr

2018-12-18 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724765#comment-16724765
 ] 

Hyukjin Kwon commented on SPARK-26393:
--

The PySpark API primarily mirrors the Scala side, and it looks like the Scala 
side doesn't have this overload. There are more cases like this if you look 
closely, even on the Scala side.
One problem is that the number of Scala function APIs is getting huge. We 
should focus on reduction rather than addition; Spark is trying to be 
conservative and only add APIs that are absolutely required.
For this case specifically, the workaround is easy, as you described.

Related, but bigger than this scope: there has been discussion about 
disallowing other types and only allowing {{Column}}s everywhere.

Let's leave this resolved and fix these cases globally.
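
For reference, a minimal, self-contained sketch of that {{expr}}-based workaround (the sample DataFrame below is illustrative, not from the report):

{code:python}
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (30,)], ["days"])

# F.date_add(date, F.col("days")) fails with "TypeError: Column is not iterable"
# because the Python wrapper only accepts an int; routing the call through
# expr() lets Spark SQL resolve the column reference instead.
df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), days)")).show()
{code}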

> Different behaviors of date_add when calling it inside expr
> ---
>
> Key: SPARK-26393
> URL: https://issues.apache.org/jira/browse/SPARK-26393
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2
>Reporter: Ahmed Kamal
>Priority: Minor
>
> When calling date_add from pyspark.sql.functions directly, without using expr, 
> like this:
> {code:java}
> df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), 
> F.col('days'))).toPandas(){code}
> it raises `TypeError: Column is not iterable`, because date_add only accepts 
> a number, not a column.
> But when I use it inside an expr, like this:
> {code:java}
> df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), 
> days)")).toPandas(){code}
> it works fine.
> Shouldn't it behave the same way? I think it's logical to accept a column 
> here as well.
> A Python notebook to demonstrate:
> [https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb]






[jira] [Commented] (SPARK-21514) Hive has updated with new support for S3 and InsertIntoHiveTable.scala should update also

2018-12-18 Thread Noritaka Sekiyama (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724763#comment-16724763
 ] 

Noritaka Sekiyama commented on SPARK-21514:
---

I'm working on a fix and will update once it's done.
Please let me know if anyone is already working on this.

> Hive has updated with new support for S3 and InsertIntoHiveTable.scala should 
> update also
> -
>
> Key: SPARK-21514
> URL: https://issues.apache.org/jira/browse/SPARK-21514
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Javier Ros
>Priority: Major
>
> Hive has added new parameters to optimize its use of S3: with 
> hive.blobstore.supported.schemes and hive.blobstore.optimizations.enabled you 
> can now avoid using S3 as the staging directory.
> InsertIntoHiveTable.scala should be updated with the same improvement to 
> match Hive's behavior.
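
For reference, a sketch of how those Hive settings might be supplied from the Spark side (the scheme list and flag value are illustrative; whether the InsertIntoHiveTable path honors them is exactly what this issue is about):

{code:python}
from pyspark.sql import SparkSession

# The two Hive parameters named above; values are illustrative, not verified
# against a particular Hive release.
spark = (SparkSession.builder
         .enableHiveSupport()
         .config("hive.blobstore.supported.schemes", "s3,s3a,s3n")
         .config("hive.blobstore.optimizations.enabled", "true")
         .getOrCreate())

# A subsequent spark.sql("INSERT INTO ... SELECT ...") goes through
# InsertIntoHiveTable, the code path the report says should honor the
# settings above.
{code}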






[jira] [Commented] (SPARK-26381) Pickle Serialization Error Causing Crash

2018-12-18 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724750#comment-16724750
 ] 

Hyukjin Kwon commented on SPARK-26381:
--

[~ryan.clancy], please provide code to reproduce this.
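
For context, a minimal stand-in (not the reporter's AllenNLP code) that produces the same class of unpickling failure on executors, in case it helps shape the repro:

{code:python}
from pyspark.sql import SparkSession

class Vocab:
    """Stand-in for a library object whose __reduce__ drops its constructor
    arguments, so the Python worker cannot rebuild it and raises
    "TypeError: __init__() missing 3 required positional arguments"."""
    def __init__(self, non_padded_namespaces, padding_token, oov_token):
        self.parts = (non_padded_namespaces, padding_token, oov_token)

    def __reduce__(self):
        return (Vocab, ())  # deliberately omits the three required arguments

spark = SparkSession.builder.getOrCreate()
vocab = Vocab((), "@@PADDING@@", "@@UNKNOWN@@")

# The closure captures `vocab`: it pickles on the driver, but deserializing the
# command in the Python worker fails, surfacing as a PythonException like the
# traceback quoted below.
spark.sparkContext.parallelize(["a sentence"]).map(lambda s: vocab.parts).collect()
{code}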

> Pickle Serialization Error Causing Crash
> 
>
> Key: SPARK-26381
> URL: https://issues.apache.org/jira/browse/SPARK-26381
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1, 2.4.0
> Environment: Tested on two environments:
>  * Spark 2.4.0 - single machine only
>  * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS
> The error occurs in both environments.
>Reporter: Ryan
>Priority: Major
>
> There is a pickle serialization error when I try to use AllenNLP for NER 
> within a Spark worker, and it causes a crash. When running on just the 
> Spark driver or in a standalone program, everything works as expected.
>  
> {code:java}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last): 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py",
>  line 217, in main 
>    func, profiler, deserializer, serializer = read_command(pickleSer, infile) 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py",
>  line 61, in read_command 
>    command = serializer.loads(command.value) 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/serializers.py",
>  line 559, in loads 
>    return pickle.loads(obj, encoding=encoding) 
> TypeError: __init__() missing 3 required positional arguments: 
> 'non_padded_namespaces', 'padding_token', and 'oov_token' 
>    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
>  
>    at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438) 
>    at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421) 
>    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
>  
>    at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  
>    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
>    at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>  
>    at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
>    at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
>    at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
>    at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
>    at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) 
>    at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
>    at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>  
>    at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
>    at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>  
>    at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) 
>    at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) 
>    at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>  
>    at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>  
>    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
>    at org.apache.spark.scheduler.Task.run(Task.scala:109) 
>    at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) 
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  
>    ... 1 more
> {code}






[jira] [Resolved] (SPARK-26391) Spark Streaming Kafka with Offset Gaps

2018-12-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26391.
--
Resolution: Invalid

Questions should go to the mailing list, where you can get better answers from 
developers and users.

> Spark Streaming Kafka with Offset Gaps
> --
>
> Key: SPARK-26391
> URL: https://issues.apache.org/jira/browse/SPARK-26391
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core, Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Rishabh
>Priority: Major
>
> I have an app that uses Kafka Streaming to pull data from an `input` topic and 
> push it to an `output` topic with `processing.guarantee=exactly_once`. Due to 
> `exactly_once`, gaps (transaction markers) are created in Kafka. Let's call 
> this app `kafka-streamer`.
> Now I have another app that listens to this output topic (actually multiple 
> topics matched by a Pattern/Regex) and processes the data using 
> [https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html]. 
> Let's call this app `spark-streamer`.
> Due to the gaps, the first thing that happens is that Spark Streaming fails. To 
> fix this I enabled `spark.streaming.kafka.allowNonConsecutiveOffsets=true` in 
> the Spark config before creating the StreamingContext. Now let's look at the 
> issues I faced when starting `spark-streamer`:
>  # Even though there are new offsets to be polled/consumed, it requires 
> another message to be pushed to the topic partition before processing can 
> start. If I start the app (and there are messages in the queue to be polled) 
> and don't push any message to the topic, the code times out after the default 
> 120ms and throws an exception.
>  # It doesn't fetch the last record; it only fetches records up to the 
> second-to-last one. This means that to poll/process the last record, another 
> message has to be pushed. This is a problem for us since `spark-streamer` 
> listens to multiple topics (based on a pattern) and there might be a topic 
> with low throughput whose data should still make it to Spark for processing.
>  # In general, if no data/message is pushed, it dies after the default 120ms 
> polling timeout.
> In the limited time I had, I went through the spark-streaming-kafka code and 
> was only able to find an answer to the third problem, which is this: 
> [https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L178]
> My questions are:
>  # Why do we throw an exception in `compactedNext()` if no data is polled?
>  # I wasn't able to figure out why the first and second issues happen; it 
> would be great if somebody could point out a solution or the reason behind 
> this behaviour.






[jira] [Commented] (SPARK-26388) No support for "alter table .. replace columns" to drop columns

2018-12-18 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724751#comment-16724751
 ] 

Hyukjin Kwon commented on SPARK-26388:
--

Can you add the expected input/output to the description? That would make it 
easier to follow.
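
For illustration, the kind of repro being asked for, using the table and columns from the report (the CREATE statements are hypothetical):

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Hypothetical starting schema; the report only shows the ALTER statement.
spark.sql("CREATE DATABASE IF NOT EXISTS myschema")
spark.sql("CREATE TABLE myschema.mytable (a INT, b INT, c INT, d INT)")

# Reported behaviour on 2.2.x/2.3.x:
#   ParseException: Operation not allowed: alter table replace columns
# Expected behaviour (matching Hive): the table ends up with columns (a, b, d).
spark.sql("ALTER TABLE myschema.mytable REPLACE COLUMNS (a INT, b INT, d INT)")
{code}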

> No support for "alter table .. replace columns" to drop columns
> ---
>
> Key: SPARK-26388
> URL: https://issues.apache.org/jira/browse/SPARK-26388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1, 2.3.1, 2.3.2
>Reporter: nirav patel
>Priority: Major
>
> It looks like Hive's {{replace columns}} is not working with Spark 2.2.1 and 2.3.1:
>  
> {{alterSchemaSql : alter table myschema.mytable replace columns (a int,b 
> int,d int) Exception in thread "main" 
> org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: 
> alter table replace columns(line 2, pos 6) }}
> {{ADD COLUMNS}} works; that was previously reported and fixed in
> https://issues.apache.org/jira/browse/SPARK-18893
>  
> Replace columns should be supported as well. AFAIK, that's the only way to 
> delete Hive columns.
>  
> It is supposed to work according to these docs:
> [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns]
> [https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features]
>  
> but it throws an error for me on two different versions.






[jira] [Resolved] (SPARK-26381) Pickle Serialization Error Causing Crash

2018-12-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26381.
--
Resolution: Incomplete

> Pickle Serialization Error Causing Crash
> 
>
> Key: SPARK-26381
> URL: https://issues.apache.org/jira/browse/SPARK-26381
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1, 2.4.0
> Environment: Tested on two environments:
>  * Spark 2.4.0 - single machine only
>  * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS
> The error occurs in both environments.
>Reporter: Ryan
>Priority: Major
>
> There is a pickle serialization error when I try to use AllenNLP for NER 
> within a Spark worker, and it causes a crash. When running on just the 
> Spark driver or in a standalone program, everything works as expected.
>  
> {code:java}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last): 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py",
>  line 217, in main 
>    func, profiler, deserializer, serializer = read_command(pickleSer, infile) 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py",
>  line 61, in read_command 
>    command = serializer.loads(command.value) 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/serializers.py",
>  line 559, in loads 
>    return pickle.loads(obj, encoding=encoding) 
> TypeError: __init__() missing 3 required positional arguments: 
> 'non_padded_namespaces', 'padding_token', and 'oov_token' 
>    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
>  
>    at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438) 
>    at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421) 
>    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
>  
>    at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  
>    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
>    at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>  
>    at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
>    at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
>    at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
>    at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
>    at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) 
>    at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
>    at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>  
>    at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
>    at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>  
>    at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) 
>    at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) 
>    at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>  
>    at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>  
>    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
>    at org.apache.spark.scheduler.Task.run(Task.scala:109) 
>    at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) 
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  
>    ... 1 more
> {code}






[jira] [Resolved] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition

2018-12-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26377.
--
Resolution: Invalid

I'm going to leave this resolved. Let's iterate on the Spark mailing lists 
before filing it as an issue.

> java.lang.IllegalStateException: No current assignment for partition
> 
>
> Key: SPARK-26377
> URL: https://issues.apache.org/jira/browse/SPARK-26377
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.1
>Reporter: pavan
>Priority: Major
>
> Hi,
>    I am using a Kafka direct stream with SubscribePattern, with initial 
> offsets for the topics and a pattern. On running the Spark job on the job 
> server I get the following exception, and the job is terminated.
> Kafka Params:
> "bootstrap.servers" -> credentials.getBrokers,
>  "key.deserializer" -> classOf[StringDeserializer],
>  "value.deserializer" -> classOf[ByteArrayDeserializer],
>  "enable.auto.commit" -> (false: java.lang.Boolean)
> "group.id" -> "abc"
> API:
> KafkaUtils.createDirectStream(streamingContext, PreferConsistent, 
> SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), 
> perPartitionConfig)
>  
> Error Log:
> { "duration": "33.523 secs", "classPath": 
> "com.appiot.dataingestion.DataIngestionJob", "startTime": 
> "2018-12-15T18:28:08.207Z", "context": 
> "c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd",
>  "result": 
> { "message": "java.lang.IllegalStateException: No current assignment for 
> partition com-cibigdata2.v1.iot.raw_timeseries-0", "errorClass": 
> "java.lang.RuntimeException", "stack": "java.lang.RuntimeException: 
> java.lang.IllegalStateException: No current assignment for partition 
> com-cibigdata2.v1.iot.raw_timeseries-0\n\tat 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat
>  
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat
>  
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat
>  
> org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat
>  
> org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat
>  scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat 
> org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat
>  
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat
>  
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat
>  
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat
>  
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat
>  
> scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat
>  
> scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat
>  
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat
>  
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat
>  
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat 
> scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat 
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat
>  
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat
>  
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat
>  
> scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat
>  scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
>  
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
>  
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat
>  ... run in separate thread using org.apache.spark.util.ThreadUtils ... 
> ()\n\tat 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)\n\tat
>  
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat
>  
> 

[jira] [Resolved] (SPARK-26366) Except with transform regression

2018-12-18 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-26366.
-
   Resolution: Fixed
Fix Version/s: 3.0.0
   2.4.1

> Except with transform regression
> 
>
> Key: SPARK-26366
> URL: https://issues.apache.org/jira/browse/SPARK-26366
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.2
>Reporter: Dan Osipov
>Assignee: Marco Gaido
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
>
> There appears to be a regression between Spark 2.2 and 2.3. Below is the code 
> to reproduce it:
>  
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val inputDF = spark.sqlContext.createDataFrame(
>   spark.sparkContext.parallelize(Seq(
> Row("0", "john", "smith", "j...@smith.com"),
> Row("1", "jane", "doe", "j...@doe.com"),
> Row("2", "apache", "spark", "sp...@apache.org"),
> Row("3", "foo", "bar", null)
>   )),
>   StructType(List(
> StructField("id", StringType, nullable=true),
> StructField("first_name", StringType, nullable=true),
> StructField("last_name", StringType, nullable=true),
> StructField("email", StringType, nullable=true)
>   ))
> )
> val exceptDF = inputDF.transform( toProcessDF =>
>   toProcessDF.filter(
>   (
> col("first_name").isin(Seq("john", "jane"): _*)
>   and col("last_name").isin(Seq("smith", "doe"): _*)
>   )
>   or col("email").isin(List(): _*)
>   )
> )
> inputDF.except(exceptDF).show()
> {code}
> Output with Spark 2.2:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> |  3|       foo|      bar|            null|
> +---+----------+---------+----------------+{noformat}
> Output with Spark 2.3:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> +---+----------+---------+----------------+{noformat}
> Note, changing the last line to 
> {code:java}
> inputDF.except(exceptDF.cache()).show()
> {code}
> produces identical output for both Spark 2.3 and 2.2
>  






[jira] [Commented] (SPARK-26366) Except with transform regression

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724744#comment-16724744
 ] 

ASF GitHub Bot commented on SPARK-26366:


asfgit closed pull request #23315: [SPARK-26366][SQL] ReplaceExceptWithFilter 
should consider NULL as False
URL: https://github.com/apache/spark/pull/23315
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index efd3944eba7f5..4996d24dfd298 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Note:
  * Before flipping the filter condition of the right node, we should:
  * 1. Combine all it's [[Filter]].
- * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL 
values in the condition).
+ * 2. Update the attribute references to the left node;
+ * 3. Add a Coalesce(condition, False) (to take into account of NULL values in 
the condition).
  */
 object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
@@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
 plan.transform {
   case e @ Except(left, right, false) if isEligible(left, right) =>
-val newCondition = transformCondition(left, skipProject(right))
-newCondition.map { c =>
-  Distinct(Filter(Not(c), left))
-}.getOrElse {
+val filterCondition = 
combineFilters(skipProject(right)).asInstanceOf[Filter].condition
+if (filterCondition.deterministic) {
+  transformCondition(left, filterCondition).map { c =>
+Distinct(Filter(Not(c), left))
+  }.getOrElse {
+e
+  }
+} else {
   e
 }
 }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): 
Option[Expression] = {
-val filterCondition =
-  
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
-
-val attributeNameMap: Map[String, Attribute] = left.output.map(x => 
(x.name, x)).toMap
-
-if (filterCondition.references.forall(r => 
attributeNameMap.contains(r.name))) {
-  Some(filterCondition.transform { case a: AttributeReference => 
attributeNameMap(a.name) })
+  private def transformCondition(plan: LogicalPlan, condition: Expression): 
Option[Expression] = {
+val attributeNameMap: Map[String, Attribute] = plan.output.map(x => 
(x.name, x)).toMap
+if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
+  val rewrittenCondition = condition.transform {
+case a: AttributeReference => attributeNameMap(a.name)
+  }
+  // We need to consider as False when the condition is NULL, otherwise we 
do not return those
+  // rows containing NULL which are instead filtered in the Except right 
plan
+  Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
 } else {
   None
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 3b1b2d588ef67..c8e15c7da763e 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, 
Literal, Not}
 import org.apache.spark.sql.catalyst.expressions.aggregate.First
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.BooleanType
 
 class ReplaceOperatorSuite extends PlanTest {
 
@@ -65,8 +66,7 @@ class ReplaceOperatorSuite extends PlanTest {
 
 val correctAnswer =
   Aggregate(table1.output, table1.output,
-Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
-  (attributeA >= 2 && attributeB < 1)),
+

[jira] [Assigned] (SPARK-26366) Except with transform regression

2018-12-18 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-26366:
---

Assignee: Marco Gaido

> Except with transform regression
> 
>
> Key: SPARK-26366
> URL: https://issues.apache.org/jira/browse/SPARK-26366
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.2
>Reporter: Dan Osipov
>Assignee: Marco Gaido
>Priority: Major
>
> There appears to be a regression between Spark 2.2 and 2.3. Below is the code 
> to reproduce it:
>  
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val inputDF = spark.sqlContext.createDataFrame(
>   spark.sparkContext.parallelize(Seq(
> Row("0", "john", "smith", "j...@smith.com"),
> Row("1", "jane", "doe", "j...@doe.com"),
> Row("2", "apache", "spark", "sp...@apache.org"),
> Row("3", "foo", "bar", null)
>   )),
>   StructType(List(
> StructField("id", StringType, nullable=true),
> StructField("first_name", StringType, nullable=true),
> StructField("last_name", StringType, nullable=true),
> StructField("email", StringType, nullable=true)
>   ))
> )
> val exceptDF = inputDF.transform( toProcessDF =>
>   toProcessDF.filter(
>   (
> col("first_name").isin(Seq("john", "jane"): _*)
>   and col("last_name").isin(Seq("smith", "doe"): _*)
>   )
>   or col("email").isin(List(): _*)
>   )
> )
> inputDF.except(exceptDF).show()
> {code}
> Output with Spark 2.2:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> |  3|       foo|      bar|            null|
> +---+----------+---------+----------------+{noformat}
> Output with Spark 2.3:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> +---+----------+---------+----------------+{noformat}
> Note, changing the last line to 
> {code:java}
> inputDF.except(exceptDF.cache()).show()
> {code}
> produces identical output for both Spark 2.3 and 2.2
>  






[jira] [Updated] (SPARK-26288) add initRegisteredExecutorsDB in ExternalShuffleService

2018-12-18 Thread weixiuli (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

weixiuli updated SPARK-26288:
-
Component/s: Spark Core

> add initRegisteredExecutorsDB in ExternalShuffleService
> ---
>
> Key: SPARK-26288
> URL: https://issues.apache.org/jira/browse/SPARK-26288
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Shuffle, Spark Core
>Affects Versions: 2.4.0
>Reporter: weixiuli
>Priority: Major
>
> Spark on YARN uses a DB to record RegisteredExecutors information, which can 
> be reloaded and used again when the ExternalShuffleService is restarted.
> The RegisteredExecutors information is not recorded in either Spark 
> standalone mode or Spark on k8s, so it is lost when the 
> ExternalShuffleService is restarted.
> To solve this problem, a method is proposed and committed.






[jira] [Created] (SPARK-26403) DataFrame pivot using array column fails with "Unsupported literal type class"

2018-12-18 Thread Huon Wilson (JIRA)
Huon Wilson created SPARK-26403:
---

 Summary: DataFrame pivot using array column fails with 
"Unsupported literal type class"
 Key: SPARK-26403
 URL: https://issues.apache.org/jira/browse/SPARK-26403
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Huon Wilson


Doing a pivot (using the {{pivot(pivotColumn: Column)}} overload) on a column 
containing arrays results in a runtime error:

{code:none}
scala> val df = Seq((1, Seq("a", "x"), 2), (1, Seq("b"), 3), (2, Seq("a", "x"), 
10), (3, Seq(), 100)).toDF("x", "s", "y")
df: org.apache.spark.sql.DataFrame = [x: int, s: array<string> ... 1 more field]

scala> df.show
+---+------+---+
|  x|     s|  y|
+---+------+---+
|  1|[a, x]|  2|
|  1|   [b]|  3|
|  2|[a, x]| 10|
|  3|    []|100|
+---+------+---+


scala> df.groupBy("x").pivot("s").agg(collect_list($"y")).show
java.lang.RuntimeException: Unsupported literal type class 
scala.collection.mutable.WrappedArray$ofRef WrappedArray()
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
  at 
org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
  at 
org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:419)
  at 
org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:397)
  at 
org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:317)
  ... 49 elided
{code}

However, this doesn't seem to be a fundamental limitation with {{pivot}}, as it 
works fine using the {{pivot(pivotColumn: Column, values: Seq[Any])}} overload, 
as long as the arrays are mapped to the {{Array}} type:

{code:none}
scala> val rawValues = df.select("s").distinct.sort("s").collect
rawValues: Array[org.apache.spark.sql.Row] = Array([WrappedArray()], 
[WrappedArray(a, x)], [WrappedArray(b)])

scala> val values = rawValues.map(_.getSeq[String](0).to[Array])
values: Array[Array[String]] = Array(Array(), Array(a, x), Array(b))

scala> df.groupBy("x").pivot("s", values).agg(collect_list($"y")).show
+---+-----+------+---+
|  x|   []|[a, x]|[b]|
+---+-----+------+---+
|  1|   []|   [2]|[3]|
|  3|[100]|    []| []|
|  2|   []|  [10]| []|
+---+-----+------+---+
{code}

It would be nice if {{pivot}} were more resilient to Spark's own representation 
of array columns, so that the first version worked.






[jira] [Updated] (SPARK-26397) Driver-side only metrics support

2018-12-18 Thread Yuanjian Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuanjian Li updated SPARK-26397:

Description: 
As noted in the comment at 
[https://github.com/apache/spark/pull/23327#discussion_r242646521,] during the 
work on SPARK-26222 and SPARK-26223, we need support for driver-side-only 
metrics, which will mark metadata-related metrics as driver-side only and not 
send them to the executor side.

This issue needs some changes in SparkPlan and SparkPlanInfo; we should also 
check whether there is any existing misuse.

> Driver-side only metrics support
> 
>
> Key: SPARK-26397
> URL: https://issues.apache.org/jira/browse/SPARK-26397
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Yuanjian Li
>Priority: Major
> Fix For: 3.0.0
>
>
> As noted in the comment at 
> [https://github.com/apache/spark/pull/23327#discussion_r242646521,] during 
> the work on SPARK-26222 and SPARK-26223, we need support for driver-side-only 
> metrics, which will mark metadata-related metrics as driver-side only and not 
> send them to the executor side.
> This issue needs some changes in SparkPlan and SparkPlanInfo; we should also 
> check whether there is any existing misuse.






[jira] [Updated] (SPARK-26397) Driver-side only metrics support

2018-12-18 Thread Yuanjian Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuanjian Li updated SPARK-26397:

Environment: (was: As the comment in 
[https://github.com/apache/spark/pull/23327#discussion_r242646521,] during the 
work of SPARK-26222 and SPARK-26223, we need the support for driver-side only 
metrics, which will mark the metadata relative metrics as driver-side only and 
will not send them to executor-side.

This issue needs some changes in SparkPlan and SparkPlanInfo, we should also 
check is there any misuse before.)

> Driver-side only metrics support
> 
>
> Key: SPARK-26397
> URL: https://issues.apache.org/jira/browse/SPARK-26397
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Yuanjian Li
>Priority: Major
> Fix For: 3.0.0
>
>







[jira] [Updated] (SPARK-26399) Add new stage-level REST APIs and parameters

2018-12-18 Thread Edwina Lu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edwina Lu updated SPARK-26399:
--
Description: 
Add the peak values for the metrics to the stages REST API. Also add a new 
executorSummary REST API, which will return executor summary metrics for a 
specified stage:
{code:java}
curl http://:18080/api/v1/applicationsexecutorSummary{code}

Add parameters to the stages REST API to specify:
* filtering by task status, returning only the tasks that match (for example, 
FAILED tasks)
* task metric quantiles, adding the task summary if specified
* executor metric quantiles, adding the executor summary if specified

  was:
Add the peak values for the metrics to the stages REST API. Also add a new 
executorSummary REST API, which will return executor summary metrics for a 
specified stage:
{code:java}
curl http://:18080/api/v1/applicationsexecutorSummary
{code}


> Add new stage-level REST APIs and parameters
> 
>
> Key: SPARK-26399
> URL: https://issues.apache.org/jira/browse/SPARK-26399
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Edwina Lu
>Priority: Major
>
> Add the peak values for the metrics to the stages REST API. Also add a new 
> executorSummary REST API, which will return executor summary metrics for a 
> specified stage:
> {code:java}
> curl http://:18080/api/v1/applications/ id>// attempt>/executorSummary{code}
> Add parameters to the stages REST API to specify:
> * filtering by task status, returning only the tasks that match (for example, 
> FAILED tasks)
> * task metric quantiles, adding the task summary if specified
> * executor metric quantiles, adding the executor summary if specified
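
For context, a sketch of the existing REST surface that these additions extend (the history server host below is a placeholder):

{code:python}
import requests

base = "http://history-server:18080/api/v1"  # placeholder host

# Endpoints that exist today; the proposal adds peak executor metrics, an
# executorSummary endpoint, and task-status / quantile parameters on top.
apps = requests.get(base + "/applications").json()
app_id = apps[0]["id"]
stages = requests.get(base + "/applications/" + app_id + "/stages").json()
print(len(stages), "stages reported for", app_id)
{code}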






[jira] [Updated] (SPARK-26402) Canonicalization on GetStructField

2018-12-18 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26402:

Summary: Canonicalization on GetStructField  (was: GetStructField with 
different optional names are semantically equal)

> Canonicalization on GetStructField
> --
>
> Key: SPARK-26402
> URL: https://issues.apache.org/jira/browse/SPARK-26402
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: DB Tsai
>Assignee: DB Tsai
>Priority: Major
>







[jira] [Updated] (SPARK-26402) Canonicalization on GetStructField

2018-12-18 Thread DB Tsai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-26402:

Description: GetStructField with different optional names should be 
semantically equal. We will use this as a building block to compare the nested 
fields used in plans to be optimized by the Catalyst optimizer.

> Canonicalization on GetStructField
> --
>
> Key: SPARK-26402
> URL: https://issues.apache.org/jira/browse/SPARK-26402
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: DB Tsai
>Assignee: DB Tsai
>Priority: Major
>
> GetStructField with different optional names should be semantically equal. We 
> will use this as a building block to compare the nested fields used in plans 
> to be optimized by the Catalyst optimizer.






[jira] [Created] (SPARK-26402) GetStructField with different optional names are semantically equal

2018-12-18 Thread DB Tsai (JIRA)
DB Tsai created SPARK-26402:
---

 Summary: GetStructField with different optional names are 
semantically equal
 Key: SPARK-26402
 URL: https://issues.apache.org/jira/browse/SPARK-26402
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.4.0
Reporter: DB Tsai
Assignee: DB Tsai









[jira] [Assigned] (SPARK-25857) Document delegation token code in Spark

2018-12-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-25857:


Assignee: Apache Spark

> Document delegation token code in Spark
> ---
>
> Key: SPARK-25857
> URL: https://issues.apache.org/jira/browse/SPARK-25857
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Marcelo Vanzin
>Assignee: Apache Spark
>Priority: Minor
>
> By this I mean not user documentation, but documenting the functionality 
> provided in the {{org.apache.spark.deploy.security}} and related packages, so 
> that other developers making changes there can refer to it.
> It seems to be a source of confusion every time somebody needs to touch that 
> code, so it would be good to have a document explaining how it all works, 
> including how it's hooked up to different resource managers.






[jira] [Assigned] (SPARK-25857) Document delegation token code in Spark

2018-12-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-25857:


Assignee: (was: Apache Spark)

> Document delegation token code in Spark
> ---
>
> Key: SPARK-25857
> URL: https://issues.apache.org/jira/browse/SPARK-25857
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> By this I mean not user documentation, but documenting the functionality 
> provided in the {{org.apache.spark.deploy.security}} and related packages, so 
> that other developers making changes there can refer to it.
> It seems to be a source of confusion every time somebody needs to touch that 
> code, so it would be good to have a document explaining how it all works, 
> including how it's hooked up to different resource managers.






[jira] [Commented] (SPARK-25857) Document delegation token code in Spark

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724570#comment-16724570
 ] 

ASF GitHub Bot commented on SPARK-25857:


vanzin opened a new pull request #23348: [SPARK-25857][core] Add developer 
documentation regarding delegation tokens.
URL: https://github.com/apache/spark/pull/23348
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document delegation token code in Spark
> ---
>
> Key: SPARK-25857
> URL: https://issues.apache.org/jira/browse/SPARK-25857
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> By this I mean not user documentation, but documenting the functionality 
> provided in the {{org.apache.spark.deploy.security}} and related packages, so 
> that other developers making changes there can refer to it.
> It seems to be a source of confusion every time somebody needs to touch that 
> code, so it would be good to have a document explaining how it all works, 
> including how it's hooked up to different resource managers.






[jira] [Created] (SPARK-26401) [k8s] Init container drops necessary config options for pulling jars from azure storage

2018-12-18 Thread Stanis Shkel (JIRA)
Stanis Shkel created SPARK-26401:


 Summary: [k8s] Init container drops necessary config options for 
pulling jars from azure storage
 Key: SPARK-26401
 URL: https://issues.apache.org/jira/browse/SPARK-26401
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.3.2
Reporter: Stanis Shkel


I am running a spark-submit command that pulls a jar from a remote private 
Azure storage account. As far as I understand, the jar is supposed to be pulled 
within the driver's init container. However, the container doesn't inherit the 
"spark.hadoop.fs.azure.account.key.$(STORAGE_ACCT).blob.core.windows.net=$(STORAGE_SECRET)"
 parameter that I pass in when running spark-submit.
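
For reference, the shape of that setting expressed as a driver-side SparkConf (account name and secret below are placeholders; the report passes it with --conf on spark-submit):

{code:python}
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Placeholder account and key, not real credentials.
conf = SparkConf().set(
    "spark.hadoop.fs.azure.account.key.STORAGE_ACCT.blob.core.windows.net",
    "STORAGE_SECRET")

# spark.hadoop.* settings are copied into the Hadoop configuration used by the
# driver and executors; per this report they never reach the init container
# that actually fetches the remote jar.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
{code}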

Here is what I found so far. The spark-init container is invoked via the 
following command:

[https://github.com/apache/spark/blob/branch-2.3/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L83]

That in the end turns into the following shell call:
{code:bash}

exec /usr/lib/jvm/java-1.8-openjdk/bin/java -cp 
'/opt/spark/conf/:/opt/spark/jars/*' -Xmx1g 
org.apache.spark.deploy.k8s.SparkPodInitContainer 
/etc/spark-init/spark-init.properties

{code}

If I cat the spark-init properties file, the only parameters in there are:

spark.kubernetes.mountDependencies.jarsDownloadDir=/var/spark-data/spark-jars

spark.kubernetes.initContainer.remoteJars=wasbs\://mycontai...@testaccount.blob.core.windows.net/jars/myjar.jar,wasbs\://mycontai...@testaccount.blob.core.windows.net/jars/myjar.jar

spark.kubernetes.mountDependencies.filesDownloadDir=/var/spark-data/spark-files

My guess is that it's these params: 
[https://github.com/apache/spark/blob/branch-2.3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala#L49]

However, spark.hadoop.fs.azure.account.key is present neither in that file nor 
in the environment.

This causes the jar download to fail; the exception is as follows:

{code:bash}

Exception in thread "main" org.apache.hadoop.fs.azure.AzureException: 
org.apache.hadoop.fs.azure.AzureException: Container mycontainer in account 
testaccount.blob.core.windows.net not found, and we can't create it using 
anoynomous credentials.
 at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:938)
 at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:438)
 at 
org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1048)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
 at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1910)
 at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:700)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:492)
 at 
org.apache.spark.deploy.k8s.FileFetcher.fetchFile(SparkPodInitContainer.scala:91)
 at 
org.apache.spark.deploy.k8s.SparkPodInitContainer$$anonfun$downloadFiles$1$$anonfun$apply$2.apply(SparkPodInitContainer.scala:81)
 at 
org.apache.spark.deploy.k8s.SparkPodInitContainer$$anonfun$downloadFiles$1$$anonfun$apply$2.apply(SparkPodInitContainer.scala:79)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
 at 
org.apache.spark.deploy.k8s.SparkPodInitContainer$$anonfun$downloadFiles$1.apply(SparkPodInitContainer.scala:79)
 at 
org.apache.spark.deploy.k8s.SparkPodInitContainer$$anonfun$downloadFiles$1.apply(SparkPodInitContainer.scala:77)
 at scala.Option.foreach(Option.scala:257)
 at 
org.apache.spark.deploy.k8s.SparkPodInitContainer.downloadFiles(SparkPodInitContainer.scala:77)
 at 
org.apache.spark.deploy.k8s.SparkPodInitContainer.run(SparkPodInitContainer.scala:56)
 at 
org.apache.spark.deploy.k8s.SparkPodInitContainer$.main(SparkPodInitContainer.scala:113)
 at 
org.apache.spark.deploy.k8s.SparkPodInitContainer.main(SparkPodInitContainer.scala)
Caused by: org.apache.hadoop.fs.azure.AzureException: Container qrefinery in 
account jr3e3d.blob.core.windows.net not found, and we can't create it using 
anoynomous credentials.
 at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:730)
 at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:933)
 ... 22 more

{code}

I am certain that the parameter is being passed in to the driver correctly. Due 
to https://issues.apache.org/jira/browse/SPARK-26400 

[jira] [Created] (SPARK-26400) [k8s] Init container silently swallows errors when fetching jars from remote url

2018-12-18 Thread Stanis Shkel (JIRA)
Stanis Shkel created SPARK-26400:


 Summary: [k8s] Init container silently swallows errors when 
fetching jars from remote url
 Key: SPARK-26400
 URL: https://issues.apache.org/jira/browse/SPARK-26400
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.3.2
Reporter: Stanis Shkel


I run the following command

{code:bash}

spark-2.3.2-bin-hadoop2.7/bin/spark-submit --name client \
--master "k8s://cluster" \
--deploy-mode cluster \
--conf spark.executor.instances=2 \
--conf spark.executor.memory=5G \
--conf spark.driver.memory=8G \
--conf 
spark.kubernetes.container.image=rego.azurecr.io/spark:spark-2.3.2-hadoop2.7 \
--class au.com.random.DoesntMatter \
"https://fake-link.com/jars/my.jar;

{code}

I expect the init container to fail to download the jar and report a failure at 
the init stage. Instead, I get a driver failure with the following message.

{code:bash}

++ id -u
+ myuid=0
++ id -g
+ mygid=0
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=driver
+ '[' -z driver ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n /var/spark-data/spark-jars/my.jar:/var/spark-data/spark-jars/my.jar ']'
+ 
SPARK_CLASSPATH=':/opt/spark/jars/*:/var/spark-data/spark-jars/my.jar:/var/spark-data/spark-jars/my.jar'
+ '[' -n /var/spark-data/spark-files ']'
+ cp -R /var/spark-data/spark-files/. .
+ case "$SPARK_K8S_CMD" in
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" 
-Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY 
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS 
$SPARK_DRIVER_ARGS)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java 
-Dspark.master=k8s://kubernetes:443 
-Dspark.app.id=spark-2f340a028a314e9cb0df8165d887bfb7 
-Dspark.kubernetes.container.image=azure.azurecr.io/spark:spark-2.3.2-hadoop2.7 
-Dspark.submit.deployMode=cluster -Dspark.driver.blockManager.port=7079 
-Dspark.executor.memory=5G 
-Dspark.kubernetes.executor.podNamePrefix=client-f20f30e154a13624a728d6f56d45da3e
 
-Dspark.jars=https://fake-link.com/jars/my.jar,https://fake-link.com/jars/my.jar
 -Dspark.driver.memory=8G -Dspark.driver.port=7078 
-Dspark.kubernetes.driver.pod.name=client-f20f30e154a13624a728d6f56d45da3e-driver
 -Dspark.app.name=client 
-Dspark.kubernetes.initContainer.configMapKey=spark-init.properties 
-Dspark.executor.instances=2 
-Dspark.driver.host=client-f20f30e154a13624a728d6f56d45da3e-driver-svc.default.svc
 
-Dspark.kubernetes.initContainer.configMapName=client-f20f30e154a13624a728d6f56d45da3e-init-config
 -cp 
':/opt/spark/jars/*:/var/spark-data/spark-jars/my.jar:/var/spark-data/spark-jars/my.jar'
 -Xms8G -Xmx8G -Dspark.driver.bindAddress=10.1.0.101 au.com.random.DoesntMatter
Error: Could not find or load main class au.com.random.DoesntMatter

{code}

This happens because the spark-init container fails to download the dependencies 
but misreports the status. Here is a log snippet from the spark-init container:

{code:bash}

++ id -u
+ myuid=0
++ id -g
+ mygid=0
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=init
+ '[' -z init ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n '' ']'
+ '[' -n '' ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=("$SPARK_HOME/bin/spark-class" 
"org.apache.spark.deploy.k8s.SparkPodInitContainer" "$@")
+ exec /sbin/tini -s -- /opt/spark/bin/spark-class 
org.apache.spark.deploy.k8s.SparkPodInitContainer 
/etc/spark-init/spark-init.properties
2018-12-18 21:15:41 INFO SparkPodInitContainer:54 - Starting init-container to 
download Spark application dependencies.
2018-12-18 21:15:41 WARN NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
2018-12-18 21:15:41 INFO SecurityManager:54 - Changing view acls to: root
2018-12-18 21:15:41 INFO SecurityManager:54 - Changing modify acls to: root
2018-12-18 21:15:41 INFO SecurityManager:54 - Changing view acls groups to:
2018-12-18 21:15:41 INFO SecurityManager:54 - Changing modify acls groups to:
2018-12-18 21:15:41 INFO SecurityManager:54 - SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); groups with 
view permissions: Set(); users with modify permissions: Set(root); groups with 
modify permissions: Set()
2018-12-18 21:15:41 INFO SparkPodInitContainer:54 - Downloading remote jars: 
Some(https://fake-link.com/jars/my.jar,https://fake-link.com/jars/my.jar)
2018-12-18 21:15:41 INFO SparkPodInitContainer:54 - Downloading remote files: 
None
2018-12-18 21:15:42 INFO 

[jira] [Resolved] (SPARK-25815) Kerberos Support in Kubernetes resource manager (Client Mode)

2018-12-18 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-25815.

   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 22911
[https://github.com/apache/spark/pull/22911]

> Kerberos Support in Kubernetes resource manager (Client Mode)
> -
>
> Key: SPARK-25815
> URL: https://issues.apache.org/jira/browse/SPARK-25815
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 3.0.0
>Reporter: Ilan Filonenko
>Assignee: Marcelo Vanzin
>Priority: Major
> Fix For: 3.0.0
>
>
> Include Kerberos support for Spark on K8S jobs running in client-mode



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25815) Kerberos Support in Kubernetes resource manager (Client Mode)

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724486#comment-16724486
 ] 

ASF GitHub Bot commented on SPARK-25815:


asfgit closed pull request #22911: [SPARK-25815][k8s] Support kerberos in 
client mode, keytab-based token renewal.
URL: https://github.com/apache/spark/pull/22911
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d4055cb6c5853..763bd0a70a035 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, 
UndeclaredThrowableException}
-import java.net.URL
+import java.net.{URI, URL}
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
 import java.util.UUID
@@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging {
 val hadoopConf = 
conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
 val targetDir = Utils.createTempDir()
 
-// assure a keytab is available from any place in a JVM
-if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || 
isKubernetesCluster) {
-  if (args.principal != null) {
-if (args.keytab != null) {
-  require(new File(args.keytab).exists(), s"Keytab file: 
${args.keytab} does not exist")
-  // Add keytab and principal configurations in sysProps to make them 
available
-  // for later use; e.g. in spark sql, the isolated class loader used 
to talk
-  // to HiveMetastore will use these settings. They will be set as 
Java system
-  // properties and then loaded by SparkConf
-  sparkConf.set(KEYTAB, args.keytab)
-  sparkConf.set(PRINCIPAL, args.principal)
-  UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-}
+// Kerberos is not supported in standalone mode, and keytab support is not 
yet available
+// in Mesos cluster mode.
+if (clusterManager != STANDALONE
+&& !isMesosCluster
+&& args.principal != null
+&& args.keytab != null) {
+  // If client mode, make sure the keytab is just a local path.
+  if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
+args.keytab = new URI(args.keytab).getPath()
+  }
+
+  if (!Utils.isLocalUri(args.keytab)) {
+require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} 
does not exist")
+UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
   }
 }
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 126a6ab801369..f7e3ddecee093 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.security
 
 import java.io.File
+import java.net.URI
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
@@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager(
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
 
   private val principal = sparkConf.get(PRINCIPAL).orNull
-  private val keytab = sparkConf.get(KEYTAB).orNull
+
+  // The keytab can be a local: URI for cluster mode, so translate it to a 
regular path. If it is
+  // needed later on, the code will check that it exists.
+  private val keytab = sparkConf.get(KEYTAB).map { uri => new 
URI(uri).getPath() }.orNull
 
   require((principal == null) == (keytab == null),
 "Both principal and keytab must be defined, or neither.")
-  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at 
$keytab.")
 
   private val delegationTokenProviders = loadProviders()
   logDebug("Using the following builtin delegation token providers: " +
@@ -264,6 +267,7 @@ private[spark] class HadoopDelegationTokenManager(
 
   private def doLogin(): UserGroupInformation = {
 logInfo(s"Attempting to login to KDC using principal: $principal")
+require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
 val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
keytab)
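
The key transformation in the diff above is that a keytab given as a local: URI is 
reduced to its path component before being checked on disk. The same idea, illustrated 
in Python for brevity (the change itself uses java.net.URI; the keytab path below is a 
hypothetical example):

{code:python}
from urllib.parse import urlparse

keytab = "local:///etc/security/keytabs/spark.keytab"  # hypothetical example
print(urlparse(keytab).path)  # -> /etc/security/keytabs/spark.keytab
{code}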
 

[jira] [Assigned] (SPARK-25815) Kerberos Support in Kubernetes resource manager (Client Mode)

2018-12-18 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-25815:
--

Assignee: Marcelo Vanzin

> Kerberos Support in Kubernetes resource manager (Client Mode)
> -
>
> Key: SPARK-25815
> URL: https://issues.apache.org/jira/browse/SPARK-25815
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 3.0.0
>Reporter: Ilan Filonenko
>Assignee: Marcelo Vanzin
>Priority: Major
> Fix For: 3.0.0
>
>
> Include Kerberos support for Spark on K8S jobs running in client-mode



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26399) Add new stage-level REST APIs and parameters

2018-12-18 Thread Edwina Lu (JIRA)
Edwina Lu created SPARK-26399:
-

 Summary: Add new stage-level REST APIs and parameters
 Key: SPARK-26399
 URL: https://issues.apache.org/jira/browse/SPARK-26399
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Edwina Lu


Add the peak values for the metrics to the stages REST API. Also add a new 
executorSummary REST API, which will return executor summary metrics for a 
specified stage:
{code:java}
curl http://:18080/api/v1/applicationsexecutorSummary
{code}
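
For comparison, the application- and stage-level endpoints that already exist in the 
history server REST API can be queried as sketched below; the executorSummary endpoint 
described in this ticket is the proposed addition, so its exact path is not final. Host, 
port, and application id are placeholders, and the third-party requests package is 
assumed.

{code:python}
import requests

base = "http://<history-server>:18080/api/v1"  # placeholder host

# existing endpoints: list applications, then fetch stage-level data
apps = requests.get(base + "/applications").json()
app_id = apps[0]["id"]
stages = requests.get(base + "/applications/" + app_id + "/stages").json()
print(len(stages), "stages reported for", app_id)
{code}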



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23431) Expose the new executor memory metrics at the stage level

2018-12-18 Thread Edwina Lu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edwina Lu updated SPARK-23431:
--
Description: 
Collect and show the new executor memory metrics for each stage, to provide 
more information on how memory is used per stage.

Modify the AppStatusListener to track the peak values for JVM used memory, 
execution memory, storage memory, and unified memory for each executor for each 
stage.

This is a subtask for SPARK-23206. Please refer to the design doc for that 
ticket for more details.

  was:
Collect and show the new executor memory metrics for each stage, to provide 
more information on how memory is used per stage.

Modify the AppStatusListener to track the peak values for JVM used memory, 
execution memory, storage memory, and unified memory for each executor for each 
stage.

Add the peak values for the metrics to the stages REST API. Also add a new 
stageSummary REST API, which will return executor summary metrics for a 
specified stage:
{code:java}
curl http://:18080/api/v1/applicationsexecutorSummary{code}
This is a subtask for SPARK-23206. Please refer to the design doc for that 
ticket for more details.


> Expose the new executor memory metrics at the stage level
> -
>
> Key: SPARK-23431
> URL: https://issues.apache.org/jira/browse/SPARK-23431
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
>
> Collect and show the new executor memory metrics for each stage, to provide 
> more information on how memory is used per stage.
> Modify the AppStatusListener to track the peak values for JVM used memory, 
> execution memory, storage memory, and unified memory for each executor for 
> each stage.
> This is a subtask for SPARK-23206. Please refer to the design doc for that 
> ticket for more details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23431) Expose the new executor memory metrics at the stage level

2018-12-18 Thread Edwina Lu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724472#comment-16724472
 ] 

Edwina Lu commented on SPARK-23431:
---

Splitting off the new REST APIs into a new subtask.

> Expose the new executor memory metrics at the stage level
> -
>
> Key: SPARK-23431
> URL: https://issues.apache.org/jira/browse/SPARK-23431
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
>
> Collect and show the new executor memory metrics for each stage, to provide 
> more information on how memory is used per stage.
> Modify the AppStatusListener to track the peak values for JVM used memory, 
> execution memory, storage memory, and unified memory for each executor for 
> each stage.
> This is a subtask for SPARK-23206. Please refer to the design doc for that 
> ticket for more details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25869) Spark on YARN: the original diagnostics is missing when job failed maxAppAttempts times

2018-12-18 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-25869:
--

Assignee: (was: Marcelo Vanzin)

> Spark on YARN: the original diagnostics is missing when job failed 
> maxAppAttempts times
> ---
>
> Key: SPARK-25869
> URL: https://issues.apache.org/jira/browse/SPARK-25869
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.1.1
>Reporter: Yeliang Cang
>Priority: Major
>
> When configuring Spark on YARN, I submit the job using the command below:
> {code}
>  spark-submit  --class org.apache.spark.examples.SparkPi     --master yarn    
>  --deploy-mode cluster     --driver-memory 127m  --driver-cores 1   
> --executor-memory 2048m     --executor-cores 1    --num-executors 10  --queue 
> root.mr --conf spark.testing.reservedMemory=1048576 --conf 
> spark.yarn.executor.memoryOverhead=50 --conf 
> spark.yarn.driver.memoryOverhead=50 
> /opt/ZDH/parcels/lib/spark/examples/jars/spark-examples* 1
> {code}
> Apparently, the driver memory is not enough, but this cannot be seen in the 
> Spark client log:
> {code}
> 2018-10-29 19:28:34,658 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: ACCEPTED)
> 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: RUNNING)
> 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: N/A
>  ApplicationMaster host: 10.43.183.143
>  ApplicationMaster RPC port: 0
>  queue: root.mr
>  start time: 1540812501560
>  final status: UNDEFINED
>  tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/
>  user: mr
> 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: FINISHED)
> 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: Shutdown hook called before final status was reported.
>  ApplicationMaster host: 10.43.183.143
>  ApplicationMaster RPC port: 0
>  queue: root.mr
>  start time: 1540812501560
>  final status: FAILED
>  tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/
>  user: mr
> Exception in thread "main" org.apache.spark.SparkException: Application 
> application_1540536615315_0013 finished with failed status
>  at org.apache.spark.deploy.yarn.Client.run(Client.scala:1137)
>  at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1183)
>  at org.apache.spark.deploy.yarn.Client.main(Client.scala)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
>  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 2018-10-29 19:28:36,694 INFO org.apache.spark.util.ShutdownHookManager: 
> Shutdown hook called
> 2018-10-29 19:28:36,695 INFO org.apache.spark.util.ShutdownHookManager: 
> Deleting directory /tmp/spark-96077be5-0dfa-496d-a6a0-96e83393a8d9
> {code}
>  
>  
> Solution: after applying the patch, the Spark client log shows:
> {code}
> 2018-10-29 19:27:32,962 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0012 (state: RUNNING)
> 2018-10-29 19:27:32,962 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: N/A
>  ApplicationMaster host: 10.43.183.143
>  ApplicationMaster RPC port: 0
>  queue: root.mr
>  start time: 1540812436656
>  final status: UNDEFINED
>  tracking URL: http://zdh141:8088/proxy/application_1540536615315_0012/
>  user: mr
> 2018-10-29 19:27:33,964 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0012 (state: FAILED)
> 2018-10-29 19:27:33,964 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: Application application_1540536615315_0012 failed 2 times due 
> to AM Container for appattempt_1540536615315_0012_02 exited with 
> exitCode: -104
> For more detailed output, check application tracking 
> page:http://zdh141:8088/cluster/app/application_1540536615315_0012Then, click 
> on links to logs of each attempt.
> Diagnostics: virtual memory used. Killing 

[jira] [Commented] (SPARK-25869) Spark on YARN: the original diagnostics is missing when job failed maxAppAttempts times

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724453#comment-16724453
 ] 

ASF GitHub Bot commented on SPARK-25869:


vanzin closed pull request #22876: [SPARK-25869] [YARN] the original 
diagnostics is missing when job failed ma…
URL: https://github.com/apache/spark/pull/22876
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f94e3f731007..57f0a7f05b2e5 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -293,6 +293,9 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 }
 
 if (!unregistered) {
+  logInfo("Waiting for " + sparkConf.get("spark.yarn.report.interval", 
"1000").toInt +"ms to unregister am," +
+" so the client can have the right diagnostics msg!")
+  Thread.sleep(sparkConf.get("spark.yarn.report.interval", 
"1000").toInt)
   // we only want to unregister if we don't want the RM to retry
   if (finalStatus == FinalApplicationStatus.SUCCEEDED || 
isLastAttempt) {
 unregister(finalStatus, finalMsg)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Spark on YARN: the original diagnostics is missing when job failed 
> maxAppAttempts times
> ---
>
> Key: SPARK-25869
> URL: https://issues.apache.org/jira/browse/SPARK-25869
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.1.1
>Reporter: Yeliang Cang
>Priority: Major
>
> When configuring Spark on YARN, I submit the job using the command below:
> {code}
>  spark-submit  --class org.apache.spark.examples.SparkPi     --master yarn    
>  --deploy-mode cluster     --driver-memory 127m  --driver-cores 1   
> --executor-memory 2048m     --executor-cores 1    --num-executors 10  --queue 
> root.mr --conf spark.testing.reservedMemory=1048576 --conf 
> spark.yarn.executor.memoryOverhead=50 --conf 
> spark.yarn.driver.memoryOverhead=50 
> /opt/ZDH/parcels/lib/spark/examples/jars/spark-examples* 1
> {code}
> Apparently, the driver memory is not enough, but this cannot be seen in the 
> Spark client log:
> {code}
> 2018-10-29 19:28:34,658 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: ACCEPTED)
> 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: RUNNING)
> 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: N/A
>  ApplicationMaster host: 10.43.183.143
>  ApplicationMaster RPC port: 0
>  queue: root.mr
>  start time: 1540812501560
>  final status: UNDEFINED
>  tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/
>  user: mr
> 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: FINISHED)
> 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: Shutdown hook called before final status was reported.
>  ApplicationMaster host: 10.43.183.143
>  ApplicationMaster RPC port: 0
>  queue: root.mr
>  start time: 1540812501560
>  final status: FAILED
>  tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/
>  user: mr
> Exception in thread "main" org.apache.spark.SparkException: Application 
> application_1540536615315_0013 finished with failed status
>  at org.apache.spark.deploy.yarn.Client.run(Client.scala:1137)
>  at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1183)
>  at org.apache.spark.deploy.yarn.Client.main(Client.scala)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> 

[jira] [Assigned] (SPARK-25869) Spark on YARN: the original diagnostics is missing when job failed maxAppAttempts times

2018-12-18 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-25869:
--

Assignee: Marcelo Vanzin

> Spark on YARN: the original diagnostics is missing when job failed 
> maxAppAttempts times
> ---
>
> Key: SPARK-25869
> URL: https://issues.apache.org/jira/browse/SPARK-25869
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 2.1.1
>Reporter: Yeliang Cang
>Assignee: Marcelo Vanzin
>Priority: Major
>
> When configuring Spark on YARN, I submit the job using the command below:
> {code}
>  spark-submit  --class org.apache.spark.examples.SparkPi     --master yarn    
>  --deploy-mode cluster     --driver-memory 127m  --driver-cores 1   
> --executor-memory 2048m     --executor-cores 1    --num-executors 10  --queue 
> root.mr --conf spark.testing.reservedMemory=1048576 --conf 
> spark.yarn.executor.memoryOverhead=50 --conf 
> spark.yarn.driver.memoryOverhead=50 
> /opt/ZDH/parcels/lib/spark/examples/jars/spark-examples* 1
> {code}
> Apparently, the driver memory is not enough, but this cannot be seen in the 
> Spark client log:
> {code}
> 2018-10-29 19:28:34,658 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: ACCEPTED)
> 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: RUNNING)
> 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: N/A
>  ApplicationMaster host: 10.43.183.143
>  ApplicationMaster RPC port: 0
>  queue: root.mr
>  start time: 1540812501560
>  final status: UNDEFINED
>  tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/
>  user: mr
> 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0013 (state: FINISHED)
> 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: Shutdown hook called before final status was reported.
>  ApplicationMaster host: 10.43.183.143
>  ApplicationMaster RPC port: 0
>  queue: root.mr
>  start time: 1540812501560
>  final status: FAILED
>  tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/
>  user: mr
> Exception in thread "main" org.apache.spark.SparkException: Application 
> application_1540536615315_0013 finished with failed status
>  at org.apache.spark.deploy.yarn.Client.run(Client.scala:1137)
>  at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1183)
>  at org.apache.spark.deploy.yarn.Client.main(Client.scala)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
>  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 2018-10-29 19:28:36,694 INFO org.apache.spark.util.ShutdownHookManager: 
> Shutdown hook called
> 2018-10-29 19:28:36,695 INFO org.apache.spark.util.ShutdownHookManager: 
> Deleting directory /tmp/spark-96077be5-0dfa-496d-a6a0-96e83393a8d9
> {code}
>  
>  
> Solution: after applying the patch, the Spark client log shows:
> {code}
> 2018-10-29 19:27:32,962 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0012 (state: RUNNING)
> 2018-10-29 19:27:32,962 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: N/A
>  ApplicationMaster host: 10.43.183.143
>  ApplicationMaster RPC port: 0
>  queue: root.mr
>  start time: 1540812436656
>  final status: UNDEFINED
>  tracking URL: http://zdh141:8088/proxy/application_1540536615315_0012/
>  user: mr
> 2018-10-29 19:27:33,964 INFO org.apache.spark.deploy.yarn.Client: Application 
> report for application_1540536615315_0012 (state: FAILED)
> 2018-10-29 19:27:33,964 INFO org.apache.spark.deploy.yarn.Client:
>  client token: N/A
>  diagnostics: Application application_1540536615315_0012 failed 2 times due 
> to AM Container for appattempt_1540536615315_0012_02 exited with 
> exitCode: -104
> For more detailed output, check application tracking 
> page:http://zdh141:8088/cluster/app/application_1540536615315_0012Then, click 
> on links to logs of each attempt.
> Diagnostics: virtual 

[jira] [Commented] (SPARK-26398) Support building GPU docker images

2018-12-18 Thread Rong Ou (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724436#comment-16724436
 ] 

Rong Ou commented on SPARK-26398:
-

https://github.com/apache/spark/pull/23347

> Support building GPU docker images
> --
>
> Key: SPARK-26398
> URL: https://issues.apache.org/jira/browse/SPARK-26398
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Rong Ou
>Priority: Minor
>
> To run Spark on Kubernetes, a user first needs to build docker images using 
> the `bin/docker-image-tool.sh` script. However, this script only supports 
> building images for running on CPUs. As parts of Spark and related libraries 
> (e.g. XGBoost) get accelerated on GPUs, it's desirable to build base images 
> that can take advantage of GPU acceleration.
> This issue only addresses building docker images with CUDA support. Actually 
> accelerating Spark on GPUs is outside the scope, as is supporting other types 
> of GPUs.
> Today if anyone wants to experiment with running Spark on Kubernetes with GPU 
> support, they have to write their own custom `Dockerfile`. By providing an 
> "official" way to build GPU-enabled docker images, we can make it easier to 
> get started.
> For now probably not that many people care about this, but it's a necessary 
> first step towards GPU acceleration for Spark on Kubernetes.
> The risks are minimal as we only need to make minor changes to 
> `bin/docker-image-tool.sh`. The PR is already done and will be attached. 
> Success means anyone can easily build Spark docker images with GPU support.
> Proposed API changes: add an optional `-g` flag to 
> `bin/docker-image-tool.sh` for building GPU versions of the JVM/Python/R 
> docker images. When the `-g` flag is omitted, existing behavior is preserved.
> Design sketch: when the `-g` flag is specified, we append `-gpu` to the 
> docker image names, and switch to dockerfiles based on the official CUDA 
> images. Since the CUDA images are based on Ubuntu while the Spark dockerfiles 
> are based on Alpine, steps for setting up additional packages are different, 
> so there are a parallel set of `Dockerfile.gpu` files.
> Alternative: if we are willing to forego Alpine and switch to Ubuntu for the 
> CPU-only images, the two sets of dockerfiles can be unified, and we can just 
> pass in a different base image depending on whether the `-g` flag is present 
> or not.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26398) Support building GPU docker images

2018-12-18 Thread Rong Ou (JIRA)
Rong Ou created SPARK-26398:
---

 Summary: Support building GPU docker images
 Key: SPARK-26398
 URL: https://issues.apache.org/jira/browse/SPARK-26398
 Project: Spark
  Issue Type: Improvement
  Components: Kubernetes
Affects Versions: 2.4.0
Reporter: Rong Ou


To run Spark on Kubernetes, a user first needs to build docker images using the 
`bin/docker-image-tool.sh` script. However, this script only supports building 
images for running on CPUs. As parts of Spark and related libraries (e.g. 
XGBoost) get accelerated on GPUs, it's desirable to build base images that can 
take advantage of GPU acceleration.

This issue only addresses building docker images with CUDA support. Actually 
accelerating Spark on GPUs is outside the scope, as is supporting other types 
of GPUs.

Today if anyone wants to experiment with running Spark on Kubernetes with GPU 
support, they have to write their own custom `Dockerfile`. By providing an 
"official" way to build GPU-enabled docker images, we can make it easier to get 
started.

For now probably not that many people care about this, but it's a necessary 
first step towards GPU acceleration for Spark on Kubernetes.

The risks are minimal as we only need to make minor changes to 
`bin/docker-image-tool.sh`. The PR is already done and will be attached. 
Success means anyone can easily build Spark docker images with GPU support.

Proposed API changes: add an optional `-g` flag to `bin/docker-image-tool.sh` 
for building GPU versions of the JVM/Python/R docker images. When the `-g` flag 
is omitted, existing behavior is preserved.

Design sketch: when the `-g` flag is specified, we append `-gpu` to the docker 
image names, and switch to dockerfiles based on the official CUDA images. Since 
the CUDA images are based on Ubuntu while the Spark dockerfiles are based on 
Alpine, steps for setting up additional packages are different, so there are a 
parallel set of `Dockerfile.gpu` files.

Alternative: if we are willing to forego Alpine and switch to Ubuntu for the 
CPU-only images, the two sets of dockerfiles can be unified, and we can just 
pass in a different base image depending on whether the `-g` flag is present or 
not.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25953) install jdk11 on jenkins workers

2018-12-18 Thread shane knapp (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724361#comment-16724361
 ] 

shane knapp commented on SPARK-25953:
-

no, not yet.  i just got back from vacation and am planning on getting to this 
in the next couple of days (before i head out for xmas).

> install jdk11 on jenkins workers
> 
>
> Key: SPARK-25953
> URL: https://issues.apache.org/jira/browse/SPARK-25953
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 3.0.0
>Reporter: shane knapp
>Assignee: shane knapp
>Priority: Critical
>
> once we pin down exactly what we want installed on the jenkins workers, i will 
> add it to our ansible and deploy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26394) Annotation error for Utils.timeStringAsMs

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724325#comment-16724325
 ] 

ASF GitHub Bot commented on SPARK-26394:


srowen closed pull request #23346: [SPARK-26394][core] Fix annotation error for 
Utils.timeStringAsMs
URL: https://github.com/apache/spark/pull/23346
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c8b148be84536..8f86b472b9373 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1085,7 +1085,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds 
for internal use. If
+   * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds 
for internal use. If
* no suffix is provided, the passed number is assumed to be in ms.
*/
   def timeStringAsMs(str: String): Long = {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Annotation error for Utils.timeStringAsMs
> -
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, Spark Core
>Affects Versions: 2.4.0
>Reporter: Jackey Lee
>Assignee: Jackey Lee
>Priority: Trivial
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation 
> says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds 
> for internal use."
> Thus, microseconds should be changed to milliseconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26394) Annotation error for Utils.timeStringAsMs

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-26394.
---
   Resolution: Fixed
Fix Version/s: 2.4.1
   3.0.0
   2.3.3

Issue resolved by pull request 23346
[https://github.com/apache/spark/pull/23346]

> Annotation error for Utils.timeStringAsMs
> -
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, Spark Core
>Affects Versions: 2.4.0
>Reporter: Jackey Lee
>Assignee: Jackey Lee
>Priority: Trivial
> Fix For: 2.3.3, 3.0.0, 2.4.1
>
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation 
> says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds 
> for internal use."
> Thus, microseconds should be changed to milliseconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26382) prefix sorter should handle -0.0

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724319#comment-16724319
 ] 

ASF GitHub Bot commented on SPARK-26382:


asfgit closed pull request #23334: [SPARK-26382][CORE] prefix comparator should 
handle -0.0
URL: https://github.com/apache/spark/pull/23334
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index 0910db22af004..bef1bdadb27aa 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -69,6 +69,8 @@ public static long computePrefix(byte[] bytes) {
  * details see http://stereopsis.com/radix.html.
  */
 public static long computePrefix(double value) {
+  // normalize -0.0 to 0.0, as they should be equal
+  value = value == -0.0 ? 0.0 : value;
   // Java's doubleToLongBits already canonicalizes all NaN values to the 
smallest possible
   // positive NaN, so there's nothing special we need to do for NaNs.
   long bits = Double.doubleToLongBits(value);
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index 73546ef1b7a60..38cb37c524594 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -125,6 +125,7 @@ class PrefixComparatorsSuite extends SparkFunSuite with 
PropertyChecks {
 val nan2Prefix = 
PrefixComparators.DoublePrefixComparator.computePrefix(nan2)
 assert(nan1Prefix === nan2Prefix)
 val doubleMaxPrefix = 
PrefixComparators.DoublePrefixComparator.computePrefix(Double.MaxValue)
+// NaN is greater than the max double value.
 assert(PrefixComparators.DOUBLE.compare(nan1Prefix, doubleMaxPrefix) === 1)
   }
 
@@ -134,22 +135,34 @@ class PrefixComparatorsSuite extends SparkFunSuite with 
PropertyChecks {
 assert(java.lang.Double.doubleToRawLongBits(negativeNan) < 0)
 val prefix = 
PrefixComparators.DoublePrefixComparator.computePrefix(negativeNan)
 val doubleMaxPrefix = 
PrefixComparators.DoublePrefixComparator.computePrefix(Double.MaxValue)
+// -NaN is greater than the max double value.
 assert(PrefixComparators.DOUBLE.compare(prefix, doubleMaxPrefix) === 1)
   }
 
   test("double prefix comparator handles other special values properly") {
-val nullValue = 0L
+// See `SortPrefix.nullValue` for how we deal with nulls for float/double 
type
+val smallestNullPrefix = 0L
+val largestNullPrefix = -1L
 val nan = 
PrefixComparators.DoublePrefixComparator.computePrefix(Double.NaN)
 val posInf = 
PrefixComparators.DoublePrefixComparator.computePrefix(Double.PositiveInfinity)
 val negInf = 
PrefixComparators.DoublePrefixComparator.computePrefix(Double.NegativeInfinity)
 val minValue = 
PrefixComparators.DoublePrefixComparator.computePrefix(Double.MinValue)
 val maxValue = 
PrefixComparators.DoublePrefixComparator.computePrefix(Double.MaxValue)
 val zero = PrefixComparators.DoublePrefixComparator.computePrefix(0.0)
+val minusZero = 
PrefixComparators.DoublePrefixComparator.computePrefix(-0.0)
+
+// null is greater than everything including NaN, when we need to treat it 
as the largest value.
+assert(PrefixComparators.DOUBLE.compare(largestNullPrefix, nan) === 1)
+// NaN is greater than the positive infinity.
 assert(PrefixComparators.DOUBLE.compare(nan, posInf) === 1)
 assert(PrefixComparators.DOUBLE.compare(posInf, maxValue) === 1)
 assert(PrefixComparators.DOUBLE.compare(maxValue, zero) === 1)
 assert(PrefixComparators.DOUBLE.compare(zero, minValue) === 1)
 assert(PrefixComparators.DOUBLE.compare(minValue, negInf) === 1)
-assert(PrefixComparators.DOUBLE.compare(negInf, nullValue) === 1)
+// null is smaller than everything including negative infinity, when we 
need to treat it as
+// the smallest value.
+assert(PrefixComparators.DOUBLE.compare(negInf, smallestNullPrefix) === 1)
+// 0.0 should be equal to -0.0.
+assert(PrefixComparators.DOUBLE.compare(zero, minusZero) === 0)
   }
 }
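
For context on why the -0.0 normalization above is needed: -0.0 and 0.0 compare as 
numerically equal, but their raw IEEE-754 bit patterns differ in the sign bit, so an 
unnormalized prefix would order them differently. A small illustration, independent of 
Spark:

{code:python}
import struct

def bits(d):
    # raw IEEE-754 encoding, analogous to java.lang.Double.doubleToLongBits
    return struct.pack(">d", d).hex()

print(-0.0 == 0.0)  # True: numerically equal
print(bits(0.0))    # 0000000000000000
print(bits(-0.0))   # 8000000000000000: sign bit set, so a different prefix
{code}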


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub 

[jira] [Created] (SPARK-26397) Driver-side only metrics support

2018-12-18 Thread Yuanjian Li (JIRA)
Yuanjian Li created SPARK-26397:
---

 Summary: Driver-side only metrics support
 Key: SPARK-26397
 URL: https://issues.apache.org/jira/browse/SPARK-26397
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.4.0
 Environment: As noted in the comment at 
[https://github.com/apache/spark/pull/23327#discussion_r242646521], during the 
work of SPARK-26222 and SPARK-26223 we need support for driver-side-only 
metrics, which marks the metadata-related metrics as driver-side only and 
does not send them to the executor side.

This issue needs some changes in SparkPlan and SparkPlanInfo; we should also 
check whether there is any existing misuse.
Reporter: Yuanjian Li
 Fix For: 3.0.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26394) Annotation error for Utils.timeStringAsMs

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen reassigned SPARK-26394:
-

Assignee: Jackey Lee

> Annotation error for Utils.timeStringAsMs
> -
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, Spark Core
>Affects Versions: 2.4.0
>Reporter: Jackey Lee
>Assignee: Jackey Lee
>Priority: Trivial
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation 
> says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds 
> for internal use."
> Thus, microseconds should be changed to milliseconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26394) Annotation error for Utils.timeStringAsMs

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-26394:
--
   Priority: Trivial  (was: Minor)
Component/s: Documentation

This doesn't need a JIRA if the 'what' and 'how' are the same

> Annotation error for Utils.timeStringAsMs
> -
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, Spark Core
>Affects Versions: 2.4.0
>Reporter: Jackey Lee
>Priority: Trivial
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation 
> says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds 
> for internal use."
> Thus, microseconds should be changed to milliseconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26382) prefix sorter should handle -0.0

2018-12-18 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-26382.
---
   Resolution: Fixed
Fix Version/s: 3.0.0
   2.4.1

This is resolved via https://github.com/apache/spark/pull/23334 

> prefix sorter should handle -0.0
> 
>
> Key: SPARK-26382
> URL: https://issues.apache.org/jira/browse/SPARK-26382
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26316) Because of the perf degradation in TPC-DS, we currently partial revert SPARK-21052:Add hash map metrics to join,

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724258#comment-16724258
 ] 

ASF GitHub Bot commented on SPARK-26316:


dongjoon-hyun closed pull request #23319: [SPARK-26316][BRANCH-2.3] Revert hash 
join metrics in spark 21052 that causes performance degradation
URL: https://github.com/apache/spark/pull/23319
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 1918fcc5482db..13fa926d3366a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -47,8 +47,7 @@ case class BroadcastHashJoinExec(
   extends BinaryExecNode with HashJoin with CodegenSupport {
 
   override lazy val metrics = Map(
-"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
-"avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash 
probe"))
+"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
   override def requiredChildDistribution: Seq[Distribution] = {
 val mode = HashedRelationBroadcastMode(buildKeys)
@@ -62,13 +61,12 @@ case class BroadcastHashJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
 val numOutputRows = longMetric("numOutputRows")
-val avgHashProbe = longMetric("avgHashProbe")
 
 val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
 streamedPlan.execute().mapPartitions { streamedIter =>
   val hashed = broadcastRelation.value.asReadOnlyCopy()
   
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
-  join(streamedIter, hashed, numOutputRows, avgHashProbe)
+  join(streamedIter, hashed, numOutputRows)
 }
   }
 
@@ -110,23 +108,6 @@ case class BroadcastHashJoinExec(
 }
   }
 
-  /**
-   * Returns the codes used to add a task completion listener to update avg 
hash probe
-   * at the end of the task.
-   */
-  private def genTaskListener(avgHashProbe: String, relationTerm: String): 
String = {
-val listenerClass = classOf[TaskCompletionListener].getName
-val taskContextClass = classOf[TaskContext].getName
-s"""
-   | $taskContextClass$$.MODULE$$.get().addTaskCompletionListener(new 
$listenerClass() {
-   |   @Override
-   |   public void onTaskCompletion($taskContextClass context) {
-   | $avgHashProbe.set($relationTerm.getAverageProbesPerLookup());
-   |   }
-   | });
- """.stripMargin
-  }
-
   /**
* Returns a tuple of Broadcast of HashedRelation and the variable name for 
it.
*/
@@ -136,15 +117,11 @@ case class BroadcastHashJoinExec(
 val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
 val clsName = broadcastRelation.value.getClass.getName
 
-// At the end of the task, we update the avg hash probe.
-val avgHashProbe = metricTerm(ctx, "avgHashProbe")
-
 // Inline mutable state since not many join operations in a task
 val relationTerm = ctx.addMutableState(clsName, "relation",
   v => s"""
  | $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
  | incPeakExecutionMemory($v.estimatedSize());
- | ${genTaskListener(avgHashProbe, v)}
""".stripMargin, forceInline = true)
 (broadcastRelation, relationTerm)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 0396168d3f311..b197bf6c89981 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -194,8 +194,7 @@ trait HashJoin {
   protected def join(
   streamedIter: Iterator[InternalRow],
   hashed: HashedRelation,
-  numOutputRows: SQLMetric,
-  avgHashProbe: SQLMetric): Iterator[InternalRow] = {
+  numOutputRows: SQLMetric): Iterator[InternalRow] = {
 
 val joinedIter = joinType match {
   case _: InnerLike =>
@@ -213,10 +212,6 @@ trait HashJoin {
   s"BroadcastHashJoin should not take $x as the JoinType")
 }
 
-// At the end of the task, we update the avg hash probe.
-TaskContext.get().addTaskCompletionListener(_ =>
-  avgHashProbe.set(hashed.getAverageProbesPerLookup))
-
 val resultProj = createResultProjection
 joinedIter.map { r =>
   numOutputRows += 1

[jira] [Updated] (SPARK-26316) Because of the perf degradation in TPC-DS, we currently partial revert SPARK-21052:Add hash map metrics to join,

2018-12-18 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26316:
--
Fix Version/s: 2.3.3

> Because of the perf degradation in TPC-DS, we currently partial revert 
> SPARK-21052:Add hash map metrics to join,
> 
>
> Key: SPARK-26316
> URL: https://issues.apache.org/jira/browse/SPARK-26316
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Ke Jia
>Assignee: Ke Jia
>Priority: Major
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
>
> The code of  
> [L486|https://github.com/apache/spark/blob/1d3dd58d21400b5652b75af7e7e53aad85a31528/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L486]
>  and 
> [L487|https://github.com/apache/spark/blob/1d3dd58d21400b5652b75af7e7e53aad85a31528/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L487]
>  in SPARK-21052 causes a performance degradation in Spark 2.3. The results of 
> all queries in TPC-DS at 1 TB are in [TPC-DS 
> result|https://docs.google.com/spreadsheets/d/18a5BdOlmm8euTaRodyeWum9yu92mbWWu6JbhGXtr7yE/edit#gid=0]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output

2018-12-18 Thread sandeep katta (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724238#comment-16724238
 ] 

sandeep katta commented on SPARK-26154:
---

CC [~c...@koeninger.org] [~tdas]. This looks like a potential bug to me.

Can you please look into this issue?

> Stream-stream joins - left outer join gives inconsistent output
> ---
>
> Key: SPARK-26154
> URL: https://issues.apache.org/jira/browse/SPARK-26154
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.2
> Environment: Spark version - Spark 2.3.2
> OS- Suse 11
>Reporter: Haripriya
>Priority: Major
>
> Stream-stream joins using a left outer join give inconsistent output.
> Data that has already been processed once is processed again and yields a null value. In 
> Batch 2, the input data "3" is processed, but in Batch 6 a null 
> value is emitted again for the same data.
> Steps
> In spark-shell
> {code:java}
> scala> import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.functions.{col, expr}
> scala> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.streaming.Trigger
> scala> val lines_stream1 = spark.readStream.
>  |   format("kafka").
>  |   option("kafka.bootstrap.servers", "ip:9092").
>  |   option("subscribe", "topic1").
>  |   option("includeTimestamp", true).
>  |   load().
>  |   selectExpr("CAST (value AS String)","CAST(timestamp AS 
> TIMESTAMP)").as[(String,Timestamp)].
>  |   select(col("value") as("data"),col("timestamp") 
> as("recordTime")).
>  |   select("data","recordTime").
>  |   withWatermark("recordTime", "5 seconds ")
> lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = 
> [data: string, recordTime: timestamp]
> scala> val lines_stream2 = spark.readStream.
>  |   format("kafka").
>  |   option("kafka.bootstrap.servers", "ip:9092").
>  |   option("subscribe", "topic2").
>  |   option("includeTimestamp", value = true).
>  |   load().
>  |   selectExpr("CAST (value AS String)","CAST(timestamp AS 
> TIMESTAMP)").as[(String,Timestamp)].
>  |   select(col("value") as("data1"),col("timestamp") 
> as("recordTime1")).
>  |   select("data1","recordTime1").
>  |   withWatermark("recordTime1", "10 seconds ")
> lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = 
> [data1: string, recordTime1: timestamp]
> scala> val query = lines_stream1.join(lines_stream2, expr (
>  |   """
>  | | data == data1 and
>  | | recordTime1 >= recordTime and
>  | | recordTime1 <= recordTime + interval 5 seconds
>  |   """.stripMargin),"left").
>  |   writeStream.
>  |   option("truncate","false").
>  |   outputMode("append").
>  |   format("console").option("checkpointLocation", 
> "/tmp/leftouter/").
>  |   trigger(Trigger.ProcessingTime ("5 seconds")).
>  |   start()
> query: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
> {code}
> Step 2: Start producing data
> kafka-console-producer.sh --broker-list ip:9092 --topic topic1
>  >1
>  >2
>  >3
>  >4
>  >5
>  >aa
>  >bb
>  >cc
> kafka-console-producer.sh --broker-list ip:9092 --topic topic2
>  >2
>  >2
>  >3
>  >4
>  >5
>  >aa
>  >cc
>  >ee
>  >ee
>  
> Output obtained:
> {code:java}
> Batch: 0
> ---
> ++--+-+---+
> |data|recordTime|data1|recordTime1|
> ++--+-+---+
> ++--+-+---+
> ---
> Batch: 1
> ---
> ++--+-+---+
> |data|recordTime|data1|recordTime1|
> ++--+-+---+
> ++--+-+---+
> ---
> Batch: 2
> ---
> ++---+-+---+
> |data|recordTime |data1|recordTime1|
> ++---+-+---+
> |3   |2018-11-22 20:09:35.053|3|2018-11-22 20:09:36.506|
> |2   |2018-11-22 20:09:31.613|2|2018-11-22 20:09:33.116|
> ++---+-+---+
> ---
> Batch: 3
> ---
> ++---+-+---+
> |data|recordTime |data1|recordTime1|
> ++---+-+---+
> |4   |2018-11-22 

[jira] [Commented] (SPARK-11430) DataFrame's except method does not work, returns 0

2018-12-18 Thread sathiyarajan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-11430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724203#comment-16724203
 ] 

sathiyarajan commented on SPARK-11430:
--

[~ramk256] [~srowen]: has the except issue been fixed in PySpark 2.4.0 as well? I am getting some errors:

{code:python}
>>> df1.except(df2).show()
  File "<stdin>", line 1
    df1.except(df2).show()
             ^
SyntaxError: invalid syntax
>>> df1.exceptAll(df2).show()
{code}
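
For what it's worth, PySpark sidesteps the keyword clash with differently named methods; a minimal sketch, assuming Spark 2.4+ for exceptAll:

{code:python}
# "except" is a reserved word in Python, so the PySpark DataFrame API exposes
# subtract (EXCEPT DISTINCT) and, since 2.4, exceptAll (EXCEPT ALL) instead.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("except-demo").getOrCreate()

df1 = spark.createDataFrame([(1,), (2,), (2,), (3,)], ["id"])
df2 = spark.createDataFrame([(2,), (4,)], ["id"])

df1.subtract(df2).show()   # EXCEPT DISTINCT -> ids 1 and 3
df1.exceptAll(df2).show()  # EXCEPT ALL      -> ids 1, 2, 3
{code}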

> DataFrame's except method does not work, returns 0
> --
>
> Key: SPARK-11430
> URL: https://issues.apache.org/jira/browse/SPARK-11430
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.0
>Reporter: Ram Kandasamy
>Priority: Major
>
> This may or may not be related to this bug here: 
> https://issues.apache.org/jira/browse/SPARK-11427
> But basically, the except method in dataframes should mirror the 
> functionality of the subtract method in RDDs, but it is not doing so.
> Here is an example:
> scala> val firstFile = 
> sqlContext.read.parquet("/Users/ramkandasamy/sparkData/2015-07-25/*").select("id").distinct
> firstFile: org.apache.spark.sql.DataFrame = [id: string]
> scala> val secondFile = 
> sqlContext.read.parquet("/Users/ramkandasamy/sparkData/2015-10-23/*").select("id").distinct
> secondFile: org.apache.spark.sql.DataFrame = [id: string]
> scala> firstFile.count
> res1: Long = 1072046
> scala> secondFile.count
> res2: Long = 3569941
> scala> firstFile.except(secondFile).count
> res3: Long = 0
> scala> firstFile.rdd.subtract(secondFile.rdd).count
> res4: Long = 1072046
> Can anyone help out here? Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724201#comment-16724201
 ] 

ASF GitHub Bot commented on SPARK-26384:


asfgit closed pull request #23345: [SPARK-26384][SQL] Propagate SQL configs for 
CSV schema inferring
URL: https://github.com/apache/spark/pull/23345
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index b46dfb94c133e..375cec597166c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -35,6 +35,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVInferSchema, 
CSVOptions, UnivocityParser}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
@@ -135,7 +136,9 @@ object TextInputCSVDataSource extends CSVDataSource {
   val parser = new CsvParser(parsedOptions.asParserSettings)
   linesWithoutHeader.map(parser.parseLine)
 }
-new CSVInferSchema(parsedOptions).infer(tokenRDD, header)
+SQLExecution.withSQLConfPropagated(csv.sparkSession) {
+  new CSVInferSchema(parsedOptions).infer(tokenRDD, header)
+}
   case _ =>
 // If the first line could not be read, just return the empty schema.
 StructType(Nil)
@@ -208,7 +211,9 @@ object MultiLineCSVDataSource extends CSVDataSource {
 encoding = parsedOptions.charset)
 }
 val sampled = CSVUtils.sample(tokenRDD, parsedOptions)
-new CSVInferSchema(parsedOptions).infer(sampled, header)
+SQLExecution.withSQLConfPropagated(sparkSession) {
+  new CSVInferSchema(parsedOptions).infer(sampled, header)
+}
   case None =>
 // If the first row could not be read, just return the empty schema.
 StructType(Nil)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
> -
>
> Key: SPARK-26384
> URL: https://issues.apache.org/jira/browse/SPARK-26384
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 3.0.0
>
>
> Starting from the commit 
> [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2]
>  , add logging like in the comment 
> https://github.com/apache/spark/pull/23150#discussion_r242021998 and run:
> {code:shell}
> $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
> {code}
> and in the shell:
> {code:scala}
> scala> spark.conf.get("spark.sql.legacy.timeParser.enabled")
> res0: String = true
> scala> 
> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
> scala> spark.read.option("inferSchema", "true").option("header", 
> "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema()
> 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is 
> being used
> root
>  |-- _c0: timestamp (nullable = true)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled

2018-12-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-26384:


Assignee: Maxim Gekk

> CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
> -
>
> Key: SPARK-26384
> URL: https://issues.apache.org/jira/browse/SPARK-26384
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 3.0.0
>
>
> Starting from the commit 
> [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2]
>  , add logging like in the comment 
> https://github.com/apache/spark/pull/23150#discussion_r242021998 and run:
> {code:shell}
> $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
> {code}
> and in the shell:
> {code:scala}
> scala> spark.conf.get("spark.sql.legacy.timeParser.enabled")
> res0: String = true
> scala> 
> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
> scala> spark.read.option("inferSchema", "true").option("header", 
> "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema()
> 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is 
> being used
> root
>  |-- _c0: timestamp (nullable = true)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled

2018-12-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26384.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23345
[https://github.com/apache/spark/pull/23345]

> CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
> -
>
> Key: SPARK-26384
> URL: https://issues.apache.org/jira/browse/SPARK-26384
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 3.0.0
>
>
> Starting from the commit 
> [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2]
>  , add logging like in the comment 
> https://github.com/apache/spark/pull/23150#discussion_r242021998 and run:
> {code:shell}
> $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
> {code}
> and in the shell:
> {code:scala}
> scala> spark.conf.get("spark.sql.legacy.timeParser.enabled")
> res0: String = true
> scala> 
> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
> scala> spark.read.option("inferSchema", "true").option("header", 
> "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema()
> 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is 
> being used
> root
>  |-- _c0: timestamp (nullable = true)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17383) improvement LabelPropagation of graphx lib

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724154#comment-16724154
 ] 

ASF GitHub Bot commented on SPARK-17383:


srowen closed pull request #14940: [SPARK-17383][GRAPHX] Improvement 
LabelPropagaton, and reduce label shake and disconnection of communities
URL: https://github.com/apache/spark/pull/14940
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
index fc7547a2c7c27..31a9414ae47ca 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
@@ -58,7 +58,7 @@ object LabelPropagation {
   }.toMap
 }
 def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, 
Long]): VertexId = {
-  if (message.isEmpty) attr else message.maxBy(_._2)._1
+  (Map(attr -> 1L) ++ message).maxBy(m => (m._2, m._1))._1
 }
 val initialMessage = Map[VertexId, Long]()
 Pregel(lpaGraph, initialMessage, maxIterations = maxSteps)(


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> improvement LabelPropagation of graphx lib
> --
>
> Key: SPARK-17383
> URL: https://issues.apache.org/jira/browse/SPARK-17383
> Project: Spark
>  Issue Type: Improvement
>  Components: GraphX
>Affects Versions: 2.1.0
>Reporter: XiaoSen Lee
>Priority: Major
>
> In the labelPropagation of the graphx lib, each node is initialized with a unique
> label and at every step adopts the label that most of its neighbors
> currently have, but ignores the label it currently has. I think this is
> unreasonable, because the label a node already has is also useful. When a node
> tends toward a stable label, there is an association between two
> iterations, so a node is affected not only by its neighbors but also by its
> current label.
> So I changed the code to use both the labels of its neighbors and its own label.
> This iterative process lets densely connected groups of nodes form a consensus on
> a unique label and thereby form communities. But the communities produced by
> LabelPropagation are often discontinuous, because when several labels are equally
> common among a node's neighbors, e.g. node "0" has 6 neighbors labeled
> {"1","1","2","2","3","3"}, it may randomly select a label. In order to get stable
> community labels and prevent that randomness, I chose the maximum label of the node.
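
For illustration only (not part of the PR), the proposed update rule can be sketched outside GraphX in plain Python; the tie-break mirrors the maxBy(m => (m._2, m._1)) change in the diff above:

{code:python}
# Given a node's current label and its neighbors' labels, pick the label with
# the highest count; the node's own label also votes, and ties go to the
# numerically largest label id.
from collections import Counter

def next_label(current_label, neighbor_labels):
    counts = Counter(neighbor_labels)
    counts[current_label] += 1
    return max(counts.items(), key=lambda kv: (kv[1], kv[0]))[0]

print(next_label(0, [1, 1, 2, 2, 3, 3]))  # -> 3: all counts tie, largest id wins
{code}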



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-17383) improvement LabelPropagation of graphx lib

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-17383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-17383.
---
Resolution: Won't Fix

> improvement LabelPropagation of graphx lib
> --
>
> Key: SPARK-17383
> URL: https://issues.apache.org/jira/browse/SPARK-17383
> Project: Spark
>  Issue Type: Improvement
>  Components: GraphX
>Affects Versions: 2.1.0
>Reporter: XiaoSen Lee
>Priority: Major
>
> In the labelPropagation of the graphx lib, each node is initialized with a unique
> label and at every step adopts the label that most of its neighbors
> currently have, but ignores the label it currently has. I think this is
> unreasonable, because the label a node already has is also useful. When a node
> tends toward a stable label, there is an association between two
> iterations, so a node is affected not only by its neighbors but also by its
> current label.
> So I changed the code to use both the labels of its neighbors and its own label.
> This iterative process lets densely connected groups of nodes form a consensus on
> a unique label and thereby form communities. But the communities produced by
> LabelPropagation are often discontinuous, because when several labels are equally
> common among a node's neighbors, e.g. node "0" has 6 neighbors labeled
> {"1","1","2","2","3","3"}, it may randomly select a label. In order to get stable
> community labels and prevent that randomness, I chose the maximum label of the node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26396) Kafka consumer cache overflow since 2.4.x

2018-12-18 Thread Kaspar Tint (JIRA)
Kaspar Tint created SPARK-26396:
---

 Summary: Kafka consumer cache overflow since 2.4.x
 Key: SPARK-26396
 URL: https://issues.apache.org/jira/browse/SPARK-26396
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.0
 Environment: Spark 2.4 standalone client mode
Reporter: Kaspar Tint


We are experiencing an issue where the Kafka consumer cache seems to overflow 
constantly upon starting the application. This issue appeared after upgrading 
to Spark 2.4.

We would get constant warnings like this:
{code:java}
18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
capacity of 180, removing consumer for 
CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76)
18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
capacity of 180, removing consumer for 
CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30)
18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
capacity of 180, removing consumer for 
CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57)
18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max 
capacity of 180, removing consumer for 
CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43)
{code}

This application runs 4 different Spark Structured Streaming queries against
the same Kafka topic, which has 90 partitions. We used to run it with the
default settings, so the cache size defaulted to 64 on Spark 2.3, but now we
have tried setting it to 180 or 360. With 360 there is much less noise about
the overflow, but the resource need increases substantially.
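
For reference, a minimal sketch of raising that cap, assuming the Spark 2.4 option name spark.sql.kafkaConsumerCache.capacity (default 64); the exact name is an assumption, not taken from this report:

{code:python}
# Soft limit on cached Kafka consumers per executor; with 4 queries over a
# 90-partition topic, the default of 64 is easily exceeded.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("kafka-cache-capacity")
         .config("spark.sql.kafkaConsumerCache.capacity", "360")  # assumed 2.4 config name
         .getOrCreate())
{code}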




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26395) Spark Thrift server memory leak

2018-12-18 Thread Konstantinos Andrikopoulos (JIRA)
Konstantinos Andrikopoulos created SPARK-26395:
--

 Summary: Spark Thrift server memory leak
 Key: SPARK-26395
 URL: https://issues.apache.org/jira/browse/SPARK-26395
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.2
Reporter: Konstantinos Andrikopoulos


We are running Thrift Server in standalone mode and we have observed that the 
heap of the driver is constantly increasing. After analysing the heap dump the 
issue seems to be that the ElementTrackingStore is constantly increasing due to 
the addition of RDDOperationGraphWrapper objects that are not cleaned up.

The ElementTrackingStore defines the addTrigger method, where you are able to set
thresholds in order to perform cleanup, but in practice it is only used for the
ExecutorSummaryWrapper, JobDataWrapper and StageDataWrapper classes, via
the following Spark properties:
 * spark.ui.retainedDeadExecutors
 * spark.ui.retainedJobs
 * spark.ui.retainedStages

So the RDDOperationGraphWrapper, which is added in the onJobStart
method of the AppStatusListener class [kvstore.write(uigraph), line 291],
is not cleaned up, and it constantly grows, causing a memory leak.
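
For context, a minimal sketch of the thresholds mentioned above; note that, per this report, they only bound executors, jobs and stages, not the RDDOperationGraphWrapper entries:

{code:python}
# Existing cleanup knobs for ElementTrackingStore-backed UI data; shown here
# only to illustrate which classes are covered today.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ui-retention")
         .config("spark.ui.retainedDeadExecutors", "50")
         .config("spark.ui.retainedJobs", "500")
         .config("spark.ui.retainedStages", "500")
         .getOrCreate())
{code}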



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26336) left_anti join with Na Values

2018-12-18 Thread Marco Gaido (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724098#comment-16724098
 ] 

Marco Gaido commented on SPARK-26336:
-

[~csevilla] the point is always the same, i.e. the presence of {{NULL}}
(Python's None is SQL's NULL). And {{NULL = NULL}} returns {{NULL}}, not
{{true}}. This is how every DB works. You can try it in MySQL, Postgres, or
whatever you prefer.
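
A minimal sketch of a null-safe variant of the join from the report, using Column.eqNullSafe (SQL's <=>) so that NULL matches NULL; this assumes Spark 2.3+ for eqNullSafe and is only a workaround illustration:

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("null-safe-anti-join").getOrCreate()

data = [(1, "Test"), (2, "Test"), (3, None)]
df1 = spark.createDataFrame(data, ("id", "columndata"))
df2 = spark.createDataFrame(data, ("id", "columndata"))

# Plain equality drops the (3, None) row because NULL = NULL is NULL;
# eqNullSafe treats two NULLs as equal, so every row finds a match.
cond = [df1[c].eqNullSafe(df2[c]) for c in df1.columns]
df1.join(df2, cond, "left_anti").show()  # no rows
{code}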

> left_anti join with Na Values
> -
>
> Key: SPARK-26336
> URL: https://issues.apache.org/jira/browse/SPARK-26336
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Carlos
>Priority: Major
>
> When I join two dataframes whose data has NA values, the left_anti
> join does not work correctly, because it does not detect records with NA values.
> Example:  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import *
> spark = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate()
> data = [(1,"Test"),(2,"Test"),(3,None)]
> df1 = spark.createDataFrame(data,("id","columndata"))
> df2 = spark.createDataFrame(data,("id","columndata"))
> df_joined = df1.join(df2, df1.columns,'left_anti'){code}
> df_joined has data even though the two dataframes are the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26336) left_anti join with Na Values

2018-12-18 Thread Carlos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724081#comment-16724081
 ] 

Carlos commented on SPARK-26336:


[~mgaido] I think I chose a bad object for my example. Here is a better one:

{code:python}
data1 = {
    'id': 1,
    'name': 'Carlos',
    'surname': 'Sevilla',
    'address': None,
    'Country': 'ESP'
}

data2 = {
    'id': 1,
    'name': 'Carlos',
    'surname': 'Sevilla',
    'address': None,
    'Country': 'ESP'
}
{code}

Those two variables contain the SAME data.

If I try a left_anti join (an inner join does not work either), it should return no rows, because both dataframes have exactly the same data.

> left_anti join with Na Values
> -
>
> Key: SPARK-26336
> URL: https://issues.apache.org/jira/browse/SPARK-26336
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Carlos
>Priority: Major
>
> When I join two dataframes whose data has NA values, the left_anti
> join does not work correctly, because it does not detect records with NA values.
> Example:  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import *
> spark = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate()
> data = [(1,"Test"),(2,"Test"),(3,None)]
> df1 = spark.createDataFrame(data,("id","columndata"))
> df2 = spark.createDataFrame(data,("id","columndata"))
> df_joined = df1.join(df2, df1.columns,'left_anti'){code}
> df_joined has data even though the two dataframes are the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24680) spark.executorEnv.JAVA_HOME does not take effect in Standalone mode

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724047#comment-16724047
 ] 

ASF GitHub Bot commented on SPARK-24680:


srowen closed pull request #21663: [SPARK-24680][Deploy]Support 
spark.executorEnv.JAVA_HOME in Standalone mode
URL: https://github.com/apache/spark/pull/21663
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java 
b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index ce24400f557cd..56edceb17bfb8 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -91,14 +91,18 @@
*/
   List<String> buildJavaCommand(String extraClassPath) throws IOException {
 List<String> cmd = new ArrayList<>();
-String envJavaHome;
 
-if (javaHome != null) {
-  cmd.add(join(File.separator, javaHome, "bin", "java"));
-} else if ((envJavaHome = System.getenv("JAVA_HOME")) != null) {
-cmd.add(join(File.separator, envJavaHome, "bin", "java"));
-} else {
-cmd.add(join(File.separator, System.getProperty("java.home"), "bin", 
"java"));
+String[] candidateJavaHomes = new String[] {
+  javaHome,
+  childEnv.get("JAVA_HOME"),
+  System.getenv("JAVA_HOME"),
+  System.getProperty("java.home")
+};
+for (String javaHome : candidateJavaHomes) {
+  if (javaHome != null) {
+cmd.add(join(File.separator, javaHome, "bin", "java"));
+break;
+  }
 }
 
 // Load extra JAVA_OPTS from conf/java-opts, if it exists.


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> spark.executorEnv.JAVA_HOME does not take effect in Standalone mode
> ---
>
> Key: SPARK-24680
> URL: https://issues.apache.org/jira/browse/SPARK-24680
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy
>Affects Versions: 2.1.1, 2.2.1, 2.3.1
>Reporter: StanZhai
>Assignee: StanZhai
>Priority: Minor
> Fix For: 3.0.0
>
>
> spark.executorEnv.JAVA_HOME does not take effect when a Worker starts an
> Executor process in Standalone mode.
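
Illustrative only (plain Python, not the launcher's Java): the resolution order introduced by the patch above, where the first non-empty candidate wins:

{code:python}
import os

def resolve_java(builder_java_home=None, executor_env=None):
    candidates = [
        builder_java_home,                      # explicit setting on the builder
        (executor_env or {}).get("JAVA_HOME"),  # spark.executorEnv.JAVA_HOME
        os.environ.get("JAVA_HOME"),            # the worker's own environment
        "/usr/lib/jvm/default",                 # stand-in for the java.home system property
    ]
    java_home = next(c for c in candidates if c)
    return os.path.join(java_home, "bin", "java")

print(resolve_java(executor_env={"JAVA_HOME": "/opt/jdk8"}))  # -> /opt/jdk8/bin/java
{code}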



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25953) install jdk11 on jenkins workers

2018-12-18 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724048#comment-16724048
 ] 

Sean Owen commented on SPARK-25953:
---

Was this resolved? I think we can just install the latest JDK 11.

> install jdk11 on jenkins workers
> 
>
> Key: SPARK-25953
> URL: https://issues.apache.org/jira/browse/SPARK-25953
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 3.0.0
>Reporter: shane knapp
>Assignee: shane knapp
>Priority: Critical
>
> once we pin down exact what we want installed on the jenkins workers, i will 
> add it to our ansible and deploy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26336) left_anti join with Na Values

2018-12-18 Thread Marco Gaido (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724052#comment-16724052
 ] 

Marco Gaido commented on SPARK-26336:
-

That's correct, because NULLs do not match. The usual implementation of ANTIJOIN
in other DBs (e.g. Postgres) is to do a left join and filter for the column on
the right side being NULL. If you do so in your example, 1 row is returned.
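
Sketching that equivalence on the data from the report (a workaround illustration, not a fix):

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("anti-join-rewrite").getOrCreate()

data = [(1, "Test"), (2, "Test"), (3, None)]
df1 = spark.createDataFrame(data, ("id", "columndata"))
df2 = spark.createDataFrame(data, ("id", "columndata"))

# LEFT JOIN on plain equality, then keep rows whose right side stayed NULL.
# The (3, None) row never matches, because NULL = NULL is not true, so this
# rewrite also returns exactly 1 row, just like Spark's left_anti.
cond = [df1[c] == df2[c] for c in df1.columns]
left = df1.join(df2, cond, "left")
left.where(df2["id"].isNull()).select(df1["id"], df1["columndata"]).show()
{code}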

> left_anti join with Na Values
> -
>
> Key: SPARK-26336
> URL: https://issues.apache.org/jira/browse/SPARK-26336
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Carlos
>Priority: Major
>
> When I join two dataframes whose data has NA values, the left_anti
> join does not work correctly, because it does not detect records with NA values.
> Example:  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import *
> spark = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate()
> data = [(1,"Test"),(2,"Test"),(3,None)]
> df1 = spark.createDataFrame(data,("id","columndata"))
> df2 = spark.createDataFrame(data,("id","columndata"))
> df_joined = df1.join(df2, df1.columns,'left_anti'){code}
> df_joined has data even though the two dataframes are the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17939) Spark-SQL Nullability: Optimizations vs. Enforcement Clarification

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-17939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-17939:
--
  Priority: Major  (was: Critical)
Issue Type: Improvement  (was: Bug)

> Spark-SQL Nullability: Optimizations vs. Enforcement Clarification
> --
>
> Key: SPARK-17939
> URL: https://issues.apache.org/jira/browse/SPARK-17939
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Aleksander Eskilson
>Priority: Major
>
> The notion of Nullability of of StructFields in DataFrames and Datasets 
> creates some confusion. As has been pointed out previously [1], Nullability 
> is a hint to the Catalyst optimizer, and is not meant to be a type-level 
> enforcement. Allowing null fields can also help the reader successfully parse 
> certain types of more loosely-typed data, like JSON and CSV, where null 
> values are common, rather than just failing. 
> There's already been some movement to clarify the meaning of Nullable in the 
> API, but also some requests for a (perhaps completely separate) type-level 
> implementation of Nullable that can act as an enforcement contract.
> This bug is logged here to discuss and clarify this issue.
> [1] - 
> [https://issues.apache.org/jira/browse/SPARK-11319|https://issues.apache.org/jira/browse/SPARK-11319?focusedCommentId=15014535=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15014535]
> [2] - https://github.com/apache/spark/pull/11785



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-20712) [SPARK 2.1 REGRESSION][SQL] Spark can't read Hive table when column type has length greater than 4000 bytes

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-20712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-20712.
---
Resolution: Cannot Reproduce

> [SPARK 2.1 REGRESSION][SQL] Spark can't read Hive table when column type has 
> length greater than 4000 bytes
> ---
>
> Key: SPARK-20712
> URL: https://issues.apache.org/jira/browse/SPARK-20712
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1, 2.1.2, 2.2.0, 2.3.0
>Reporter: Maciej Bryński
>Priority: Critical
>
> Hi,
> I have following issue.
> I'm trying to read a table from hive when one of the column is nested so it's 
> schema has length longer than 4000 bytes.
> Everything worked on Spark 2.0.2. On 2.1.1 I'm getting Exception:
> {code}
> >>> spark.read.table("SOME_TABLE")
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/opt/spark-2.1.1/python/pyspark/sql/readwriter.py", line 259, in table
> return self._df(self._jreader.table(tableName))
>   File 
> "/opt/spark-2.1.1/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 
> 1133, in __call__
>   File "/opt/spark-2.1.1/python/pyspark/sql/utils.py", line 63, in deco
> return f(*a, **kw)
>   File "/opt/spark-2.1.1/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", 
> line 319, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o71.table.
> : org.apache.spark.SparkException: Cannot recognize hive type string: 
> SOME_VERY_LONG_FIELD_TYPE
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$fromHiveColumn(HiveClientImpl.scala:789)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:365)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:361)
> at scala.Option.map(Option.scala:146)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:361)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:359)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:359)
> at 
> org.apache.spark.sql.hive.client.HiveClient$class.getTable(HiveClient.scala:74)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.getTable(HiveClientImpl.scala:78)
> at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$org$apache$spark$sql$hive$HiveExternalCatalog$$getRawTable$1.apply(HiveExternalCatalog.scala:118)
> at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$org$apache$spark$sql$hive$HiveExternalCatalog$$getRawTable$1.apply(HiveExternalCatalog.scala:118)
> at 
> org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
> at 
> org.apache.spark.sql.hive.HiveExternalCatalog.org$apache$spark$sql$hive$HiveExternalCatalog$$getRawTable(HiveExternalCatalog.scala:117)
> at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$getTable$1.apply(HiveExternalCatalog.scala:628)
> at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$getTable$1.apply(HiveExternalCatalog.scala:628)
> at 
> 

[jira] [Updated] (SPARK-26381) Pickle Serialization Error Causing Crash

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-26381:
--
Priority: Major  (was: Critical)

What is failing to serialize here?

> Pickle Serialization Error Causing Crash
> 
>
> Key: SPARK-26381
> URL: https://issues.apache.org/jira/browse/SPARK-26381
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1, 2.4.0
> Environment: Tested on two environments:
>  * Spark 2.4.0 - single machine only
>  * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS
> The error occurs in both environments.
>Reporter: Ryan
>Priority: Major
>
> There is a pickle serialization error when I try and use AllenNLP for doing 
> NER within a Spark worker - it is causing a crash. When running on just the 
> Spark driver or in a standalone program, everything works as expected.
>  
> {code:java}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last): 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py",
>  line 217, in main 
>    func, profiler, deserializer, serializer = read_command(pickleSer, infile) 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py",
>  line 61, in read_command 
>    command = serializer.loads(command.value) 
>  File 
> "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/serializers.py",
>  line 559, in loads 
>    return pickle.loads(obj, encoding=encoding) 
> TypeError: __init__() missing 3 required positional arguments: 
> 'non_padded_namespaces', 'padding_token', and 'oov_token' 
>    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
>  
>    at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438) 
>    at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421) 
>    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
>  
>    at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  
>    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
>    at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>  
>    at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
>    at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
>    at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
>    at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
>    at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) 
>    at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
>    at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>  
>    at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
>    at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>  
>    at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) 
>    at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) 
>    at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>  
>    at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>  
>    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
>    at org.apache.spark.scheduler.Task.run(Task.scala:109) 
>    at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) 
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  
>    ... 1 more
> {code}
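
One common workaround sketch, not taken from this report and using a fake stand-in for the AllenNLP predictor: build the heavy object inside each partition so it never has to be pickled on the driver.

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-model-init").getOrCreate()
sc = spark.sparkContext

class FakeNerModel:
    """Hypothetical stand-in for the real predictor built on each worker."""
    def predict(self, sentence):
        return [(token, "O") for token in sentence.split()]

def tag_partition(rows):
    model = FakeNerModel()  # constructed per partition, never pickled on the driver
    return [(row, model.predict(row)) for row in rows]

sentences = ["Spark runs NER", "the pickle error goes away"]
print(sc.parallelize(sentences, 2).mapPartitions(tag_partition).collect())
{code}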



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-26377:
--
Priority: Major  (was: Critical)

> java.lang.IllegalStateException: No current assignment for partition
> 
>
> Key: SPARK-26377
> URL: https://issues.apache.org/jira/browse/SPARK-26377
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.1
>Reporter: pavan
>Priority: Major
>
> Hi,
>    I am using the Spark Kafka direct stream with SubscribePattern, with initial
> offsets for the topics and a pattern. When running the Spark job on the job server
> I am getting the following exception, and the job is terminated.
> Kafka Params:
> "bootstrap.servers" -> credentials.getBrokers,
>  "key.deserializer" -> classOf[StringDeserializer],
>  "value.deserializer" -> classOf[ByteArrayDeserializer],
>  "enable.auto.commit" -> (false: java.lang.Boolean)
> "group.id" -> "abc"
> API:
> KafkaUtils.createDirectStream(streamingContext, PreferConsistent, 
> SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), 
> perPartitionConfig)
>  
> Error Log:
> { "duration": "33.523 secs", "classPath": 
> "com.appiot.dataingestion.DataIngestionJob", "startTime": 
> "2018-12-15T18:28:08.207Z", "context": 
> "c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd",
>  "result": 
> { "message": "java.lang.IllegalStateException: No current assignment for 
> partition com-cibigdata2.v1.iot.raw_timeseries-0", "errorClass": 
> "java.lang.RuntimeException", "stack": "java.lang.RuntimeException: 
> java.lang.IllegalStateException: No current assignment for partition 
> com-cibigdata2.v1.iot.raw_timeseries-0\n\tat 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat
>  
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat
>  
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat
>  
> org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat
>  
> org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat
>  scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat 
> org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat
>  
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat
>  
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat
>  
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat
>  
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat
>  
> scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat
>  
> scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat
>  
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat
>  
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat
>  
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat 
> scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat 
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat
>  
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat
>  
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat
>  
> scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat
>  scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
>  
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
>  
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat
>  ... run in separate thread using org.apache.spark.util.ThreadUtils ... 
> ()\n\tat 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)\n\tat
>  
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat
>  
> com.sap.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:182)\n\tat
>  
> 

[jira] [Assigned] (SPARK-24680) spark.executorEnv.JAVA_HOME does not take effect in Standalone mode

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen reassigned SPARK-24680:
-

Assignee: StanZhai

> spark.executorEnv.JAVA_HOME does not take effect in Standalone mode
> ---
>
> Key: SPARK-24680
> URL: https://issues.apache.org/jira/browse/SPARK-24680
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy
>Affects Versions: 2.1.1, 2.2.1, 2.3.1
>Reporter: StanZhai
>Assignee: StanZhai
>Priority: Minor
> Fix For: 3.0.0
>
>
> spark.executorEnv.JAVA_HOME does not take effect when a Worker starts an
> Executor process in Standalone mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24680) spark.executorEnv.JAVA_HOME does not take effect in Standalone mode

2018-12-18 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-24680.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21663
[https://github.com/apache/spark/pull/21663]

> spark.executorEnv.JAVA_HOME does not take effect in Standalone mode
> ---
>
> Key: SPARK-24680
> URL: https://issues.apache.org/jira/browse/SPARK-24680
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy
>Affects Versions: 2.1.1, 2.2.1, 2.3.1
>Reporter: StanZhai
>Assignee: StanZhai
>Priority: Minor
> Fix For: 3.0.0
>
>
> spark.executorEnv.JAVA_HOME does not take effect when a Worker starts an
> Executor process in Standalone mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26394) Annotation error for Utils.timeStringAsMs

2018-12-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26394:


Assignee: Apache Spark

> Annotation error for Utils.timeStringAsMs
> -
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jackey Lee
>Assignee: Apache Spark
>Priority: Minor
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation
> says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds
> for internal use."
> Thus, "microseconds" should be changed to "milliseconds".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26394) Annotation error for Utils.timeStringAsMs

2018-12-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26394:


Assignee: (was: Apache Spark)

> Annotation error for Utils.timeStringAsMs
> -
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jackey Lee
>Priority: Minor
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation
> says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds
> for internal use."
> Thus, "microseconds" should be changed to "milliseconds".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26394) Annotation error for Utils.timeStringAsMs

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724040#comment-16724040
 ] 

ASF GitHub Bot commented on SPARK-26394:


stczwd opened a new pull request #23346: [SPARK-26394][core] Fix annotation 
error for Utils.timeStringAsMs
URL: https://github.com/apache/spark/pull/23346
 
 
   ## What changes were proposed in this pull request?
   
   Change microseconds to milliseconds in annotation of Utils.timeStringAsMs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Annotation error for Utils.timeStringAsMs
> -
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jackey Lee
>Priority: Minor
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation
> says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds
> for internal use."
> Thus, "microseconds" should be changed to "milliseconds".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26081) Do not write empty files by text datasources

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724034#comment-16724034
 ] 

ASF GitHub Bot commented on SPARK-26081:


asfgit closed pull request #23341: [SPARK-26081][SQL][FOLLOW-UP] Use foreach 
instead of misuse of map (for Unit)
URL: https://github.com/apache/spark/pull/23341
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index f7d8a9e1042d5..f4f139d180058 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -189,5 +189,5 @@ private[csv] class CsvOutputWriter(
 gen.write(row)
   }
 
-  override def close(): Unit = univocityGenerator.map(_.close())
+  override def close(): Unit = univocityGenerator.foreach(_.close())
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 3042133ee43aa..40f55e7068010 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -190,5 +190,5 @@ private[json] class JsonOutputWriter(
 gen.writeLineEnding()
   }
 
-  override def close(): Unit = jacksonGenerator.map(_.close())
+  override def close(): Unit = jacksonGenerator.foreach(_.close())
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 01948ab25d63c..0607f7b3c0d4a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -153,7 +153,7 @@ class TextOutputWriter(
   private var outputStream: Option[OutputStream] = None
 
   override def write(row: InternalRow): Unit = {
-val os = outputStream.getOrElse{
+val os = outputStream.getOrElse {
   val newStream = CodecStreams.createOutputStream(context, new Path(path))
   outputStream = Some(newStream)
   newStream
@@ -167,6 +167,6 @@ class TextOutputWriter(
   }
 
   override def close(): Unit = {
-outputStream.map(_.close())
+outputStream.foreach(_.close())
   }
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Do not write empty files by text datasources
> 
>
> Key: SPARK-26081
> URL: https://issues.apache.org/jira/browse/SPARK-26081
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Minor
>  Labels: release-notes
> Fix For: 3.0.0
>
>
> Text based datasources like CSV, JSON and Text produces empty files for empty 
> partitions. This introduces additional overhead while opening and reading 
> such files back. In current implementation of OutputWriter, the output stream 
> are created eagerly even no records are written to the stream. So, creation 
> can be postponed up to the first write.
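
Illustrative only (plain Python, not the Spark implementation): the lazy-open pattern the description refers to, where the output file is created on the first write so an empty partition never produces an empty file.

{code:python}
class LazyWriter:
    def __init__(self, path):
        self.path = path
        self._out = None

    def write(self, line):
        if self._out is None:            # open only when there is data
            self._out = open(self.path, "w")
        self._out.write(line + "\n")

    def close(self):
        if self._out is not None:        # nothing written -> no file created
            self._out.close()

w = LazyWriter("part-00000.txt")
w.close()  # empty "partition": no file appears
{code}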



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-12-18 Thread Jackey Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724011#comment-16724011
 ] 

Jackey Lee commented on SPARK-24630:


[~jackylk] Sorry for the late reply.

I haven't considered manipulating the streaming job, which currently runs
directly after starting until the end of the application, similar to a
Command. SQLStreaming could also easily support this if methods later become
able to manipulate and process the current Command execution. Can you show me
how to deal with it?

Currently, SQLStreaming supports the Table API, so we can use the table API to
show/describe stream tables.

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite),
> SQLStream, and StormSQL all provide a streaming SQL interface, with which users
> with little knowledge about streaming can easily develop a stream
> processing model. In Spark, we can also support a SQL API based on
> Structured Streaming.
> To support SQL Streaming, there are two key points:
> 1. The Analysis phase should be able to parse streaming SQL.
> 2. The Analyzer should be able to map metadata information to the corresponding
> Relation.
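
For contrast, today's closest approximation (not the SPIP's proposal) is to register a streaming DataFrame as a temp view and run SQL over it; a minimal sketch using the built-in rate source:

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-over-stream").getOrCreate()

stream = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
stream.createOrReplaceTempView("events")

agg = spark.sql("SELECT count(*) AS cnt FROM events")   # SQL over a stream
query = (agg.writeStream
            .outputMode("complete")
            .format("console")
            .start())
# query.awaitTermination()  # blocks until the query stops; omitted here
{code}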



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26394) Annotation error for Utils.timeStringAsMs

2018-12-18 Thread Jackey Lee (JIRA)
Jackey Lee created SPARK-26394:
--

 Summary: Annotation error for Utils.timeStringAsMs
 Key: SPARK-26394
 URL: https://issues.apache.org/jira/browse/SPARK-26394
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Jackey Lee


Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation
says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds
for internal use."

Thus, "microseconds" should be changed to "milliseconds".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19228) inferSchema function processed csv date column as string and "dateFormat" DataSource option is ignored

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723960#comment-16723960
 ] 

ASF GitHub Bot commented on SPARK-19228:


HyukjinKwon closed pull request #21363: [SPARK-19228][SQL] Migrate on Java 8 
time from FastDateFormat for meet the ISO8601
URL: https://github.com/apache/spark/pull/21363
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 80f15053005ff..9eaf6a2862a0f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
+import java.time.LocalDateTime
+import java.time.temporal.ChronoField
 import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.{Function => JFunction}
@@ -143,6 +145,12 @@ object DateTimeUtils {
 millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
   }
 
+  def dateTimeToMicroseconds(localDateTime: LocalDateTime, timeZone: TimeZone): Long = {
+    val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND)
+    val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond
+    epochSecond * 1000000L + microOfSecond
+  }
+
   def dateToString(days: SQLDate): String =
 getThreadLocalDateFormat.format(toJavaDate(days))
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index cbf6106697f30..cd1b7395b97d5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.util
 
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
 import java.util.{Calendar, Locale, TimeZone}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.unsafe.types.UTF8String
+import org.junit.Assert.assertEquals
 
 class DateTimeUtilsSuite extends SparkFunSuite {
 
@@ -645,6 +648,18 @@ class DateTimeUtilsSuite extends SparkFunSuite {
 }
   }
 
+  test("Java 8 LocalDateTime to microseconds") {
+val nanos = "2015-05-09 00:10:23.999750987"
+var formatter = DateTimeFormatter.ofPattern("-MM-dd 
HH:mm:ss.S")
+val localDateTimeInNanos = LocalDateTime.parse(nanos, formatter)
+val timeInMicros = dateTimeToMicroseconds(localDateTimeInNanos, 
TimeZonePST)
+assertEquals(1431155423999750L, timeInMicros)
+val micros = "2015-05-09 00:10:23.999750"
+formatter = DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss.SS")
+val localDateTimeInMicros = LocalDateTime.parse(micros, formatter)
+assertEquals(timeInMicros, dateTimeToMicroseconds(localDateTimeInMicros, 
TimeZonePST))
+  }
+
   test("daysToMillis and millisToDays") {
 val c = Calendar.getInstance(TimeZonePST)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index a585cbed2551b..6239f5666cd4f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -90,6 +90,7 @@ private[csv] object CSVInferSchema {
   // DecimalTypes have different precisions and scales, so we try to 
find the common type.
   findTightestCommonType(typeSoFar, tryParseDecimal(field, 
options)).getOrElse(StringType)
 case DoubleType => tryParseDouble(field, options)
+case DateType => tryParseDate(field, options)
 case TimestampType => tryParseTimestamp(field, options)
 case BooleanType => tryParseBoolean(field, options)
 case StringType => StringType
@@ -140,14 +141,23 @@ private[csv] object CSVInferSchema {
   private def tryParseDouble(field: String, options: CSVOptions): DataType = {
 if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) 
{
   DoubleType
+} else 

[jira] [Updated] (SPARK-19228) inferSchema function processed csv date column as string and "dateFormat" DataSource option is ignored

2018-12-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-19228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-19228:
-
Labels:   (was: easyfix)

> inferSchema function processed csv date column as string and "dateFormat" 
> DataSource option is ignored
> --
>
> Key: SPARK-19228
> URL: https://issues.apache.org/jira/browse/SPARK-19228
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, SQL
>Affects Versions: 2.1.0
>Reporter: Sergey Rubtsov
>Priority: Major
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> The current FastDateFormat parser can't properly parse dates and timestamps 
> and does not meet ISO 8601.
>  For example, I need to process user.csv like this:
> {code:java}
> id,project,started,ended
> sergey.rubtsov,project0,12/12/2012,10/10/2015
> {code}
> When I add date format options:
> {code:java}
> Dataset users = spark.read().format("csv").option("mode", 
> "PERMISSIVE").option("header", "true")
> .option("inferSchema", 
> "true").option("dateFormat", 
> "dd/MM/").load("src/main/resources/user.csv");
>   users.printSchema();
> {code}
> expected scheme should be
> {code:java}
> root
>  |-- id: string (nullable = true)
>  |-- project: string (nullable = true)
>  |-- started: date (nullable = true)
>  |-- ended: date (nullable = true)
> {code}
> but the actual result is:
> {code:java}
> root
>  |-- id: string (nullable = true)
>  |-- project: string (nullable = true)
>  |-- started: string (nullable = true)
>  |-- ended: string (nullable = true)
> {code}
> This means that the date is processed as a string and the "dateFormat" option is ignored.
>  If I add option
> {code:java}
> .option("timestampFormat", "dd/MM/")
> {code}
> result is:
> {code:java}
> root
>  |-- id: string (nullable = true)
>  |-- project: string (nullable = true)
>  |-- started: timestamp (nullable = true)
>  |-- ended: timestamp (nullable = true)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26393) Different behaviors of date_add when calling it inside expr

2018-12-18 Thread Ahmed Kamal` (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Kamal` updated SPARK-26393:
-
Description: 
When calling date_add from pyspark.sql.functions directly, without using expr, 
like this: 
{code:java}
df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), 
F.col('days'))).toPandas(){code}
it raises `TypeError: Column is not iterable`,

because it only takes a number, not a Column.

But when I use it inside an expr, like this:
{code:java}
df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), 
days)")).toPandas(){code}
it works fine.

Shouldn't it behave the same way?

I think it is logical to accept a Column here as well.

A Python notebook to demonstrate:

[https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb]

  was:
When Calling date_add from pyspark.sql.functions directly without using expr, 
like this : 
{code:java}
df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), 
F.col('days'))).toPandas(){code}
It will raise Error : `TypeError: Column is not iterable`

because it only taking a number not a column 

but when i try to use it inside an expr, like this :

 
{code:java}
df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), 
days)")).toPandas()
{code}
 

it will work fine.

 

Shouldn't it behave the same way ? 

and i thin its logical to accept a column  here as well.

 

A python Notebook to demonstrate :

https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb


> Different behaviors of date_add when calling it inside expr
> ---
>
> Key: SPARK-26393
> URL: https://issues.apache.org/jira/browse/SPARK-26393
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.2
>Reporter: Ahmed Kamal`
>Priority: Minor
>
> When Calling date_add from pyspark.sql.functions directly without using expr, 
> like this : 
> {code:java}
> df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), 
> F.col('days'))).toPandas(){code}
> It will raise Error : `TypeError: Column is not iterable`
> because it only taking a number not a column 
> but when i try to use it inside an expr, like this :
> {code:java}
> df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), 
> days)")).toPandas(){code}
> It will work fine.
> Shouldn't it behave the same way ? 
> and i think its logical to accept a column  here as well.
> A python Notebook to demonstrate :
> [https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26393) Different behaviors of date_add when calling it inside expr

2018-12-18 Thread Ahmed Kamal` (JIRA)
Ahmed Kamal` created SPARK-26393:


 Summary: Different behaviors of date_add when calling it inside 
expr
 Key: SPARK-26393
 URL: https://issues.apache.org/jira/browse/SPARK-26393
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.3.2
Reporter: Ahmed Kamal`


When calling date_add from pyspark.sql.functions directly, without using expr, 
like this: 
{code:java}
df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), 
F.col('days'))).toPandas(){code}
It raises `TypeError: Column is not iterable`,

because it only takes a number, not a Column.

But when I try to use it inside an expr, like this:

{code:java}
df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), 
days)")).toPandas()
{code}

it works fine.

Shouldn't it behave the same way?

I think it is logical to accept a Column here as well.

A Python notebook to demonstrate:

https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition

2018-12-18 Thread pavan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723908#comment-16723908
 ] 

pavan commented on SPARK-26377:
---

Hi Hyun,

         I didn't try 2.4.0. I am raising this as a bug because I am 
continuously facing the issue. I will try it. 

Thanks,

Pavan

> java.lang.IllegalStateException: No current assignment for partition
> 
>
> Key: SPARK-26377
> URL: https://issues.apache.org/jira/browse/SPARK-26377
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.2.1
>Reporter: pavan
>Priority: Critical
>
> Hi,
>    I am using sparkkafkaDirectStream with subscriberPattern with initial 
> offsets for topics and a pattern. On running the SparkJob on the job server  
> i am getting the following exception.The job is terminated. 
> Kafka Params:
> "bootstrap.servers" -> credentials.getBrokers,
>  "key.deserializer" -> classOf[StringDeserializer],
>  "value.deserializer" -> classOf[ByteArrayDeserializer],
>  "enable.auto.commit" -> (false: java.lang.Boolean)
> "group.id" -> "abc"
> API:
> KafkaUtils.createDirectStream(streamingContext, PreferConsistent, 
> SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), 
> perPartitionConfig)
>  
> Error Log:
> { "duration": "33.523 secs", "classPath": 
> "com.appiot.dataingestion.DataIngestionJob", "startTime": 
> "2018-12-15T18:28:08.207Z", "context": 
> "c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd",
>  "result": 
> { "message": "java.lang.IllegalStateException: No current assignment for 
> partition com-cibigdata2.v1.iot.raw_timeseries-0", "errorClass": 
> "java.lang.RuntimeException", "stack": "java.lang.RuntimeException: 
> java.lang.IllegalStateException: No current assignment for partition 
> com-cibigdata2.v1.iot.raw_timeseries-0\n\tat 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat
>  
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat
>  
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat
>  
> org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat
>  
> org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat
>  scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat 
> org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat
>  
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat
>  
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat
>  
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat
>  
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat
>  
> scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat
>  
> scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat
>  
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat
>  
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat
>  
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat 
> scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat 
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat
>  
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat
>  
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat
>  
> scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat
>  scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
>  
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
>  
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat
>  ... run in separate thread using org.apache.spark.util.ThreadUtils ... 
> ()\n\tat 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)\n\tat
>  
> 

[jira] [Updated] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition

2018-12-18 Thread pavan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

pavan updated SPARK-26377:
--
Description: 
Hi,

   I am using the Spark Kafka direct stream with SubscribePattern, with initial 
offsets for topics and a pattern. On running the Spark job on the job server I 
get the following exception and the job is terminated. 

Kafka Params:

"bootstrap.servers" -> credentials.getBrokers,
 "key.deserializer" -> classOf[StringDeserializer],
 "value.deserializer" -> classOf[ByteArrayDeserializer],
 "enable.auto.commit" -> (false: java.lang.Boolean)

"group.id" -> "abc"

API:

KafkaUtils.createDirectStream(streamingContext, PreferConsistent, 
SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), 
perPartitionConfig)
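
A minimal sketch of the setup described above (the broker address, group id, 
regex and starting offsets below are placeholders, not values from this report; 
the per-partition config overload is omitted for brevity):
{code:scala}
import java.util.regex.Pattern

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc = new StreamingContext(new SparkConf().setAppName("subscribe-pattern-sketch"), Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "group.id" -> "abc")

val offsets = Map.empty[TopicPartition, Long]        // initial offsets, if any
val pattern = Pattern.compile("com-.*\\.raw_timeseries")

val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, Array[Byte]](pattern, kafkaParams, offsets))
{code}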

 

Error Log:

{ "duration": "33.523 secs", "classPath": 
"com.appiot.dataingestion.DataIngestionJob", "startTime": 
"2018-12-15T18:28:08.207Z", "context": 
"c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd", 
"result": 

{ "message": "java.lang.IllegalStateException: No current assignment for 
partition com-cibigdata2.v1.iot.raw_timeseries-0", "errorClass": 
"java.lang.RuntimeException", "stack": "java.lang.RuntimeException: 
java.lang.IllegalStateException: No current assignment for partition 
com-cibigdata2.v1.iot.raw_timeseries-0\n\tat 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat
 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat
 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat
 
org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat
 
org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat
 scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat 
org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat
 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat
 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat
 
org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat
 
org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat
 
scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat
 
scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat
 
scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat
 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat
 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat 
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat 
scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat 
scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat
 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat
 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat
 scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat
 ... run in separate thread using org.apache.spark.util.ThreadUtils ... 
()\n\tat 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)\n\tat
 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat
 
com.sap.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:182)\n\tat
 
com.sap.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:24)\n\tat
 spark.jobserver.SparkJobBase$class.runJob(SparkJob.scala:31)\n\tat 
com.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:24)\n\tat
 
com.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:24)\n\tat
 
spark.jobserver.JobManagerActor$$anonfun$getJobFuture$4.apply(JobManagerActor.scala:594)\n\tat
 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)\n\tat
 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)\n\tat
 

[jira] [Assigned] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled

2018-12-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26384:


Assignee: Apache Spark

> CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
> -
>
> Key: SPARK-26384
> URL: https://issues.apache.org/jira/browse/SPARK-26384
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Maxim Gekk
>Assignee: Apache Spark
>Priority: Major
>
> Starting from the commit 
> [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2]
>  , add logging like in the comment 
> https://github.com/apache/spark/pull/23150#discussion_r242021998 and run:
> {code:shell}
> $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
> {code}
> and in the shell:
> {code:scala}
> scala> spark.conf.get("spark.sql.legacy.timeParser.enabled")
> res0: String = true
> scala> 
> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
> scala> spark.read.option("inferSchema", "true").option("header", 
> "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema()
> 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is 
> being used
> root
>  |-- _c0: timestamp (nullable = true)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled

2018-12-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26384:


Assignee: (was: Apache Spark)

> CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
> -
>
> Key: SPARK-26384
> URL: https://issues.apache.org/jira/browse/SPARK-26384
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Maxim Gekk
>Priority: Major
>
> Starting from the commit 
> [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2]
>  , add logging like in the comment 
> https://github.com/apache/spark/pull/23150#discussion_r242021998 and run:
> {code:shell}
> $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
> {code}
> and in the shell:
> {code:scala}
> scala> spark.conf.get("spark.sql.legacy.timeParser.enabled")
> res0: String = true
> scala> 
> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
> scala> spark.read.option("inferSchema", "true").option("header", 
> "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema()
> 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is 
> being used
> root
>  |-- _c0: timestamp (nullable = true)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723886#comment-16723886
 ] 

ASF GitHub Bot commented on SPARK-26384:


MaxGekk opened a new pull request #23345: [SPARK-26384][SQL] Propagate SQL 
configs for CSV schema inferring
URL: https://github.com/apache/spark/pull/23345
 
 
   ## What changes were proposed in this pull request?
   
   Currently, SQL configs are not propagated to executors during schema 
inference in the CSV datasource. For example, changing 
`spark.sql.legacy.timeParser.enabled` has no effect on inferring timestamp 
types. In the PR, I propose to fix the issue by wrapping the schema-inference 
action in `SQLExecution.withSQLConfPropagated`.
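
   A rough, hedged sketch (not the actual patch) of how `withSQLConfPropagated` 
   is used -- the driver-side SQL confs are captured and re-applied around the 
   wrapped body so that work shipped to executors sees the same settings; the 
   path below is a placeholder:
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.execution.SQLExecution
   
   val spark = SparkSession.builder().getOrCreate()
   
   val sampleRows = SQLExecution.withSQLConfPropagated(spark) {
     // any action that runs tasks on executors, e.g. sampling rows for schema inference
     spark.read.textFile("/tmp/foo").rdd.take(10)
   }
   ```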
   
   ## How was this patch tested?
   
   Added logging to `TimestampFormatter`:
   ```patch
   -object TimestampFormatter {
   +object TimestampFormatter extends Logging {
      def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
        if (SQLConf.get.legacyTimeParserEnabled) {
   +      logError("LegacyFallbackTimestampFormatter is being used")
          new LegacyFallbackTimestampFormatter(format, timeZone, locale)
        } else {
   +      logError("Iso8601TimestampFormatter is being used")
          new Iso8601TimestampFormatter(format, timeZone, locale)
        }
      }
   ```
   and run the command in `spark-shell`:
   ```shell
   $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
   ```
   ```scala
   scala> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
   scala> spark.read.option("inferSchema", "true").option("header", "false").option("timestampFormat", "yyyy|MM|dd").csv("/tmp/foo").printSchema()
   18/12/18 10:47:27 ERROR TimestampFormatter: LegacyFallbackTimestampFormatter 
is being used
   root
|-- _c0: timestamp (nullable = true)
   ```
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
> -
>
> Key: SPARK-26384
> URL: https://issues.apache.org/jira/browse/SPARK-26384
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Maxim Gekk
>Priority: Major
>
> Starting from the commit 
> [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2]
>  , add logging like in the comment 
> https://github.com/apache/spark/pull/23150#discussion_r242021998 and run:
> {code:shell}
> $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
> {code}
> and in the shell:
> {code:scala}
> scala> spark.conf.get("spark.sql.legacy.timeParser.enabled")
> res0: String = true
> scala> 
> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
> scala> spark.read.option("inferSchema", "true").option("header", 
> "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema()
> 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is 
> being used
> root
>  |-- _c0: timestamp (nullable = true)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26392) Cancel pending allocate requests by taking locality preference into account

2018-12-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26392:


Assignee: (was: Apache Spark)

> Cancel pending allocate requests by taking locality preference into account
> ---
>
> Key: SPARK-26392
> URL: https://issues.apache.org/jira/browse/SPARK-26392
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: wuyi
>Priority: Minor
>  Labels: patch
> Fix For: 2.4.1
>
>
> Right now, we cancel pending allocate requests in their sending order. I think 
> we can take locality preference into account when doing this, to have the 
> least impact on task locality preference.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26392) Cancel pending allocate requests by taking locality preference into account

2018-12-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26392:


Assignee: Apache Spark

> Cancel pending allocate requests by taking locality preference into account
> ---
>
> Key: SPARK-26392
> URL: https://issues.apache.org/jira/browse/SPARK-26392
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: wuyi
>Assignee: Apache Spark
>Priority: Minor
>  Labels: patch
> Fix For: 2.4.1
>
>
> Right now, we cancel pending allocate requests in their sending order. I think 
> we can take locality preference into account when doing this, to have the 
> least impact on task locality preference.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26392) Cancel pending allocate requests by taking locality preference into account

2018-12-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723855#comment-16723855
 ] 

ASF GitHub Bot commented on SPARK-26392:


Ngone51 opened a new pull request #23344: [SPARK-26392][YARN] Cancel pending 
allocate requests by taking locality preference into a…
URL: https://github.com/apache/spark/pull/23344
 
 
   …ccount
   
   ## What changes were proposed in this pull request?
   
   Right now, we cancel pending allocate requests in their sending order. I think 
   we can take locality preference into account when doing this, to have the 
   least impact on task locality preference.
   
   ## How was this patch tested?
   
   N.A.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cancel pending allocate requests by taking locality preference into account
> ---
>
> Key: SPARK-26392
> URL: https://issues.apache.org/jira/browse/SPARK-26392
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: wuyi
>Priority: Minor
>  Labels: patch
> Fix For: 2.4.1
>
>
> Right now, we cancel pending allocate requests in their sending order. I think 
> we can take locality preference into account when doing this, to have the 
> least impact on task locality preference.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26392) Cancel pending allocate requests by taking locality preference into account

2018-12-18 Thread wuyi (JIRA)
wuyi created SPARK-26392:


 Summary: Cancel pending allocate requests by taking locality 
preference into account
 Key: SPARK-26392
 URL: https://issues.apache.org/jira/browse/SPARK-26392
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 2.4.0
Reporter: wuyi
 Fix For: 2.4.1


Right now, we cancel pending allocate requests in their sending order. I think 
we can take locality preference into account when doing this, to have the least 
impact on task locality preference.
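
A conceptual sketch of the idea (plain Scala only; the case class and helper 
below are illustrative and not part of the YARN allocator API): when N pending 
container requests must be cancelled, prefer cancelling the ones whose 
preferred hosts matter least to the pending tasks.
{code:scala}
case class PendingRequest(id: Int, preferredHosts: Set[String])

def pickRequestsToCancel(
    pending: Seq[PendingRequest],
    hostsWantedByTasks: Set[String],
    numToCancel: Int): Seq[PendingRequest] = {
  // Sort so that requests overlapping least with the wanted hosts come first,
  // then cancel from the front: task locality preference is impacted least.
  pending
    .sortBy(r => r.preferredHosts.count(hostsWantedByTasks.contains))
    .take(numToCancel)
}

// Example: the request preferring an unused host is cancelled first.
val pending = Seq(
  PendingRequest(1, Set("host-a")),
  PendingRequest(2, Set("host-x")),
  PendingRequest(3, Set("host-a", "host-b")))
pickRequestsToCancel(pending, Set("host-a", "host-b"), numToCancel = 1)
// -> Seq(PendingRequest(2, Set("host-x")))
{code}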

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26385) YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache

2018-12-18 Thread T M (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

T M updated SPARK-26385:

Description: 
 

Hello,

 

I have a Spark Structured Streaming job which is running on YARN (Hadoop 2.6.0, 
Spark 2.4.0). After 25-26 hours, my job stops working with the following error:
{code:java}
2018-12-16 22:35:17 ERROR 
org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query 
TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = 
a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (token for REMOVED: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, 
realUser=, issueDate=1544903057122, maxDate=1545507857122, 
sequenceNumber=10314, masterKeyId=344) can't be found in cache at 
org.apache.hadoop.ipc.Client.call(Client.java:1470) at 
org.apache.hadoop.ipc.Client.call(Client.java:1401) at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
 at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752)
 at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at 
org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at 
org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at 
org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at 
org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at 
org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at 
org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at 
org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326)
 at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142)
 at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
 at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
 at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){code}
 

^It is important to note that I tried the usual fix for this kind of problem:^

 
{code:java}
--conf "spark.hadoop.fs.hdfs.impl.disable.cache=true"
 
{code}

  was:
 

Hello,

 

I have a Spark Structured Streaming job which is running on YARN (Hadoop 2.6.0, 
Spark 2.4.0). After 25-26 hours, my 

[jira] [Created] (SPARK-26391) Spark Streaming Kafka with Offset Gaps

2018-12-18 Thread Rishabh (JIRA)
Rishabh created SPARK-26391:
---

 Summary: Spark Streaming Kafka with Offset Gaps
 Key: SPARK-26391
 URL: https://issues.apache.org/jira/browse/SPARK-26391
 Project: Spark
  Issue Type: Question
  Components: Spark Core, Structured Streaming
Affects Versions: 2.4.0
Reporter: Rishabh


I have an app that uses Kafka Streams to pull data from an `input` topic and 
push to an `output` topic with `processing.guarantee=exactly_once`. Due to 
`exactly_once`, gaps (transaction markers) are created in Kafka. Let's call this 
app `kafka-streamer`.

Now I have another app that listens to this output topic (actually multiple 
topics matched by a Pattern/Regex) and processes the data using 
[https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html]. 
Let's call this app `spark-streamer`.

Due to the gaps, the first thing that happens is that Spark Streaming fails. To 
fix this I enabled `spark.streaming.kafka.allowNonConsecutiveOffsets=true` in 
the Spark config before creating the StreamingContext (see the sketch after the 
questions below). Now let's look at the issues I faced when starting 
`spark-streamer`:
 # Even though there are new offsets to be polled/consumed, it requires another 
message pushed to the topic partition before processing can start. If I start 
the app (with messages queued up to be polled) and don't push any new message 
to the topic, the code will time out after the default 120 ms and throw an 
exception.
 # It doesn't fetch the last record; it only fetches records up to the 
second-last one. This means that to poll/process the last record, another 
message has to be pushed. This is a problem for us since `spark-streamer` 
listens to multiple topics (based on a pattern) and there might be a topic with 
low throughput whose data should still make it to Spark for processing.
 # In general, if no data/message is pushed, it will die after the default 
120 ms polling timeout.

In the limited amount of time I had, I tried going through the 
spark-streaming-kafka code and was only able to find an answer to the third 
problem, which is this: 
[https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L178]

My questions are:
 # Why do we throw an exception in `compactedNext()` if no data is polled?
 # I wasn't able to figure out why the first and second issues happen; it would 
be great if somebody could point out a solution or the reason behind the 
behaviour.
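
A minimal sketch (the app name, batch interval and the comment about the stream 
are placeholders) of enabling the config mentioned above before the 
StreamingContext is created:
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("spark-streamer-sketch")
  // tolerate offset gaps left by transactional producers / compacted topics
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// ...create the direct stream with a SubscribePattern consumer strategy as usual...
{code}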



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org