Unsubscribe

2022-07-28 Thread Karthik Jayaraman



[ spark-streaming ] - Data Locality issue

2020-02-04 Thread Karthik Srinivas
Hi,

I am using Spark 2.3.2 and I am facing issues due to data locality. Even after
setting spark.locality.wait.rack=200, the locality level is always RACK_LOCAL.
Can someone help me with this?

Thank you
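
For reference, a spark-submit sketch of the related locality settings (the values
below are only placeholders, not recommendations). Note that it is
spark.locality.wait.node (or the global spark.locality.wait), not
spark.locality.wait.rack, that controls how long a task waits for NODE_LOCAL
before falling back to RACK_LOCAL, and that these values are normally given with
an explicit time suffix such as "30s":

spark-submit \
  --conf spark.locality.wait=10s \
  --conf spark.locality.wait.node=30s \
  --conf spark.locality.wait.rack=200s \
  ...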


Data locality

2020-02-04 Thread Karthik Srinivas
Hi all,

I am using Spark 2.3.2 and I am facing issues due to data locality. Even after
setting spark.locality.wait.rack=200, the locality level is always RACK_LOCAL.
Can someone help me with this?

Thank you


Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-24 Thread Karthik Reddy Vadde
On Thu, Jul 12, 2018 at 10:23 AM Arun Mahadevan  wrote:

> Yes, ForeachWriter [1] could be an option if you want to write to different
> sinks. You can put in your custom logic to split the data across the different sinks.
>
> The drawback here is that you cannot plug in existing sinks like Kafka: you
> need to write the custom logic yourself, and you cannot scale the
> partitions for the sinks independently.
>
> [1]
> https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html
>
> From: chandan prakash 
> Date: Thursday, July 12, 2018 at 2:38 AM
> To: Tathagata Das , "ymaha...@snappydata.io"
> , "priy...@asperasoft.com" ,
> "user @spark" 
> Subject: Re: [Structured Streaming] Avoiding multiple streaming queries
>
> Hi,
> Did any of you think about writing a custom foreach sink writer
> which can decide which record should go to which sink (based on some
> marker in the record, which we can possibly annotate during transformation) and
> then write to the specific sink accordingly?
> This will mean that:
> 1. every custom sink writer will have connections to as many sinks as there
> are types of sink where records can go.
> 2. every record will be read once in the single query but can be written
> to multiple sinks.
>
> Do you guys see any drawback in this approach?
> One drawback, of course, is that a sink is supposed to write the
> records as they are, but we are introducing some intelligence here in the sink.
> Apart from that, do you see any other issues with this approach?
>
> Regards,
> Chandan
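
A rough Scala sketch of the routing ForeachWriter described above (the two
file-based sinks and the "sinkType" marker column are made-up placeholders
standing in for real sink clients):

import org.apache.spark.sql.{ForeachWriter, Row}

class RoutingWriter extends ForeachWriter[Row] {
  // stand-ins for real sink clients (e.g. Kafka producers, JDBC connections);
  // they are not serializable, so they are created per partition in open()
  @transient private var sinkA: java.io.PrintWriter = _
  @transient private var sinkB: java.io.PrintWriter = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    sinkA = new java.io.PrintWriter(s"/tmp/sinkA-$partitionId-$epochId")
    sinkB = new java.io.PrintWriter(s"/tmp/sinkB-$partitionId-$epochId")
    true
  }

  override def process(row: Row): Unit = {
    // route on the marker column annotated during transformation
    if (row.getAs[String]("sinkType") == "A") sinkA.println(row.mkString(","))
    else sinkB.println(row.mkString(","))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (sinkA != null) sinkA.close()
    if (sinkB != null) sinkB.close()
  }
}

// usage: df.writeStream.foreach(new RoutingWriter).option("checkpointLocation", "/tmp/chk").start()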
>
>
> On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das 
> wrote:
>
>> Of course, you can write to multiple Kafka topics from a single query. If
>> your dataframe that you want to write has a column named "topic" (along
>> with "key", and "value" columns), it will write the contents of a row to
>> the topic in that row. This automatically works. So the only thing you need
>> to figure out is how to generate the value of that column.
>>
>> This is documented -
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
>>
>> Or am i misunderstanding the problem?
>>
>> TD
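
A minimal Scala sketch of what TD describes above, assuming a streaming
DataFrame named events that already has "id" and "eventType" columns (those
names, the topic names, and the broker address are placeholders):

val out = events.selectExpr(
  "CAST(id AS STRING) AS key",
  "to_json(struct(*)) AS value",
  "CASE WHEN eventType = 'click' THEN 'clicks' ELSE 'other-events' END AS topic")

out.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")      // placeholder brokers
  .option("checkpointLocation", "/tmp/chk/multi-topic")   // placeholder path
  .start()

Because no "topic" option is set on the sink, each row is written to the topic
named in its "topic" column.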
>>
>>
>>
>>
>> On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan 
>> wrote:
>>
>>> I had a similar issue, and I think that’s where the Structured Streaming
>>> design falls short.
>>> Seems like Question#2 in your email is a viable workaround for you.
>>>
>>> In my case, I have a custom Sink backed by an efficient in-memory column
>>> store suited for fast ingestion.
>>>
>>> I have a Kafka stream coming from one topic, and I need to classify the
>>> stream based on schema.
>>> For example, a Kafka topic can have three different types of schema
>>> messages and I would like to ingest into the three different column
>>> tables(having different schema) using my custom Sink implementation.
>>>
>>> Right now the only(?) option I have is to create three streaming queries
>>> reading the same topic and ingesting into the respective column tables using
>>> their Sink implementations.
>>> These three streaming queries create three underlying IncrementalExecutions
>>> and three KafkaSources, all reading the same data from the same Kafka topic.
>>> Even with CachedKafkaConsumers at partition level, this is not an
>>> efficient way to handle a simple streaming use case.
>>>
>>> One workaround to overcome this limitation is to have the same schema for
>>> all the messages in a Kafka partition; unfortunately, this is not in our
>>> control, and customers cannot change it due to their dependencies on other
>>> subsystems.
>>>
>>> Thanks,
>>> http://www.snappydata.io/blog 
>>>
>>> On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <
>>> priy...@asperasoft.com> wrote:
>>>
 I have a structured streaming query which sinks to Kafka.  This query
 has a complex aggregation logic.


 I would like to sink the output DF of this query to
 multiple Kafka topics each partitioned on a different ‘key’ column.  I
 don’t want to have multiple Kafka sinks for each of the
 different Kafka topics because that would mean running multiple streaming
 queries - one for each Kafka topic, especially since my aggregation logic
 is complex.


 Questions:

 1.  Is there a way to output the results of a structured streaming
 query to multiple Kafka topics each with a different key column but without
 having to execute multiple streaming queries?


 2.  If not,  would it be efficient to cascade the multiple queries such
 that the first query does the complex aggregation and writes output
 to Kafka and then the other queries just read the output of the first query
 and write their topics to Kafka thus avoiding doing the complex aggregation
 again?


 Thanks in advance for any help.


 Priyank



>>>
>>
>
> --
> Chandan Prakash
>
>


[Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-27 Thread Karthik Jayaraman
Hi all,

I am using Spark 2.2.1 Structured Streaming to read messages from Kafka. I 
would like to know how to access the Kafka headers programmatically. Since 
Kafka message header support was introduced in Kafka 0.11 
(https://issues.apache.org/jira/browse/KAFKA-4208), is it supported in Spark? 
If yes, can anyone point me to an example?

- Karthik 
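
For what it is worth, header access was only added to the Kafka source in later
releases (Spark 3.0) via the includeHeaders option; a minimal Scala sketch
assuming Spark 3.0+ (broker and topic names are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-headers").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "mytopic")
  .option("includeHeaders", "true")   // exposes a `headers` column
  .load()

// headers is an array<struct<key: string, value: binary>>
val withHeaders = df.select("key", "value", "headers")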

unsubscribe

2018-01-16 Thread karthik
unsubscribe


Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least on YARN)

2017-09-08 Thread Karthik Palaniappan
For posterity, I found the root cause and filed a JIRA: 
https://issues.apache.org/jira/browse/SPARK-21960. I plan to open a pull 
request with the minor fix.

From: Karthik Palaniappan
Sent: Friday, September 1, 2017 9:49 AM
To: Akhil Das
Cc: user@spark.apache.org; t...@databricks.com
Subject: Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

Any ideas @Tathagata? I'd be happy to contribute a patch if you can point me in 
the right direction.

From: Karthik Palaniappan <karthik...@hotmail.com>
Sent: Friday, August 25, 2017 9:15 AM
To: Akhil Das
Cc: user@spark.apache.org; t...@databricks.com
Subject: RE: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

You have to set spark.executor.instances=0 in a streaming application with 
dynamic allocation: 
https://github.com/tdas/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L207.
 I originally had it set to a positive value, and explicitly set it to 0 after 
hitting that error.

Setting executor cores > 1 seems like reasonable advice in general, but that 
shouldn’t be my issue here, right?

From: Akhil Das<mailto:ak...@hacked.work>
Sent: Thursday, August 24, 2017 2:34 AM
To: Karthik Palaniappan<mailto:karthik...@hotmail.com>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>; 
t...@databricks.com<mailto:t...@databricks.com>
Subject: Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

Have you tried setting spark.executor.instances to a positive, non-zero value (instead of 0)? 
Also, since it's a streaming application, set executor cores > 1.

On Wed, Aug 23, 2017 at 3:38 AM, Karthik Palaniappan 
<karthik...@hotmail.com<mailto:karthik...@hotmail.com>> wrote:

I ran the HdfsWordCount example using this command:

spark-submit run-example \
  --conf spark.streaming.dynamicAllocation.enabled=true \
  --conf spark.executor.instances=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.master=yarn \
  --conf spark.submit.deployMode=client \
  org.apache.spark.examples.streaming.HdfsWordCount /foo

I tried it on both Spark 2.1.1 (through HDP 2.6) and Spark 2.2.0 (through 
Google Dataproc 1.2), and I get the same message repeatedly that Spark cannot 
allocate any executors.

17/08/22 19:34:57 INFO org.spark_project.jetty.util.log: Logging initialized 
@1694ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: 
jetty-9.3.z-SNAPSHOT
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: Started @1756ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.AbstractConnector: 
Started 
ServerConnector@578782d6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040<http://0.0.0.0:4040>}
17/08/22 19:34:58 INFO 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 
1.6.1-hadoop2
17/08/22 19:34:58 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to 
ResourceManager at hadoop-m/10.240.1.92:8032<http://10.240.1.92:8032>
17/08/22 19:35:00 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: 
Submitted application application_1503036971561_0022
17/08/22 19:35:04 WARN org.apache.spark.streaming.StreamingContext: Dynamic 
Allocation is enabled for this application. Enabling Dynamic allocation for 
Spark Streaming applications can cause data loss if Write Ahead Log is not 
enabled for non-replayable sources like Flume. See the programming guide for 
details on how to enable the Write Ahead Log.
17/08/22 19:35:21 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:36 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:51 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I confirmed that the YARN cluster has enough memory for dozens of executors, 
and verified that the application allocates executors when using Core's 
spark.dynamicAllocation.enabled=true, and leaving 
spark.streaming.dynamicAllocation.enabled=false.

Is streaming dynamic allocation actually supported? Sean Owen suggested it 
might have been experimental: https://issues.apache.org/jira/browse/SPARK-21792.



--
Cheers!




Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least on YARN)

2017-09-01 Thread Karthik Palaniappan
Any ideas @Tathagata? I'd be happy to contribute a patch if you can point me in 
the right direction.

From: Karthik Palaniappan <karthik...@hotmail.com>
Sent: Friday, August 25, 2017 9:15 AM
To: Akhil Das
Cc: user@spark.apache.org; t...@databricks.com
Subject: RE: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

You have to set spark.executor.instances=0 in a streaming application with 
dynamic allocation: 
https://github.com/tdas/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L207.
 I originally had it set to a positive value, and explicitly set it to 0 after 
hitting that error.

Setting executor cores > 1 seems like reasonable advice in general, but that 
shouldn’t be my issue here, right?

From: Akhil Das<mailto:ak...@hacked.work>
Sent: Thursday, August 24, 2017 2:34 AM
To: Karthik Palaniappan<mailto:karthik...@hotmail.com>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>; 
t...@databricks.com<mailto:t...@databricks.com>
Subject: Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

Have you tried setting spark.executor.instances to a positive, non-zero value (instead of 0)? 
Also, since it's a streaming application, set executor cores > 1.

On Wed, Aug 23, 2017 at 3:38 AM, Karthik Palaniappan 
<karthik...@hotmail.com<mailto:karthik...@hotmail.com>> wrote:

I ran the HdfsWordCount example using this command:

spark-submit run-example \
  --conf spark.streaming.dynamicAllocation.enabled=true \
  --conf spark.executor.instances=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.master=yarn \
  --conf spark.submit.deployMode=client \
  org.apache.spark.examples.streaming.HdfsWordCount /foo

I tried it on both Spark 2.1.1 (through HDP 2.6) and Spark 2.2.0 (through 
Google Dataproc 1.2), and I get the same message repeatedly that Spark cannot 
allocate any executors.

17/08/22 19:34:57 INFO org.spark_project.jetty.util.log: Logging initialized 
@1694ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: 
jetty-9.3.z-SNAPSHOT
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: Started @1756ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.AbstractConnector: 
Started 
ServerConnector@578782d6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040<http://0.0.0.0:4040>}
17/08/22 19:34:58 INFO 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 
1.6.1-hadoop2
17/08/22 19:34:58 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to 
ResourceManager at hadoop-m/10.240.1.92:8032<http://10.240.1.92:8032>
17/08/22 19:35:00 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: 
Submitted application application_1503036971561_0022
17/08/22 19:35:04 WARN org.apache.spark.streaming.StreamingContext: Dynamic 
Allocation is enabled for this application. Enabling Dynamic allocation for 
Spark Streaming applications can cause data loss if Write Ahead Log is not 
enabled for non-replayable sources like Flume. See the programming guide for 
details on how to enable the Write Ahead Log.
17/08/22 19:35:21 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:36 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:51 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I confirmed that the YARN cluster has enough memory for dozens of executors, 
and verified that the application allocates executors when using Core's 
spark.dynamicAllocation.enabled=true, and leaving 
spark.streaming.dynamicAllocation.enabled=false.

Is streaming dynamic allocation actually supported? Sean Owen suggested it 
might have been experimental: https://issues.apache.org/jira/browse/SPARK-21792.



--
Cheers!




Re: [Streaming][Structured Streaming] Understanding dynamic allocation in streaming jobs

2017-08-25 Thread Karthik Palaniappan
I definitely agree that dynamic allocation is useful, that's why I asked the 
question :p


More specifically, does spark plan to solve the problems with DRA for 
structured streaming mentioned in that Cloudera article?


If folks can give me pointers on where to start, I'd be happy to implement 
something similar to what spark streaming did.


From: cbowden 
Sent: Thursday, August 24, 2017 7:01 PM
To: user@spark.apache.org
Subject: Re: [Streaming][Structured Streaming] Understanding dynamic allocation 
in streaming jobs

You can leverage dynamic resource allocation with structured streaming.
Certainly there's an argument trivial jobs won't benefit. Certainly there's
an argument important jobs should have fixed resources for stable end to end
latency.

Few scenarios come to mind with benefits:
- I want my application to automatically leverage more resources if my
environment changes, eg. kafka topic partitions were increased at runtime
- I am not building a toy application and my driver is managing many
streaming queries with fair scheduling enabled where not every streaming
query has strict latency requirements
- My source's underlying RDD representing the DataFrame provided by getBatch
is volatile, e.g. the number of partitions changes batch to batch






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Structured-Streaming-Understanding-dynamic-allocation-in-streaming-jobs-tp29091p29104.html



Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: [Spark Streaming] Streaming Dynamic Allocation is broken (at least on YARN)

2017-08-25 Thread Karthik Palaniappan
You have to set spark.executor.instances=0 in a streaming application with 
dynamic allocation: 
https://github.com/tdas/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L207.
 I originally had it set to a positive value, and explicitly set it to 0 after 
hitting that error.

Setting executor cores > 1 seems like reasonable advice in general, but that 
shouldn’t be my issue here, right?

From: Akhil Das<mailto:ak...@hacked.work>
Sent: Thursday, August 24, 2017 2:34 AM
To: Karthik Palaniappan<mailto:karthik...@hotmail.com>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>; 
t...@databricks.com<mailto:t...@databricks.com>
Subject: Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

Have you tried setting spark.executor.instances to a positive, non-zero value (instead of 0)? 
Also, since it's a streaming application, set executor cores > 1.

On Wed, Aug 23, 2017 at 3:38 AM, Karthik Palaniappan 
<karthik...@hotmail.com<mailto:karthik...@hotmail.com>> wrote:

I ran the HdfsWordCount example using this command:

spark-submit run-example \
  --conf spark.streaming.dynamicAllocation.enabled=true \
  --conf spark.executor.instances=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.master=yarn \
  --conf spark.submit.deployMode=client \
  org.apache.spark.examples.streaming.HdfsWordCount /foo

I tried it on both Spark 2.1.1 (through HDP 2.6) and Spark 2.2.0 (through 
Google Dataproc 1.2), and I get the same message repeatedly that Spark cannot 
allocate any executors.

17/08/22 19:34:57 INFO org.spark_project.jetty.util.log: Logging initialized 
@1694ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: 
jetty-9.3.z-SNAPSHOT
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: Started @1756ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.AbstractConnector: 
Started 
ServerConnector@578782d6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040<http://0.0.0.0:4040>}
17/08/22 19:34:58 INFO 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 
1.6.1-hadoop2
17/08/22 19:34:58 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to 
ResourceManager at hadoop-m/10.240.1.92:8032<http://10.240.1.92:8032>
17/08/22 19:35:00 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: 
Submitted application application_1503036971561_0022
17/08/22 19:35:04 WARN org.apache.spark.streaming.StreamingContext: Dynamic 
Allocation is enabled for this application. Enabling Dynamic allocation for 
Spark Streaming applications can cause data loss if Write Ahead Log is not 
enabled for non-replayable sources like Flume. See the programming guide for 
details on how to enable the Write Ahead Log.
17/08/22 19:35:21 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:36 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:51 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I confirmed that the YARN cluster has enough memory for dozens of executors, 
and verified that the application allocates executors when using Core's 
spark.dynamicAllocation.enabled=true, and leaving 
spark.streaming.dynamicAllocation.enabled=false.

Is streaming dynamic allocation actually supported? Sean Owen suggested it 
might have been experimental: https://issues.apache.org/jira/browse/SPARK-21792.



--
Cheers!




[Spark Streaming] Streaming Dynamic Allocation is broken (at least on YARN)

2017-08-22 Thread Karthik Palaniappan
I ran the HdfsWordCount example using this command:

spark-submit run-example \
  --conf spark.streaming.dynamicAllocation.enabled=true \
  --conf spark.executor.instances=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.master=yarn \
  --conf spark.submit.deployMode=client \
  org.apache.spark.examples.streaming.HdfsWordCount /foo

I tried it on both Spark 2.1.1 (through HDP 2.6) and Spark 2.2.0 (through 
Google Dataproc 1.2), and I get the same message repeatedly that Spark cannot 
allocate any executors.

17/08/22 19:34:57 INFO org.spark_project.jetty.util.log: Logging initialized 
@1694ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: 
jetty-9.3.z-SNAPSHOT
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: Started @1756ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.AbstractConnector: 
Started ServerConnector@578782d6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
17/08/22 19:34:58 INFO 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 
1.6.1-hadoop2
17/08/22 19:34:58 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to 
ResourceManager at hadoop-m/10.240.1.92:8032
17/08/22 19:35:00 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: 
Submitted application application_1503036971561_0022
17/08/22 19:35:04 WARN org.apache.spark.streaming.StreamingContext: Dynamic 
Allocation is enabled for this application. Enabling Dynamic allocation for 
Spark Streaming applications can cause data loss if Write Ahead Log is not 
enabled for non-replayable sources like Flume. See the programming guide for 
details on how to enable the Write Ahead Log.
17/08/22 19:35:21 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:36 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:51 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I confirmed that the YARN cluster has enough memory for dozens of executors, 
and verified that the application allocates executors when using Core's 
spark.dynamicAllocation.enabled=true, and leaving 
spark.streaming.dynamicAllocation.enabled=false.

Is streaming dynamic allocation actually supported? Sean Owen suggested it 
might have been experimental: https://issues.apache.org/jira/browse/SPARK-21792.


[Streaming][Structured Streaming] Understanding dynamic allocation in streaming jobs

2017-08-22 Thread Karthik Palaniappan
I'm trying to understand dynamic allocation in Spark Streaming and Structured 
Streaming. It seems if you set spark.dynamicAllocation.enabled=true, both 
frameworks use Core's dynamic allocation algorithm -- request executors if the 
task backlog is a certain size, and remove executors if they idle for a certain 
period of time.
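
For reference, a spark-submit sketch of the Core settings that algorithm is
driven by (values here are just illustrative):

spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  --conf spark.dynamicAllocation.schedulerBacklogTimeout=1s \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  ...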


However, as this Cloudera post points out, that algorithm doesn't really make 
sense for streaming: 
https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_streaming.html.
 When writing a toy streaming job, I did run into the issue where executors are 
never removed. Cloudera's suggestion of turning off dynamic allocation seems 
unreasonable -- Spark applications should grow/shrink to match their workload.


I see that Spark Streaming has its own (undocumented) configuration for dynamic 
allocation: https://issues.apache.org/jira/browse/SPARK-12133. Is that actually 
a supported feature? Or was that just an experiment? I had trouble getting this 
to work, but I'll follow up in a different thread.
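
For reference, the streaming-specific settings introduced by SPARK-12133, with
names as they appear in the 2.x streaming ExecutorAllocationManager source
(values here are illustrative only):

spark-submit \
  --conf spark.streaming.dynamicAllocation.enabled=true \
  --conf spark.streaming.dynamicAllocation.minExecutors=1 \
  --conf spark.streaming.dynamicAllocation.maxExecutors=20 \
  --conf spark.streaming.dynamicAllocation.scalingInterval=60 \
  --conf spark.streaming.dynamicAllocation.scalingUpRatio=0.9 \
  --conf spark.streaming.dynamicAllocation.scalingDownRatio=0.3 \
  ...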


Also, does Structured Streaming have its own dynamic allocation algorithm?


Thanks,

Karthik Palaniappan



io.netty.handler.codec.EncoderException: java.lang.NoSuchMethodError:

2016-11-24 Thread Karthik Shyamsunder
Greetings,

I am using Spark 2.0.2 with Scala 2.11.7 and Hadoop 2.7.3.  When I run
spark-submit in local mode, I get a Netty exception like the following.  The
code runs fine with Spark 1.6.3, Scala 2.10.x and Hadoop 2.7.3.

16/11/24 08:18:24 ERROR server.TransportRequestHandler: Error sending result
StreamResponse{streamId=/jars/simple-project_2.11-1.0.jar, byteCount=3662,
body=FileSegmentManagedBuffer{file=/home/hdadmin/Examples/
spark/wordcount/target/scala-2.11/simple-project_2.11-1.0.jar,
offset=0, length=3662}} to /10.0.2.15:33926; closing connection
io.netty.handler.codec.EncoderException: java.lang.NoSuchMethodError:
io.netty.channel.DefaultFileRegion.<init>(Ljava/io/File;JJ)V
at
io.netty.handler.codec.MessageToMessageEncoder.write(
MessageToMessageEncoder.java:107)
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(
AbstractChannelHandlerContext.java:658)
at
io.netty.channel.AbstractChannelHandlerContext.write(
AbstractChannelHandlerContext.java:716)
at
io.netty.channel.AbstractChannelHandlerContext.write(
AbstractChannelHandlerContext.java:651)
at
io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:266)
at
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(
AbstractChannelHandlerContext.java:658)
at
io.netty.channel.AbstractChannelHandlerContext.write(
AbstractChannelHandlerContext.java:716)
at
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(
AbstractChannelHandlerContext.java:706)
at
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(
AbstractChannelHandlerContext.java:741)
at
io.netty.channel.DefaultChannelPipeline.writeAndFlush(
DefaultChannelPipeline.java:895)
at
io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
at
org.apache.spark.network.server.TransportRequestHandler.respond(
TransportRequestHandler.java:194)
at
org.apache.spark.network.server.TransportRequestHandler.
processStreamRequest(TransportRequestHandler.java:150)
at
org.apache.spark.network.server.TransportRequestHandler.handle(
TransportRequestHandler.java:111)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(
TransportChannelHandler.java:119)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(
TransportChannelHandler.java:51)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(
SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
AbstractChannelHandlerContext.java:319)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(
IdleStateHandler.java:254)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(
MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
AbstractChannelHandlerContext.java:319)
at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(
TransportFrameDecoder.java:85)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(
AbstractChannelHandlerContext.java:319)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(
DefaultChannelPipeline.java:787)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(
AbstractNioByteChannel.java:130)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.
run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError:
io.netty.channel.DefaultFileRegion.<init>(Ljava/io/File;JJ)V
at
org.apache.spark.network.buffer.FileSegmentManagedBuffer.convertToNetty(
FileSegmentManagedBuffer.java:133)
at
org.apache.spark.network.protocol.MessageEncoder.
encode(MessageEncoder.java:54)
at
org.apache.spark.network.protocol.MessageEncoder.
encode(MessageEncoder.java:33)
at
io.netty.handler.codec.MessageToMessageEncoder.write(
MessageToMessageEncoder.java:89)
... 35 more
16/11/24 08:18:24 ERROR client.TransportResponseHandler: Still have 1
requests outstanding when connection from /10.0.2.15:54561 is closed


PLEASE ADVISE.

Sincerely,

Karthik
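
A NoSuchMethodError on a Netty class after a Spark upgrade is usually a
classpath conflict: an older netty pulled in by the application (or by a
transitive dependency such as Hadoop) shadows the newer one Spark 2.0.x ships
with. A rough build.sbt sketch of the usual fix (sbt 1.x syntax; the netty
version below is a placeholder and should match whatever the Spark distribution
in use actually bundles):

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  // Spark itself comes from the cluster, so mark it provided
  "org.apache.spark" %% "spark-core" % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.0.2" % "provided"
)

// force a single netty version if a transitive dependency drags in an older one
dependencyOverrides += "io.netty" % "netty-all" % "4.0.29.Final"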


using matrix as column datatype in SparkSQL Dataframe

2016-08-08 Thread Vadla, Karthik
Hello all,


I'm trying to load a set of medical images (DICOM) into a Spark SQL DataFrame. Here 
each image is loaded into a matrix column of the DataFrame. I see Spark recently 
added MatrixUDT to support this kind of case, but I don't find a sample for 
using a matrix as a column in a DataFrame.

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala

Can anyone help me with this.

Really appreciate your help.

Thanks

Karthik Vadla
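
A minimal Scala sketch, assuming Spark 2.x and the ml.linalg types (the column
names and values are made up):

import org.apache.spark.ml.linalg.{Matrices, Matrix}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("matrix-column").getOrCreate()
import spark.implicits._

// MatrixUDT is registered for ml.linalg.Matrix, so a Matrix value can be used
// directly as a DataFrame column
val pixels: Matrix = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0))
val df = Seq(("image-001", pixels)).toDF("imageId", "pixels")
df.printSchema()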



Re: What are using Spark for

2016-08-02 Thread Karthik Ramakrishnan
We used Storm for ETL; now we are thinking Spark might be advantageous
since some ML is also coming our way.

- Karthik

On Tue, Aug 2, 2016 at 1:10 PM, Rohit L <rohitfor...@gmail.com> wrote:

> Does anyone use Spark for ETL?
>
> On Tue, Aug 2, 2016 at 1:24 PM, Sonal Goyal <sonalgoy...@gmail.com> wrote:
>
>> Hi Rohit,
>>
>> You can check the powered by spark page for some real usage of Spark.
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark
>>
>>
>> On Tuesday, August 2, 2016, Rohit L <rohitfor...@gmail.com> wrote:
>>
>>> Hi Everyone,
>>>
>>>   I want to know the real-world use cases for which Spark is used,
>>> and hence can you please share for what purpose you are using Apache Spark
>>> in your project?
>>>
>>> --
>>> Rohit
>>>
>>
>>
>> --
>> Best Regards,
>> Sonal
>> Founder, Nube Technologies <http://www.nubetech.co>
>> Reifier at Strata Hadoop World
>> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
>> Reifier at Spark Summit 2015
>> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>>
>>
>>
>


Spark Streaming join

2016-06-02 Thread karthik tunga
Hi,

I have a scenario where I need to join DStream with a RDD. This is to add
some metadata info to incoming events. This is fairly straight forward.

What I also want to do is refresh this metadata RDD on a fixed schedule(or
when  underlying hdfs file changes). I want to "expire" and reload this RDD
every say 10 minutes.

Is this possible ?

Apologies if this has been asked before.

Cheers,
Karthik
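
A rough Scala sketch of one way to do this, relying on the fact that the body of
transform() runs on the driver once per batch; the event source, the metadata
path, and the parsing logic are all placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("metadata-join")
val ssc = new StreamingContext(conf, Seconds(10))

def loadMetadata(): RDD[(String, String)] =
  ssc.sparkContext.textFile("hdfs:///metadata/lookup.csv")   // placeholder path
    .map(_.split(","))
    .map(a => (a(0), a(1)))
    .cache()

var metadata = loadMetadata()
var lastRefresh = System.currentTimeMillis()

// placeholder event source, keyed by the metadata lookup key
val events = ssc.socketTextStream("localhost", 9999).map(line => (line.split(",")(0), line))

val enriched = events.transform { rdd =>
  // swap the metadata RDD out on a fixed schedule (10 minutes in this sketch)
  if (System.currentTimeMillis() - lastRefresh > 10 * 60 * 1000) {
    metadata.unpersist()
    metadata = loadMetadata()
    lastRefresh = System.currentTimeMillis()
  }
  rdd.join(metadata)
}

enriched.print()
ssc.start()
ssc.awaitTermination()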


Re: issue with spark.driver.maxResultSize parameter in spark 1.3

2015-11-01 Thread karthik kadiyam
Did anyone have an issue setting the spark.driver.maxResultSize value?

On Friday, October 30, 2015, karthik kadiyam <karthik.kadiyam...@gmail.com>
wrote:

> Hi Shahid,
>
> I played around with the Spark driver memory too. In the conf file it was set
> to " --driver-memory 20G " first. When I changed spark.driver.maxResultSize
> from the default to 2g, I changed the driver memory to 30G and tried that too.
> It gave the same error saying "bigger than spark.driver.maxResultSize (1024.0 MB)".
>
> One other thing I observed is that in one of the tasks the data it's trying to
> process is more than 100 MB, and that executor and task keep losing the
> connection and retrying. I tried increasing the tasks by repartitioning from
> 120 to 240 to 480 as well. I can still see that one of my tasks is
> trying to process more than 100 MB. Other tasks hardly process 1 MB to 10 MB,
> some around 20 MB, some 0 MB.
>
> Any idea how I can try to even out the data distribution across multiple
> nodes?
>
> On Fri, Oct 30, 2015 at 12:09 AM, shahid ashraf <sha...@trialx.com> wrote:
>
>> Hi
>> I guess you need to increase spark driver memory as well. But that should
>> be set in conf files
>> Let me know if that resolves
>> On Oct 30, 2015 7:33 AM, "karthik kadiyam" <karthik.kadiyam...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In spark streaming job i had the following setting
>>>
>>> this.jsc.getConf().set("spark.driver.maxResultSize", “0”);
>>> and i got the error in the job as below
>>>
>>> User class threw exception: Job aborted due to stage failure: Total size
>>> of serialized results of 120 tasks (1082.2 MB) is bigger than
>>> spark.driver.maxResultSize (1024.0 MB)
>>>
>>> Basically i realized that as default value is 1 GB. I changed
>>> the configuration as below.
>>>
>>> this.jsc.getConf().set("spark.driver.maxResultSize", “2g”);
>>>
>>> and when i ran the job it gave the error
>>>
>>> User class threw exception: Job aborted due to stage failure: Total size
>>> of serialized results of 120 tasks (1082.2 MB) is bigger than
>>> spark.driver.maxResultSize (1024.0 MB)
>>>
>>> So, basically the change i made is not been considered in the job. so my
>>> question is
>>>
>>> - "spark.driver.maxResultSize", “2g” is this the right way to change or
>>> any other way to do it.
>>> - Is this a bug in spark 1.3 or something or any one had this issue
>>> before?
>>>
>>>
>


Re: issue with spark.driver.maxResultSize parameter in spark 1.3

2015-10-30 Thread karthik kadiyam
Hi Shahid,

I played around with the Spark driver memory too. In the conf file it was set
to " --driver-memory 20G " first. When I changed spark.driver.maxResultSize
from the default to 2g, I changed the driver memory to 30G and tried that too.
It gave the same error saying "bigger than
spark.driver.maxResultSize (1024.0 MB)".

One other thing I observed is that in one of the tasks the data it's trying to
process is more than 100 MB, and that executor and task keep losing the
connection and retrying. I tried increasing the tasks by repartitioning from
120 to 240 to 480 as well. I can still see that one of my tasks is
trying to process more than 100 MB. Other tasks hardly process 1 MB to 10 MB,
some around 20 MB, some 0 MB.

Any idea how I can try to even out the data distribution across multiple nodes?


On Fri, Oct 30, 2015 at 12:09 AM, shahid ashraf <sha...@trialx.com> wrote:

> Hi
> I guess you need to increase spark driver memory as well. But that should
> be set in conf files
> Let me know if that resolves
> On Oct 30, 2015 7:33 AM, "karthik kadiyam" <karthik.kadiyam...@gmail.com>
> wrote:
>
>> Hi,
>>
>> In spark streaming job i had the following setting
>>
>> this.jsc.getConf().set("spark.driver.maxResultSize", “0”);
>> and i got the error in the job as below
>>
>> User class threw exception: Job aborted due to stage failure: Total size
>> of serialized results of 120 tasks (1082.2 MB) is bigger than
>> spark.driver.maxResultSize (1024.0 MB)
>>
>> Basically i realized that as default value is 1 GB. I changed
>> the configuration as below.
>>
>> this.jsc.getConf().set("spark.driver.maxResultSize", “2g”);
>>
>> and when i ran the job it gave the error
>>
>> User class threw exception: Job aborted due to stage failure: Total size
>> of serialized results of 120 tasks (1082.2 MB) is bigger than
>> spark.driver.maxResultSize (1024.0 MB)
>>
>> So, basically the change i made is not been considered in the job. so my
>> question is
>>
>> - "spark.driver.maxResultSize", “2g” is this the right way to change or
>> any other way to do it.
>> - Is this a bug in spark 1.3 or something or any one had this issue
>> before?
>>
>>


Issue on spark.driver.maxResultSize

2015-10-29 Thread karthik kadiyam
Hi,

In a Spark Streaming job I had the following setting:

this.jsc.getConf().set("spark.driver.maxResultSize", "0");
and I got the error in the job as below:

User class threw exception: Job aborted due to stage failure: Total size of 
serialized results of 120 tasks (1082.2 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB) 

Basically I realized that the default value is 1 GB. I changed the configuration 
as below.

this.jsc.getConf().set("spark.driver.maxResultSize", “2g”);

and when I ran the job it gave the error:

User class threw exception: Job aborted due to stage failure: Total size of 
serialized results of 120 tasks (1082.2 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB) 

So basically the change I made is not being picked up by the job. So my 
questions are:

- Is setting "spark.driver.maxResultSize" to "2g" this way the right way to change it, 
or is there another way to do it?
- Is this a bug in Spark 1.3, or has anyone else had this issue before? 

issue with spark.driver.maxResultSize parameter in spark 1.3

2015-10-29 Thread karthik kadiyam
Hi,

In a Spark Streaming job I had the following setting:

this.jsc.getConf().set("spark.driver.maxResultSize", "0");
and I got the error in the job as below:

User class threw exception: Job aborted due to stage failure: Total size of
serialized results of 120 tasks (1082.2 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)

Basically I realized that the default value is 1 GB. I changed
the configuration as below.

this.jsc.getConf().set("spark.driver.maxResultSize", “2g”);

and when I ran the job it gave the error:

User class threw exception: Job aborted due to stage failure: Total size of
serialized results of 120 tasks (1082.2 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)

So basically the change I made is not being picked up by the job. So my
questions are:

- Is setting "spark.driver.maxResultSize" to "2g" this way the right way to change it,
or is there another way to do it?
- Is this a bug in Spark 1.3, or has anyone else had this issue before?
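
One thing worth noting (a sketch, not a definitive diagnosis): SparkContext.getConf
returns a copy of the configuration, so setting spark.driver.maxResultSize on it
after the context has been created has no effect on the running job. The value has
to be in place before the context is built, either in code or via spark-submit.
A minimal Scala sketch (names are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-app")
  .set("spark.driver.maxResultSize", "2g")   // set before the context is created; "0" means unlimited

val ssc = new StreamingContext(conf, Seconds(10))

// or, without touching the code:
// spark-submit --conf spark.driver.maxResultSize=2g ...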


Re: Querying on multiple Hive stores using Apache Spark

2015-09-24 Thread Karthik
Any ideas or suggestions?

Thanks,
Karthik.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-on-multiple-Hive-stores-using-Apache-Spark-tp24765p24797.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Querying on multiple Hive stores using Apache Spark

2015-09-22 Thread Karthik
I have a Spark application which successfully connects to Hive and queries
Hive tables using the Spark engine.

To build this, I just added hive-site.xml to the classpath of the application,
and Spark reads the hive-site.xml to connect to its metastore. This
method was suggested on Spark's mailing list.

So far so good. Now I want to connect to two Hive stores, and I don't think
adding another hive-site.xml to my classpath will help. I referred to
quite a few articles and Spark mailing lists but could not find anyone doing
this.

Can anyone suggest how I can achieve this?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-on-multiple-Hive-stores-using-Apache-Spark-tp24765.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Incorrect ACL checking for partitioned table in Spark SQL-1.4

2015-06-16 Thread Karthik Subramanian
*Problem Statement:*
While querying a partitioned table using Spark SQL (version 1.4.0), an
access-denied exception is observed for partitions the user doesn’t have
access to (user permissions are controlled using HDFS ACLs). The same works
correctly in Hive.

*Use case:* /To address multitenancy/

Consider a table containing multiple customers, each customer with
multiple facilities. The table is partitioned by customer and facility. A
user belonging to one facility will not have access to another facility. This
is enforced using HDFS ACLs on the corresponding directories. When querying
the table as ‘user1’ belonging to ‘facility1’ and ‘customer1’ on a
particular partition (using a ‘where’ clause), only access to the corresponding
directory should be verified, not the entire table. 
The above use case works as expected when using the Hive client, versions 0.13.1
and 1.1.0. 

*The query used:* /select count(*) from customertable where
customer=‘customer1’ and facility=‘facility1’/

*Below is the exception received in Spark-shell:*

org.apache.hadoop.security.AccessControlException: Permission denied:
user=user1, access=READ_EXECUTE,
inode=/data/customertable/customer=customer2/facility=facility2”:root:supergroup:drwxrwx---:group::r-x,group:facility2:rwx
at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkAccessAcl(FSPermissionChecker.java:351)
at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:253)
at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:185)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6512)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6494)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6419)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4954)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4915)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:826)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:612)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1971)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:693)
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
at
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
at
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
at
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.org$apache$spark$sql$sources$HadoopFsRelation$FileStatusCache$$listLeafFilesAndDirs$1(interfaces.scala:390)
at
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$2$$anonfun$apply$2.apply(interfaces.scala:402)
at
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$2$$anonfun$apply$2.apply(interfaces.scala:402)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)