[jira] [Comment Edited] (SPARK-24815) Structured Streaming should support dynamic allocation

2024-02-05 Thread Krystal Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763940#comment-17763940
 ] 

Krystal Mitchell edited comment on SPARK-24815 at 2/5/24 3:32 PM:
--

Thank you [~pavan0831]. This draft PR will have a significant impact on some of 
the projects we are currently working on. Can't wait to see it over the line.


was (Author: JIRAUSER302183):
Thank you [~pavan0831]. This draft PR will impact some of the projects we are 
currently working on. 

> Structured Streaming should support dynamic allocation
> --
>
> Key: SPARK-24815
> URL: https://issues.apache.org/jira/browse/SPARK-24815
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Spark Core, Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Karthik Palaniappan
>Priority: Minor
>  Labels: pull-request-available
>
> For batch jobs, dynamic allocation is very useful for adding and removing 
> containers to match the actual workload. On multi-tenant clusters, it ensures 
> that a Spark job is taking no more resources than necessary. In cloud 
> environments, it enables autoscaling.
> However, if you set spark.dynamicAllocation.enabled=true and run a structured 
> streaming job, the batch dynamic allocation algorithm kicks in. It requests 
> more executors if the task backlog is a certain size, and removes executors 
> if they idle for a certain period of time.
> Quick thoughts:
> 1) Dynamic allocation should be pluggable, rather than hardcoded to a 
> particular implementation in SparkContext.scala (this should be a separate 
> JIRA).
> 2) We should make a structured streaming algorithm that's separate from the 
> batch algorithm. Eventually, continuous processing might need its own 
> algorithm.
> 3) Spark should print a warning if you run a structured streaming job when 
> Core's dynamic allocation is enabled.
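
For reference, the batch dynamic-allocation behavior described above is driven by 
the standard spark.dynamicAllocation.* configuration keys. A minimal sketch in 
Java (the property names are the documented ones; the values and app name are 
only illustrative and not taken from this ticket):
{code:java}
import org.apache.spark.SparkConf;

public class DynamicAllocationConfSketch {
  public static void main(String[] args) {
    // With these settings, the existing batch algorithm scales executors on any
    // job, including a structured streaming query, which is the behavior the
    // ticket proposes to replace or at least warn about.
    SparkConf conf = new SparkConf()
        .setAppName("streaming-with-dynamic-allocation")
        .set("spark.dynamicAllocation.enabled", "true")
        // Dynamic allocation also needs shuffle tracking (newer releases) or an
        // external shuffle service; shown here only to make the sketch complete.
        .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
        .set("spark.dynamicAllocation.minExecutors", "1")
        .set("spark.dynamicAllocation.maxExecutors", "10")
        .set("spark.dynamicAllocation.executorIdleTimeout", "60s");
    System.out.println(conf.toDebugString());
  }
}
{code}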



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation

2023-09-11 Thread Krystal Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17763940#comment-17763940
 ] 

Krystal Mitchell commented on SPARK-24815:
--

Thank you [~pavan0831]. This draft PR will impact some of the projects we are 
currently working on. 

> Structured Streaming should support dynamic allocation
> --
>
> Key: SPARK-24815
> URL: https://issues.apache.org/jira/browse/SPARK-24815
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Spark Core, Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Karthik Palaniappan
>Priority: Minor
>  Labels: pull-request-available
>
> For batch jobs, dynamic allocation is very useful for adding and removing 
> containers to match the actual workload. On multi-tenant clusters, it ensures 
> that a Spark job is taking no more resources than necessary. In cloud 
> environments, it enables autoscaling.
> However, if you set spark.dynamicAllocation.enabled=true and run a structured 
> streaming job, the batch dynamic allocation algorithm kicks in. It requests 
> more executors if the task backlog is a certain size, and removes executors 
> if they idle for a certain period of time.
> Quick thoughts:
> 1) Dynamic allocation should be pluggable, rather than hardcoded to a 
> particular implementation in SparkContext.scala (this should be a separate 
> JIRA).
> 2) We should make a structured streaming algorithm that's separate from the 
> batch algorithm. Eventually, continuous processing might need its own 
> algorithm.
> 3) Spark should print a warning if you run a structured streaming job when 
> Core's dynamic allocation is enabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling

2020-07-16 Thread Ed Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159330#comment-17159330
 ] 

Ed Mitchell commented on SPARK-32151:
-

Yeah - I think you're right. It's going to be in the write ahead logs or the 
checkpoint.

> Kafka does not allow Partition Rebalance Handling
> -
>
> Key: SPARK-32151
> URL: https://issues.apache.org/jira/browse/SPARK-32151
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.4.5
>Reporter: Ed Mitchell
>Priority: Minor
>
> When a consumer group rebalance occurs while the Spark driver is using the 
> Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are cleared 
> when partitions are revoked and then reassigned.
> While this doesn't happen in the normal rebalance scenario of more consumers 
> joining the group (though it could), it does happen when the partition leader 
> is reelected because of a Kafka node being stopped or decommissioned.
> This seems to only occur when users specify their own offsets and do not use 
> Kafka as the persistent store of offsets (they use their own database, and 
> possibly if using checkpointing).
> This could probably affect Structured Streaming.
> This presents itself as a "NoOffsetForPartitionException":
> {code:java}
> 20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 
> 158933382 ms
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined 
> offset with no reset policy for partitions: [production-ad-metrics-1, 
> production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, 
> production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, 
> production-ad-metrics-7]
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>   at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>   at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>   at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>   at scala.util.Try$.apply(Try.scala:192)
>   at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)

[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling

2020-07-16 Thread Ed Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159294#comment-17159294
 ] 

Ed Mitchell commented on SPARK-32151:
-

I could do that if I wanted to start back at the beginning or the end of the 
topic, but in this case, I would like it to restart back at the offsets defined 
by my datastore.

> Kafka does not allow Partition Rebalance Handling
> -
>
> Key: SPARK-32151
> URL: https://issues.apache.org/jira/browse/SPARK-32151
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.4.5
>Reporter: Ed Mitchell
>Priority: Minor
>
> When a consumer group rebalance occurs while the Spark driver is using the 
> Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are cleared 
> when partitions are revoked and then reassigned.
> While this doesn't happen in the normal rebalance scenario of more consumers 
> joining the group (though it could), it does happen when the partition leader 
> is reelected because of a Kafka node being stopped or decommissioned.
> This seems to only occur when users specify their own offsets and do not use 
> Kafka as the persistent store of offsets (they use their own database, and 
> possibly if using checkpointing).
> This could probably affect Structured Streaming.
> This presents itself as a "NoOffsetForPartitionException":
> {code:java}
> 20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 
> 158933382 ms
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined 
> offset with no reset policy for partitions: [production-ad-metrics-1, 
> production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, 
> production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, 
> production-ad-metrics-7]
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>   at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>   at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>   at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>   at scala.util.Try$.apply(Try.scala:192)
>   at 
> 

[jira] [Created] (SPARK-32151) Kafka does not allow Partition Rebalance Handling

2020-07-01 Thread Ed Mitchell (Jira)
Ed Mitchell created SPARK-32151:
---

 Summary: Kafka does not allow Partition Rebalance Handling
 Key: SPARK-32151
 URL: https://issues.apache.org/jira/browse/SPARK-32151
 Project: Spark
  Issue Type: Improvement
  Components: DStreams
Affects Versions: 2.4.5
Reporter: Ed Mitchell


When a consumer group rebalance occurs while the Spark driver is using the 
Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are cleared 
when partitions are revoked and then reassigned.

While this doesn't happen in the normal rebalance scenario of more consumers 
joining the group (though it could), it does happen when the partition leader 
is reelected because of a Kafka node being stopped or decommissioned.

This seems to only occur when users specify their own offsets and do not use 
Kafka as the persistent store of offsets (they use their own database, and 
possibly if using checkpointing).

This could probably affect Structured Streaming.

This presents itself as a "NoOffsetForPartitionException":
{noformat}
20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 158933382 ms
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [production-ad-metrics-1, production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, production-ad-metrics-7]
  at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
{noformat}
This can be fixed by allowing the user to specify an
{code:java}

[jira] [Updated] (SPARK-32151) Kafka does not allow Partition Rebalance Handling

2020-07-01 Thread Ed Mitchell (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ed Mitchell updated SPARK-32151:

Description: 
When a consumer group rebalance occurs while the Spark driver is using the 
Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are cleared 
when partitions are revoked and then reassigned.

While this doesn't happen in the normal rebalance scenario of more consumers 
joining the group (though it could), it does happen when the partition leader 
is reelected because of a Kafka node being stopped or decommissioned.

This seems to only occur when users specify their own offsets and do not use 
Kafka as the persistent store of offsets (they use their own database, and 
possibly if using checkpointing).

This could probably affect Structured Streaming.

This presents itself as a "NoOffsetForPartitionException":
{code:java}
20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 
158933382 ms
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined 
offset with no reset policy for partitions: [production-ad-metrics-1, 
production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, 
production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, 
production-ad-metrics-7]
  at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
  at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
  at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
  at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
  at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
  at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
  at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
  at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
  at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
  at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
  at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
  at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
  at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
  at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
  at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
  at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
  at scala.util.Try$.apply(Try.scala:192)
  at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
  at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
  at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
  at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
{code}
This can be fixed by allowing the user to specify an
{code:java}
org.apache.kafka.clients.consumer.ConsumerRebalanceListener{code}
in the KafkaConsumer#subscribe method.
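
To make the idea concrete, a minimal sketch of such a listener, assuming the 
offsets are loaded from the user's own datastore (the class name and the 
savedOffsets map are hypothetical; this is not the proposed Spark API change 
itself, only an illustration of what a rebalance hook would do):
{code:java}
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class DatastoreRebalanceListener implements ConsumerRebalanceListener {
  private final Consumer<byte[], byte[]> consumer;
  private final Map<TopicPartition, Long> savedOffsets; // assumed loaded from the user's database

  public DatastoreRebalanceListener(Consumer<byte[], byte[]> consumer,
                                    Map<TopicPartition, Long> savedOffsets) {
    this.consumer = consumer;
    this.savedOffsets = savedOffsets;
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Offsets could be flushed to the external datastore here, before ownership is lost.
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Seek each newly assigned partition back to the externally stored offset so a
    // rebalance does not leave the position undefined (the exception shown above).
    for (TopicPartition tp : partitions) {
      Long offset = savedOffsets.get(tp);
      if (offset != null) {
        consumer.seek(tp, offset);
      }
    }
  }
}
{code}
Spark's Subscribe/SubscribePattern strategies do not currently expose a hook like 
this, which is what the improvement asks for.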

The documentation for ConsumerRebalanceListener states that you can use 

[jira] [Commented] (SPARK-30055) Allow configurable restart policy of driver and executor pods

2020-04-20 Thread Ed Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087857#comment-17087857
 ] 

Ed Mitchell commented on SPARK-30055:
-

I agree with this. Having the restart policy default to Never limits the 
flexibility Kubernetes has to restart pods if they run out of memory or 
terminate in some undefined way.

You can also access logs of previously restarted containers by doing: 
{noformat}
kubectl -n <namespace> logs <pod name> --previous{noformat}
I understand not wanting to set "Always" on the executor pods, so that Spark can 
control graceful termination of executors, but shouldn't we at least set it to 
"OnFailure", to allow OOMKilled executors to come back up?

As far as the driver is concerned, our client mode setup has the driver pod 
living as a deployment, which means the restart policy is Always. No reason we 
can't allow Always or OnFailure in the driver restart policy imo.

> Allow configurable restart policy of driver and executor pods
> -
>
> Key: SPARK-30055
> URL: https://issues.apache.org/jira/browse/SPARK-30055
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.1.0
>Reporter: Kevin Hogeland
>Priority: Major
>
> The current Kubernetes scheduler hard-codes the restart policy for all pods 
> to be "Never". To restart a failed application, all pods have to be deleted 
> and rescheduled, which is very slow and clears any caches the processes may 
> have built. Spark should allow a configurable restart policy for both drivers 
> and executors for immediate restart of crashed/killed drivers/executors as 
> long as the pods are not evicted. (This is not about eviction resilience, 
> that's described in this issue: SPARK-23980)
> Also, as far as I can tell, there's no reason the executors should be set to 
> never restart. Should that be configurable or should it just be changed to 
> Always?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31482) spark.kubernetes.driver.podTemplateFile Configuration not used by the job

2020-04-19 Thread Ed Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087195#comment-17087195
 ] 

Ed Mitchell commented on SPARK-31482:
-

Yeah. It's actually there but there's some unescaped HTML that was fixed in a 
later commit:

[https://github.com/apache/spark/commit/44e314edb4b86ca3a8622124539073397dbe68de#diff-b5527f236b253e0d9f5db5164bdb43e9]

 

> spark.kubernetes.driver.podTemplateFile Configuration not used by the job
> -
>
> Key: SPARK-31482
> URL: https://issues.apache.org/jira/browse/SPARK-31482
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.0.0
>Reporter: Pradeep Misra
>Priority: Blocker
>
> Spark 3.0 - Running spark-submit as below, pointing to a Minikube cluster
> {code:java}
> bin/spark-submit \
>  --master k8s://https://192.168.99.102:8443 \
>  --deploy-mode cluster \
>  --name spark-pi \
>  --class org.apache.spark.examples.SparkPi \
>  --conf spark.kubernetes.driver.podTemplateFile=../driver_1E.template \
>  --conf spark.kubernetes.executor.podTemplateFile=../executor.template \
>  --conf spark.kubernetes.container.image=spark:spark3 \
>  local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0-preview2.jar 1
> {code}
>  
> Spark Binaries - spark-3.0.0-preview2-bin-hadoop2.7.tgz
> Driver Template - 
> {code:java}
> apiVersion: v1
> kind: Pod
> metadata:
>   labels:
>     spark-app-id: my-custom-id
>   annotations:
>     spark-driver-cpu: 1
>     spark-driver-mem: 1
>     spark-executor-cpu: 1
>     spark-executor-mem: 1
>     spark-executor-count: 1
> spec:
>   schedulerName: spark-scheduler{code}
>  Executor Template
>  
> {code:java}
> apiVersion: v1
> kind: Pod
> metadata:
>   labels:
>     spark-app-id: my-custom-id
> spec:
>   schedulerName: spark-scheduler{code}
> Kubernetes Pods Launched - Two executor pods were launched, which is the default
> {code:java}
> spark-pi-e608e7718f11cc69-driver   1/1     Running     0          10s
> spark-pi-e608e7718f11cc69-exec-1   1/1     Running     0          5s
> spark-pi-e608e7718f11cc69-exec-2   1/1     Running     0          5s{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31482) spark.kubernetes.driver.podTemplateFile Configuration not used by the job

2020-04-19 Thread Ed Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087150#comment-17087150
 ] 

Ed Mitchell commented on SPARK-31482:
-

I'm not sure where the bug is here TBH. I don't see anything in the 
documentation or the Spark code that implies that setting annotations on the 
Driver Pod is a valid substitute for setting "spark.executor.memory", 
"spark.executor.instances", or "spark.executor.cores" on the Spark Submit 
command.
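
For comparison, a sketch of how executor sizing is normally passed as Spark 
configuration, here via the launcher API (the jar path, master URL and image are 
copied from the report above; the executor values are only illustrative):
{code:java}
import org.apache.spark.launcher.SparkLauncher;

public class ExecutorSizingSketch {
  public static void main(String[] args) throws Exception {
    // Executor count, memory and cores come from spark.executor.* configuration,
    // not from annotations in a driver pod template.
    Process spark = new SparkLauncher()
        .setMaster("k8s://https://192.168.99.102:8443")
        .setDeployMode("cluster")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setAppResource("local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0-preview2.jar")
        .setConf("spark.kubernetes.container.image", "spark:spark3")
        .setConf("spark.executor.instances", "1")
        .setConf("spark.executor.memory", "1g")
        .setConf("spark.executor.cores", "1")
        .launch();
    spark.waitFor();
  }
}
{code}
The same settings can of course be passed as --conf flags to spark-submit, as in 
the command shown in the report.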

> spark.kubernetes.driver.podTemplateFile Configuration not used by the job
> -
>
> Key: SPARK-31482
> URL: https://issues.apache.org/jira/browse/SPARK-31482
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.0.0
>Reporter: Pradeep Misra
>Priority: Blocker
>
> Spark 3.0 - Running spark-submit as below, pointing to a Minikube cluster
> {code:java}
> bin/spark-submit \
>  --master k8s://https://192.168.99.102:8443 \
>  --deploy-mode cluster \
>  --name spark-pi \
>  --class org.apache.spark.examples.SparkPi \
>  --conf spark.kubernetes.driver.podTemplateFile=../driver_1E.template \
>  --conf spark.kubernetes.executor.podTemplateFile=../executor.template \
>  --conf spark.kubernetes.container.image=spark:spark3 \
>  local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0-preview2.jar 1
> {code}
>  
> Spark Binaries - spark-3.0.0-preview2-bin-hadoop2.7.tgz
> Driver Template - 
> {code:java}
> apiVersion: v1
> kind: Pod
> metadata:
>   labels:
>     spark-app-id: my-custom-id
>   annotations:
>     spark-driver-cpu: 1
>     spark-driver-mem: 1
>     spark-executor-cpu: 1
>     spark-executor-mem: 1
>     spark-executor-count: 1
> spec:
>   schedulerName: spark-scheduler{code}
>  Executor Template
>  
> {code:java}
> apiVersion: v1
> kind: Pod
> metadata:
>   labels:
>     spark-app-id: my-custom-id
> spec:
>   schedulerName: spark-scheduler{code}
> Kubernetes Pods Launched - Two executor pods were launched, which is the default
> {code:java}
> spark-pi-e608e7718f11cc69-driver   1/1     Running     0          10s
> spark-pi-e608e7718f11cc69-exec-1   1/1     Running     0          5s
> spark-pi-e608e7718f11cc69-exec-2   1/1     Running     0          5s{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23420) Datasource loading not handling paths with regex chars.

2018-02-16 Thread Mitchell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16367529#comment-16367529
 ] 

Mitchell commented on SPARK-23420:
--

Yes, I agree there currently appears to be no way for a user to distinguish a 
path that should be treated literally from one that should be treated as a glob. 
Either two separate methods for specifying the path, or an option controlling 
how it is treated, would work. Having files/paths with these characters in them 
probably isn't a common situation, but it is possible and should be supported.

> Datasource loading not handling paths with regex chars.
> ---
>
> Key: SPARK-23420
> URL: https://issues.apache.org/jira/browse/SPARK-23420
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.2.1
>Reporter: Mitchell
>Priority: Major
>
> Greetings, during some recent testing I ran across an issue attempting to 
> load files with regex chars like []()* etc. in them. The files are valid in 
> the various storages and the normal hadoop APIs all function properly 
> accessing them.
> When my code is executed, I get the following stack trace.
> 8/02/14 04:52:46 ERROR yarn.ApplicationMaster: User class threw exception: 
> java.io.IOException: Illegal file pattern: Unmatched closing ')' near index 
> 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ java.io.IOException: Illegal file pattern: Unmatched closing ')' near 
> index 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ at org.apache.hadoop.fs.GlobFilter.init(GlobFilter.java:71) at 
> org.apache.hadoop.fs.GlobFilter.<init>(GlobFilter.java:50) at 
> org.apache.hadoop.fs.Globber.doGlob(Globber.java:210) at 
> org.apache.hadoop.fs.Globber.glob(Globber.java:149) at 
> org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1955) at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.globStatus(S3AFileSystem.java:2477) at 
> org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:234) 
> at 
> org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:244)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:618)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
> scala.collection.immutable.List.flatMap(List.scala:344) at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:349)
>  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at 
> org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533) at 
> org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412) at 
> com.sap.profile.SparkProfileTask.main(SparkProfileTask.java:95) at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
>  Caused by: java.util.regex.PatternSyntaxException: Unmatched closing ')' 
> near index 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ at java.util.regex.Pattern.error(Pattern.java:1955) at 
> java.util.regex.Pattern.compile(Pattern.java:1700) at 
> java.util.regex.Pattern.<init>(Pattern.java:1351) at 
> java.util.regex.Pattern.compile(Pattern.java:1054) at 
> org.apache.hadoop.fs.GlobPattern.set(GlobPattern.java:156) at 
> org.apache.hadoop.fs.GlobPattern.<init>(GlobPattern.java:42) at 
> org.apache.hadoop.fs.GlobFilter.init(GlobFilter.java:67) ... 25 more 18/02/14 
> 04:52:46 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, 
> (reason: User class threw exception: java.io.IOException: Illegal file 
> pattern: Unmatched closing ')' near index 

[jira] [Commented] (SPARK-23420) Datasource loading not handling paths with regex chars.

2018-02-14 Thread Mitchell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364284#comment-16364284
 ] 

Mitchell commented on SPARK-23420:
--

Sean, I'm a little confused by your response. From what I've seen, the 
datasource API does not correctly handle a fully URI-encoded path; otherwise I 
would be doing that. As such, I am stuck with an unencoded path, which in this 
case obviously has these characters in it. Even for a simple file with a space, 
this does not work if the path is passed encoded.

 

file: "/tmp/space file.csv"

Dataset<Row> input = sqlContext.read().option("header", "true").option("sep", 
",").option("quote", "\"").option("charset", "utf8").option("escape", 
"\\").csv("hdfs:///tmp/space%20file.csv"); --> File not found

Dataset<Row> input = sqlContext.read().option("header", "true").option("sep", 
",").option("quote", "\"").option("charset", "utf8").option("escape", 
"\\").csv("hdfs:///tmp/space file.csv"); --> Works fine

> Datasource loading not handling paths with regex chars.
> ---
>
> Key: SPARK-23420
> URL: https://issues.apache.org/jira/browse/SPARK-23420
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.2.1
>Reporter: Mitchell
>Priority: Major
>
> Greetings, during some recent testing I ran across an issue attempting to 
> load files with regex chars like []()* etc. in them. The files are valid in 
> the various storages and the normal hadoop APIs all function properly 
> accessing them.
> When my code is executed, I get the following stack trace.
> 8/02/14 04:52:46 ERROR yarn.ApplicationMaster: User class threw exception: 
> java.io.IOException: Illegal file pattern: Unmatched closing ')' near index 
> 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ java.io.IOException: Illegal file pattern: Unmatched closing ')' near 
> index 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ at org.apache.hadoop.fs.GlobFilter.init(GlobFilter.java:71) at 
> org.apache.hadoop.fs.GlobFilter.<init>(GlobFilter.java:50) at 
> org.apache.hadoop.fs.Globber.doGlob(Globber.java:210) at 
> org.apache.hadoop.fs.Globber.glob(Globber.java:149) at 
> org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1955) at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.globStatus(S3AFileSystem.java:2477) at 
> org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:234) 
> at 
> org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:244)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:618)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
> scala.collection.immutable.List.flatMap(List.scala:344) at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:349)
>  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at 
> org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533) at 
> org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412) at 
> com.sap.profile.SparkProfileTask.main(SparkProfileTask.java:95) at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
>  Caused by: java.util.regex.PatternSyntaxException: Unmatched closing ')' 
> near index 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ at java.util.regex.Pattern.error(Pattern.java:1955) at 
> java.util.regex.Pattern.compile(Pattern.java:1700) at 
> java.util.regex.Pattern.<init>(Pattern.java:1351) at 
> 

[jira] [Commented] (SPARK-23420) Datasource loading not handling paths with regex chars.

2018-02-14 Thread Mitchell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364190#comment-16364190
 ] 

Mitchell commented on SPARK-23420:
--

Greetings Marco, thanks for the response. Without having pulled master, I don't 
believe this is fixed there.

Looking at the source code, the SparkHadoopUtil class attempts to glob a path if 
it contains any of {}[]*?\\. Our file has these characters, but they are in no 
way meant to be treated as a glob; it's during the subsequent glob that the 
failure occurs. It seems to me that this would not have been fixed.

def isGlobPath(pattern: Path): Boolean = {
  pattern.toString.exists("{}[]*?\\".toSet.contains)
}

Please let me know if there's a specific commit, pull request, or other issue 
that I could look at which might pertain to this.
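
For reference, the check quoted above boils down to a simple character scan; a 
small Java restatement (class and method names are made up for illustration):
{code:java}
public class GlobPathCheck {
  // Same character set as the quoted SparkHadoopUtil.isGlobPath: any of these
  // characters sends the path down Spark's glob-resolution code path.
  private static final String GLOB_CHARS = "{}[]*?\\";

  static boolean wouldBeTreatedAsGlob(String path) {
    for (char c : GLOB_CHARS.toCharArray()) {
      if (path.indexOf(c) >= 0) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // A literal directory name containing a bracket is treated as a glob, which
    // is what leads to the PatternSyntaxException in this report.
    System.out.println(wouldBeTreatedAsGlob("/data/folder_[(?:])_2018")); // true
    System.out.println(wouldBeTreatedAsGlob("/data/plain_folder"));       // false
  }
}
{code}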

> Datasource loading not handling paths with regex chars.
> ---
>
> Key: SPARK-23420
> URL: https://issues.apache.org/jira/browse/SPARK-23420
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.2.1
>Reporter: Mitchell
>Priority: Major
>
> Greetings, during some recent testing I ran across an issue attempting to 
> load files with regex chars like []()* etc. in them. The files are valid in 
> the various storages and the normal hadoop APIs all function properly 
> accessing them.
> When my code is executed, I get the following stack trace.
> 8/02/14 04:52:46 ERROR yarn.ApplicationMaster: User class threw exception: 
> java.io.IOException: Illegal file pattern: Unmatched closing ')' near index 
> 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ java.io.IOException: Illegal file pattern: Unmatched closing ')' near 
> index 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ at org.apache.hadoop.fs.GlobFilter.init(GlobFilter.java:71) at 
> org.apache.hadoop.fs.GlobFilter.<init>(GlobFilter.java:50) at 
> org.apache.hadoop.fs.Globber.doGlob(Globber.java:210) at 
> org.apache.hadoop.fs.Globber.glob(Globber.java:149) at 
> org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1955) at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.globStatus(S3AFileSystem.java:2477) at 
> org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:234) 
> at 
> org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:244)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:618)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
>  at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
> scala.collection.immutable.List.flatMap(List.scala:344) at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:349)
>  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at 
> org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533) at 
> org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412) at 
> com.sap.profile.SparkProfileTask.main(SparkProfileTask.java:95) at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
>  Caused by: java.util.regex.PatternSyntaxException: Unmatched closing ')' 
> near index 130 
> A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
>  ^ at java.util.regex.Pattern.error(Pattern.java:1955) at 
> java.util.regex.Pattern.compile(Pattern.java:1700) at 
> java.util.regex.Pattern.<init>(Pattern.java:1351) at 
> java.util.regex.Pattern.compile(Pattern.java:1054) at 
> org.apache.hadoop.fs.GlobPattern.set(GlobPattern.java:156) at 
> org.apache.hadoop.fs.GlobPattern.<init>(GlobPattern.java:42) at 
> 

[jira] [Created] (SPARK-23420) Datasource loading not handling paths with regex chars.

2018-02-13 Thread Mitchell (JIRA)
Mitchell created SPARK-23420:


 Summary: Datasource loading not handling paths with regex chars.
 Key: SPARK-23420
 URL: https://issues.apache.org/jira/browse/SPARK-23420
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.2.1
Reporter: Mitchell


Greetings, during some recent testing I ran across an issue attempting to load 
files with regex chars like []()* etc. in them. The files are valid in the 
various storages and the normal hadoop APIs all function properly accessing 
them.

When my code is executed, I get the following stack trace.

8/02/14 04:52:46 ERROR yarn.ApplicationMaster: User class threw exception: 
java.io.IOException: Illegal file pattern: Unmatched closing ')' near index 130 
A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
 ^ java.io.IOException: Illegal file pattern: Unmatched closing ')' near index 
130 
A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
 ^ at org.apache.hadoop.fs.GlobFilter.init(GlobFilter.java:71) at 
org.apache.hadoop.fs.GlobFilter.<init>(GlobFilter.java:50) at 
org.apache.hadoop.fs.Globber.doGlob(Globber.java:210) at 
org.apache.hadoop.fs.Globber.glob(Globber.java:149) at 
org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1955) at 
org.apache.hadoop.fs.s3a.S3AFileSystem.globStatus(S3AFileSystem.java:2477) at 
org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:234) at 
org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:244)
 at 
org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:618)
 at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
 at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
scala.collection.immutable.List.flatMap(List.scala:344) at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:349)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at 
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533) at 
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412) at 
com.sap.profile.SparkProfileTask.main(SparkProfileTask.java:95) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
 Caused by: java.util.regex.PatternSyntaxException: Unmatched closing ')' near 
index 130 
A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
 ^ at java.util.regex.Pattern.error(Pattern.java:1955) at 
java.util.regex.Pattern.compile(Pattern.java:1700) at 
java.util.regex.Pattern.<init>(Pattern.java:1351) at 
java.util.regex.Pattern.compile(Pattern.java:1054) at 
org.apache.hadoop.fs.GlobPattern.set(GlobPattern.java:156) at 
org.apache.hadoop.fs.GlobPattern.<init>(GlobPattern.java:42) at 
org.apache.hadoop.fs.GlobFilter.init(GlobFilter.java:67) ... 25 more 18/02/14 
04:52:46 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, 
(reason: User class threw exception: java.io.IOException: Illegal file pattern: 
Unmatched closing ')' near index 130 
A_VERY_LONG_DIRECTORY_FOLDER_THAT_INCLUDES_MULTIBYTE_AND_SPECIAL_CHARACTERS_abcdefghijklmnopqrst_0123456789_~@#\$%\^&\(\)-_=\+[(?:]);',\._???_???_???_??
 ^) 18/02/14 04:52:46 INFO spark.SparkContext: Invoking stop() from shutdown 
hook

 

Code is as follows ...

Dataset<Row> input = sqlContext.read().option("header", "true").option("sep", 
",").option("quote", "\"").option("charset", "utf8").option("escape",