Re: Core allocation is scattered

2019-07-25 Thread Srikanth Sriram
Hello,

Below is my understanding.

These are the default configuration parameters that a Spark job falls back to
if they are not set to the required values at the time the job is submitted.

# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)

SPARK_EXECUTOR_INSTANCES -> the number of executors (workers) to start; in
other words, the maximum number of executors the job can ask for / take from
the cluster resource manager.

SPARK_EXECUTOR_CORES -> the number of cores in each executor; the Spark
TaskScheduler will ask for this many cores to be allocated/reserved on each
executor machine.

SPARK_EXECUTOR_MEMORY -> the maximum amount of RAM/memory to allocate to each
executor.

All these details are requested by the TaskScheduler from the cluster manager
(Spark standalone, YARN, Mesos, or Kubernetes, the latter supported starting
from Spark 2.3) before the actual job execution starts.

Also, please note that the initial number of executor instances depends on
"--num-executors"; when there is more data to process and
"spark.dynamicAllocation.enabled" is set to true, more executors are added
dynamically, starting from "spark.dynamicAllocation.initialExecutors".

Note: per the configuration docs quoted below,
"spark.dynamicAllocation.initialExecutors" does not need to be configured
greater than "--num-executors"; whichever of the two is larger becomes the
initial executor count.

spark.dynamicAllocation.initialExecutors (default:
spark.dynamicAllocation.minExecutors) -- Initial number of executors to run
if dynamic allocation is enabled. If `--num-executors` (or
`spark.executor.instances`) is set and larger than this value, it will be
used as the initial number of executors.
spark.executor.memory (default: 1g) -- Amount of memory to use per executor
process, in the same format as JVM memory strings with a size unit suffix
("k", "m", "g" or "t") (e.g. 512m, 2g).

spark.executor.cores (default: 1 in YARN mode; all the available cores on the
worker in standalone and Mesos coarse-grained modes) -- The number of cores
to use on each executor. For standalone and Mesos coarse-grained modes, see
this description
<http://spark.apache.org/docs/latest/spark-standalone.html#Executors%20Scheduling>.
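
As an illustration (the values below are arbitrary, not recommendations), the
same three settings can be expressed as Spark properties when building a
session:

import org.apache.spark.sql.SparkSession

// Hypothetical resource settings matching the three environment variables
// above: executor count, cores per executor, and memory per executor.
val spark = SparkSession.builder()
  .appName("ResourceExample")
  .config("spark.executor.instances", "2")  // SPARK_EXECUTOR_INSTANCES
  .config("spark.executor.cores", "1")      // SPARK_EXECUTOR_CORES
  .config("spark.executor.memory", "1g")    // SPARK_EXECUTOR_MEMORY
  .getOrCreate()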

On Thu, Jul 25, 2019 at 5:54 PM Amit Sharma  wrote:

> I have a cluster with 26 nodes having 16 cores each. I am running a spark
> job with 20 cores, but I did not understand why my application gets 1-2 cores
> on a couple of machines; why does it not just run on two nodes, like node1=16
> cores and node2=4 cores? Instead cores are allocated like node1=2,
> node2=1 ... node14=1. Is there any conf property I need to
> change? I know with dynamic allocation we can use the below, but without
> dynamic allocation is there any?
> --conf "spark.dynamicAllocation.maxExecutors=2"
>
>
> Thanks
> Amit
>


-- 
Regards,
Srikanth Sriram


Task failure to read input files

2018-04-13 Thread Srikanth
Hello,

I'm running a Spark job on AWS EMR that reads many LZO files from an S3
bucket partitioned by date.
Sometimes I see errors in the logs similar to:

18/04/13 11:53:52 WARN TaskSetManager: Lost task 151177.0 in stage
43.0 (TID 1516123, ip-10-10-2-6.ec2.internal, executor 57):
java.io.IOException: Corrupted uncompressed block
at 
com.hadoop.compression.lzo.LzopInputStream.verifyChecksums(LzopInputStream.java:219)
at 
com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:284)
at 
com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:261)


I don't see the job fail, so I assume the task succeeded when it was retried.
If the input file were actually corrupted, even the task retries should fail,
and eventually the job would fail based on the "spark.task.maxFailures"
config, right?
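
(For illustration only, a rough sketch of raising that retry threshold when
building the session; 4 is the default, and the value below is arbitrary.)

import org.apache.spark.sql.SparkSession

// Hypothetical: allow more task retries before the whole stage/job is failed.
val spark = SparkSession.builder()
  .appName("LzoReadExample")
  .config("spark.task.maxFailures", "8")
  .getOrCreate()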

Is there a way to make the Spark/Hadoop LZO library print the full file name
when such failures happen, so that I can then manually check whether the file
is indeed corrupted?

Thanks,
Srikanth


Spark ML DAG Pipelines

2017-09-07 Thread Srikanth Sampath
Hi Spark Experts,

Can someone point me to some examples of non-linear (DAG) ML pipelines?
That would be of great help.
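
(For context, a minimal sketch of what a non-linear pipeline can look like;
the column names are hypothetical. Stages only need to be listed in
topological order, so two feature branches can merge at an assembler.)

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, VectorAssembler}

// Branch 1: "text" -> tokenizer -> hashingTF -> "textFeatures"
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("textFeatures")

// Branch 2: a pre-existing numeric column, merged with branch 1 here.
val assembler = new VectorAssembler()
  .setInputCols(Array("textFeatures", "numericFeature"))
  .setOutputCol("features")

val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")

// The stage graph is a DAG (two branches joining at the assembler),
// declared in topological order.
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, assembler, lr))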
Thanks much in advance
-Srikanth


Spark streaming app leaking memory?

2017-05-16 Thread Srikanth
Hi,

I have a Spark Streaming (Spark 2.1.0) app where I see these logs in the
executor. Does this point to some memory leak?

17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size =
67108864 bytes, TID = 7752
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size =
67108864 bytes, TID = 7740
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size =
67108864 bytes, TID = 7764
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size =
67108864 bytes, TID = 7728
17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7783:
[rdd_1_15]
17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7807:
[rdd_1_39]

I notice that "Managed memory leak" logs are not seen when I use G1GC.
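
(Side note, for anyone reproducing the comparison: a minimal sketch of how
G1GC is typically enabled for executors. The JVM flag is a standard one; the
app name is a placeholder.)

import org.apache.spark.sql.SparkSession

// Hypothetical: run executors with the G1 collector instead of the default GC.
val spark = SparkSession.builder()
  .appName("StreamingGcExample")
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .getOrCreate()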


Srikanth


Re: Spark streaming + kafka error with json library

2017-03-30 Thread Srikanth
Thanks for the tip. That worked. When would one use the assembly?
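
(For anyone finding this thread later, a sketch of the suggested dependency
in sbt; the version shown is an assumption matching the Spark version
mentioned in this thread.)

// build.sbt (illustrative)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"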

On Wed, Mar 29, 2017 at 7:13 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly)
>
> On Wed, Mar 29, 2017 at 9:59 AM, Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm trying to use "org.json4s" % "json4s-native" library in a spark
>> streaming + kafka direct app.
>> When I use the latest version of the lib I get an error similar to this
>> <https://github.com/json4s/json4s/issues/316>
>> The work around suggest there is to use version 3.2.10. As spark has a
>> hard dependency on this version.
>>
>> I forced this version in SBT with
>> dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"
>>
>> But now it seems to have some conflict with spark-streaming-kafka-0-10-ass
>> embly
>>
>> [error] (*:assembly) deduplicate: different file contents found in the
>> following:
>>
>> [error] C:\Users\stati\.ivy2\cache\org.apache.spark\spark-streaming-
>> kafka-0-10-assembly_2.11\jars\spark-streaming-kafka-0-10-
>> assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/Implic
>> itConversions$$anonfun$flatten2$1.class
>>
>> [error] C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-pars
>> er-combinators_2.11\bundles\scala-parser-combinators_2.11-
>> 1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions
>> $$anonfun$flatten2$1.class
>>
>> DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling
>> json4s-native.
>> Any idea how to resolve this? I'm using spark version 2.1.0
>>
>> Thanks,
>> Srikanth
>>
>
>


Spark streaming + kafka error with json library

2017-03-29 Thread Srikanth
Hello,

I'm trying to use "org.json4s" % "json4s-native" library in a spark
streaming + kafka direct app.
When I use the latest version of the lib I get an error similar to this
<https://github.com/json4s/json4s/issues/316>
The work around suggest there is to use version 3.2.10. As spark has a hard
dependency on this version.

I forced this version in SBT with
dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"

But now it seems to have some conflict with
spark-streaming-kafka-0-10-assembly

[error] (*:assembly) deduplicate: different file contents found in the
following:

[error]
C:\Users\stati\.ivy2\cache\org.apache.spark\spark-streaming-kafka-0-10-assembly_2.11\jars\spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class

[error]
C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class

DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling
json4s-native.
Any idea how to resolve this? I'm using spark version 2.1.0

Thanks,
Srikanth


Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
This is running in YARN cluster mode. It was restarted automatically and
continued fine.
I was trying to see what went wrong. AFAIK there were no task failures, and
there is nothing in the executor logs. The log I pasted is from the driver.

After some digging, I did see that there was a rebalance in kafka logs
around this time. So will the driver fail and exit in such cases?
I've seen drivers exit after a job has hit the max retry attempts, but this
is different, right?

Srikanth


On Tue, Feb 7, 2017 at 5:25 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Does restarting after a few minutes solves the problem? Could be a
> transient issue that lasts long enough for spark task-level retries to all
> fail.
>
> On Tue, Feb 7, 2017 at 4:34 PM, Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I had a spark streaming app that reads from kafka running for a few hours
>> after which it failed with error
>>
>> *17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time 
>> 148649785 ms
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>>  at 
>> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
>>  at 
>> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
>>  at 
>> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
>>  at 
>> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>>  at 
>> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>>  at 
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at 
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)*
>>
>> 
>> 
>>
>> 17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception: 
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>>
>> 
>> 
>>
>> 17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job 
>> generator (timeout = 5)
>>
>>
>> Driver did not recover from this error and failed. The previous batch ran 
>> 5sec back. There are no indications in the logs that some rebalance happened.
>> As per kafka admin, kafka cluster health was good when this happened and no 
>> maintenance was being done.
>>
>> Any idea what could have gone wrong and why this is a fatal error?
>>
>> Regards,
>> Srikanth
>>
>>
>


Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
Hello,

I had a Spark Streaming app reading from Kafka that ran for a few hours,
after which it failed with this error:

*17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time
148649785 ms
java.lang.IllegalStateException: No current assignment for partition mt_event-5
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)*




17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception:
java.lang.IllegalStateException: No current assignment for partition
mt_event-5
java.lang.IllegalStateException: No current assignment for partition mt_event-5




17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job
generator (timeout = 5)


The driver did not recover from this error and failed. The previous batch
had run 5 seconds earlier. There is no indication in the logs that a
rebalance happened.
As per the Kafka admin, the Kafka cluster was healthy when this happened
and no maintenance was being done.

Any idea what could have gone wrong and why this is a fatal error?

Regards,
Srikanth


Re: Spark 2.0 with Kafka 0.10 exception

2016-10-21 Thread Srikanth
The Kafka 0.10.1 release separates poll() from the heartbeat, so
session.timeout.ms & max.poll.interval.ms can be set independently.
I'll leave it to you how to add this to the docs!
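
(For readers landing here later, a minimal sketch of setting the two
timeouts independently in the consumer params passed to the direct stream;
the broker address, group id, and timeout values are placeholders.)

import org.apache.kafka.common.serialization.ByteArrayDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "example-group",
  // With Kafka 0.10.1+ clients the heartbeat runs on a background thread,
  // so the session timeout no longer has to cover batch processing time.
  "session.timeout.ms" -> "30000",
  // Upper bound on time between poll() calls before the consumer is
  // considered failed; can now be sized to the batch interval instead.
  "max.poll.interval.ms" -> "300000"
)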


On Thu, Oct 20, 2016 at 1:41 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Right on, I put in a PR to make a note of that in the docs.
>
> On Thu, Oct 20, 2016 at 12:13 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Yeah, setting those params helped.
> >
> > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> 60 seconds for a batch is above the default settings in kafka related
> >> to heartbeat timeouts, so that might be related.  Have you tried
> >> tweaking session.timeout.ms, heartbeat.interval.ms, or related
> >> configs?
> >>
> >> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <srikanth...@gmail.com>
> wrote:
> >> > Bringing this thread back as I'm seeing this exception on a production
> >> > kafka
> >> > cluster.
> >> >
> >> > I have two Spark streaming apps reading the same topic. App1 has batch
> >> > interval 2secs and app2 has 60secs.
> >> > Both apps are running on the same cluster on similar hardware. I see
> >> > this
> >> > exception only in app2 and fairly consistently.
> >> >
> >> > Difference I see between the apps is
> >> > App1
> >> >   spark.streaming.kafka.maxRatePerPartition, 6000
> >> >   batch interval 2 secs
> >> > App2
> >> >   spark.streaming.kafka.maxRatePerPartition, 1
> >> >   batch interval 60 secs
> >> >
> >> > All other kafka/spark related configs are same for both apps.
> >> >   spark.streaming.kafka.consumer.poll.ms = 4096
> >> >   spark.streaming.backpressure.enabled = true
> >> >
> >> > Not sure if pre-fetching or caching is messing things up.
> >> >
> >> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0
> >> > (TID
> >> > 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError:
> >> > assertion
> >> > failed: Failed to get records for spark-executor-
> StreamingEventSplitProd
> >> > mt_event 6 49091480 after polling for 4096
> >> > at scala.Predef$.assert(Predef.scala:170)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> >> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:
> 409)
> >> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:
> 409)
> >> >     at scala.collection.Iterator$$anon$21.next(Iterator.scala:
> 838)
> >> >
> >> >
> >> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> That's not what I would have expected to happen with a lower cache
> >> >> setting, but in general disabling the cache isn't something you want
> >> >> to do with the new kafka consumer.
> >> >>
> >> >>
> >> >> As far as the original issue, are you seeing those polling errors
> >> >> intermittently, or consistently?  From your description, it sounds
> >> >> like retry is working correctly.
> >> >>
> >> >>
> >> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com>
> wrote:
> >> >> > Setting those two results in below exception.
> >> >> > No.of executors < no.of partitions. Could that be triggering this?
> >> >> >
> >> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage
> 2.0
> >> >> > (TID 9)
> >> >> > java.util.ConcurrentModificationException: KafkaConsumer is not
> safe
> >> >> > for
> >> >> > multi-threaded access
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> >> >> > 

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-20 Thread Srikanth
Yeah, setting those params helped.

On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <c...@koeninger.org> wrote:

> 60 seconds for a batch is above the default settings in kafka related
> to heartbeat timeouts, so that might be related.  Have you tried
> tweaking session.timeout.ms, heartbeat.interval.ms, or related
> configs?
>
> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Bringing this thread back as I'm seeing this exception on a production
> kafka
> > cluster.
> >
> > I have two Spark streaming apps reading the same topic. App1 has batch
> > interval 2secs and app2 has 60secs.
> > Both apps are running on the same cluster on similar hardware. I see this
> > exception only in app2 and fairly consistently.
> >
> > Difference I see between the apps is
> > App1
> >   spark.streaming.kafka.maxRatePerPartition, 6000
> >   batch interval 2 secs
> > App2
> >   spark.streaming.kafka.maxRatePerPartition, 1
> >   batch interval 60 secs
> >
> > All other kafka/spark related configs are same for both apps.
> >   spark.streaming.kafka.consumer.poll.ms = 4096
> >   spark.streaming.backpressure.enabled = true
> >
> > Not sure if pre-fetching or caching is messing things up.
> >
> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID
> > 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError:
> assertion
> > failed: Failed to get records for spark-executor-StreamingEventSplitProd
> > mt_event 6 49091480 after polling for 4096
> > at scala.Predef$.assert(Predef.scala:170)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> > at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
> >
> >
> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> That's not what I would have expected to happen with a lower cache
> >> setting, but in general disabling the cache isn't something you want
> >> to do with the new kafka consumer.
> >>
> >>
> >> As far as the original issue, are you seeing those polling errors
> >> intermittently, or consistently?  From your description, it sounds
> >> like retry is working correctly.
> >>
> >>
> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> > Setting those two results in below exception.
> >> > No.of executors < no.of partitions. Could that be triggering this?
> >> >
> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
> >> > (TID 9)
> >> > java.util.ConcurrentModificationException: KafkaConsumer is not safe
> for
> >> > multi-threaded access
> >> > at
> >> >
> >> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> >> > at
> >> >
> >> > org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1360)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> >> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> >> > at java.util.HashMap.putVal(Unknown Source)
> >> > at java.util.HashMap.put(Unknown Source)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:158)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(
> KafkaRDD.scala:210)
> >> > at
> >> > org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> > at
> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-19 Thread Srikanth
Bringing this thread back as I'm seeing this exception on a production
kafka cluster.

I have two Spark streaming apps reading the same topic. App1 has batch
interval 2secs and app2 has 60secs.
Both apps are running on the same cluster on similar hardware. I see this
exception only in app2 and fairly consistently.

Difference I see between the apps is
App1
  spark.streaming.kafka.maxRatePerPartition, 6000
  batch interval 2 secs
App2
  spark.streaming.kafka.maxRatePerPartition, 1
  batch interval 60 secs

All other kafka/spark related configs are same for both apps.
  spark.streaming.kafka.consumer.poll.ms = 4096
  spark.streaming.backpressure.enabled = true

Not sure if pre-fetching or caching is messing things up.

16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID
12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion
failed: Failed to get records for spark-executor-StreamingEventSplitProd
mt_event 6 49091480 after polling for 4096
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$
KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$
KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)


On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:

> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently?  From your description, it sounds
> like retry is working correctly.
>
>
> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Setting those two results in below exception.
> > No.of executors < no.of partitions. Could that be triggering this?
> >
> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
> (TID 9)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> > multi-threaded access
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1360)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> > at java.util.HashMap.putVal(Unknown Source)
> > at java.util.HashMap.put(Unknown Source)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:158)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(
> KafkaRDD.scala:210)
> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > at java.lang.Thread.run(Unknown Source)
> >
> >
> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> you could try setting
> >>
> >> spark.streaming.kafka.consumer.cache.initialCapac

Re: Spark with S3 DirectOutputCommitter

2016-09-12 Thread Srikanth
Thanks Steve!

We are already using HDFS as an intermediate store. This is for the last
stage of processing which has to put data in S3.
The output is partitioned by 3 fields, like
.../field1=111/field2=999/date=-MM-DD/*
Given that there are 100s of folders and 1000s of subfolders and part files,
the rename from _temporary is just not practical in S3.
I guess we have to add another stage with S3DistCp??
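
(For reference, a sketch of how the committer setting Steve suggests below
can be passed through Spark's Hadoop configuration; the property name is the
standard Hadoop one, and the app name is a placeholder.)

import org.apache.spark.sql.SparkSession

// "v2" file output committer: tasks rename their output directly into the
// final location, avoiding the second, job-level rename pass.
val spark = SparkSession.builder()
  .appName("MergeEntities")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()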

Srikanth

On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> > On 9 Sep 2016, at 21:54, Srikanth <srikanth...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried
> a few configs and none of them seem to work.
> > Output always creates _temporary directory. Rename is killing
> performance.
>
> > I read some notes about DirectOutputcommitter causing problems with
> speculation turned on. Was this option removed entirely?
>
> Spark turns off any committer with the word "direct' in its name if
> speculation==true . Concurrency, see.
>
> even on on-speculative execution, the trouble with the direct options is
> that executor/job failures can leave incomplete/inconsistent work around
> —and the things downstream wouldn't even notice
>
> There's work underway to address things, work which requires a consistent
> metadata store alongside S3 ( HADOOP-13345 : S3Guard).
>
> For now: stay with the file output committer
>
> hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
> hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
>
> Even better: use HDFS as the intermediate store for work, only do a bulk
> upload at the end.
>
> >
> >   val spark = SparkSession.builder()
> > .appName("MergeEntities")
> > .config("spark.sql.warehouse.dir",
> mergeConfig.getString("sparkSqlWarehouseDir"))
> > .config("fs.s3a.buffer.dir", "/tmp")
> > .config("spark.hadoop.mapred.output.committer.class",
> classOf[DirectOutputCommitter].getCanonicalName)
> > .config("mapred.output.committer.class",
> classOf[DirectOutputCommitter].getCanonicalName)
> > .config("mapreduce.use.directfileoutputcommitter",
> "true")
> > //.config("spark.sql.sources.outputCommitterClass",
> classOf[DirectOutputCommitter].getCanonicalName)
> > .getOrCreate()
> >
> > Srikanth
>
>


Spark with S3 DirectOutputCommitter

2016-09-09 Thread Srikanth
Hello,

I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried a
few configs and none of them seem to work.
The output always creates a _temporary directory, and the rename is killing
performance.
I read some notes about DirectOutputCommitter causing problems with
speculation turned on. Was this option removed entirely?

  val spark = SparkSession.builder()
    .appName("MergeEntities")
    .config("spark.sql.warehouse.dir", mergeConfig.getString("sparkSqlWarehouseDir"))
    .config("fs.s3a.buffer.dir", "/tmp")
    .config("spark.hadoop.mapred.output.committer.class",
      classOf[DirectOutputCommitter].getCanonicalName)
    .config("mapred.output.committer.class",
      classOf[DirectOutputCommitter].getCanonicalName)
    .config("mapreduce.use.directfileoutputcommitter", "true")
    //.config("spark.sql.sources.outputCommitterClass",
    //  classOf[DirectOutputCommitter].getCanonicalName)
    .getOrCreate()

Srikanth


"Job duration" and "Processing time" don't match

2016-09-08 Thread Srikanth
Hello,

I was looking at the Spark Streaming UI and noticed a big difference between
"Processing time" and "Job duration".

[image: Inline image 1]

Processing time/Output Op duration is shown as 50s, but the sum of all job
durations is ~25s.
What is causing this difference? Based on the logs, I know that the batch
actually took 50s.

[image: Inline image 2]

The job that takes most of the time is:
joinRDD.toDS()
   .write.format("com.databricks.spark.csv")
   .mode(SaveMode.Append)
   .options(Map("mode" -> "DROPMALFORMED", "delimiter" -> "\t",
"header" -> "false"))
   .partitionBy("entityId", "regionId", "eventDate")
   .save(outputPath)

Removing SaveMode.Append really speeds things up and also the mismatch
between Job duration and processing time disappears.
I'm not able to explain what is causing this though.

Srikanth


Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
Yea, disabling the cache was not going to be my permanent solution either.
I was going to ask how big an overhead that is.

It happens intermittently and each time it happens retry is successful.

Srikanth

On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:

> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently?  From your description, it sounds
> like retry is working correctly.
>
>
> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Setting those two results in below exception.
> > No.of executors < no.of partitions. Could that be triggering this?
> >
> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
> (TID 9)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> > multi-threaded access
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1360)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> > at java.util.HashMap.putVal(Unknown Source)
> > at java.util.HashMap.put(Unknown Source)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:158)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(
> KafkaRDD.scala:210)
> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > at java.lang.Thread.run(Unknown Source)
> >
> >
> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> you could try setting
> >>
> >> spark.streaming.kafka.consumer.cache.initialCapacity
> >>
> >> spark.streaming.kafka.consumer.cache.maxCapacity
> >>
> >> to 1
> >>
> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> > I had a look at the executor logs and noticed that this exception
> >> > happens
> >> > only when using the cached consumer.
> >> > Every retry is successful. This is consistent.
> >> > One possibility is that the cached consumer is causing the failure as
> >> > retry
> >> > clears it.
> >> > Is there a way to disable cache and test this?
> >> > Again, kafkacat is running fine on the same node.
> >> >
> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID
> >> > 7849)
> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID
> >> > 7851
> >> >
> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
> >> > offsets 57079162 -> 57090330
> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
> >> > offsets 57098866 -> 57109957
> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
Setting those two results in the exception below.
No. of executors < no. of partitions. Could that be triggering this?

16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
at
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
at
org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
at java.util.HashMap.putVal(Unknown Source)
at java.util.HashMap.put(Unknown Source)
at
org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
at
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)


On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> you could try setting
>
> spark.streaming.kafka.consumer.cache.initialCapacity
>
> spark.streaming.kafka.consumer.cache.maxCapacity
>
> to 1
>
> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
> > I had a look at the executor logs and noticed that this exception happens
> > only when using the cached consumer.
> > Every retry is successful. This is consistent.
> > One possibility is that the cached consumer is causing the failure as
> retry
> > clears it.
> > Is there a way to disable cache and test this?
> > Again, kafkacat is running fine on the same node.
> >
> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID
> 7849)
> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID
> 7851
> >
> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
> > offsets 57079162 -> 57090330
> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
> > offsets 57098866 -> 57109957
> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
> > 7851). 1030 bytes result sent to driver
> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0
> (TID
> > 7849)
> > java.lang.AssertionError: assertion failed: Failed to get records for
> > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling
> for
> > 2048
> >   at scala.Predef$.assert(Predef.scala:170)
> >   at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >   at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> >   at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> >
> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task
> 7854
> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID
> 7854)
> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
> > offsets 57098866 -> 57109957
> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
> > spark-executor-StreamingPixelCount1 mt_event 0 57098866
> >
> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID
> > 7854). 1103 bytes result sent to driver
> >
> >
> >
> > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:
> >>
> >> Thanks Cody. Setting poll timeout helped.
> >> Our 

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
I had a look at the executor logs and noticed that this exception happens
only when using the cached consumer.
Every retry is successful; this is consistent.
One possibility is that the cached consumer is causing the failure, since the
retry clears it.
Is there a way to disable the cache and test this?
Again, kafkacat is running fine on the same node.

16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851

16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
offsets 57079162 -> 57090330
16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
offsets 57098866 -> 57109957
16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
7851). 1030 bytes result sent to driver
16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0
(TID 7849)
java.lang.AssertionError: assertion failed: Failed to get records for
spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling
for 2048
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)

16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
offsets 57098866 -> 57109957
16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-StreamingPixelCount1 mt_event 0 57098866

16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID
7854). 1103 bytes result sent to driver



On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:

> Thanks Cody. Setting poll timeout helped.
> Our network is fine but brokers are not fully provisioned in test cluster.
> But there isn't enough load to max out on broker capacity.
> Curious that kafkacat running on the same node doesn't have any issues.
>
> Srikanth
>
> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> You can set that poll timeout higher with
>>
>> spark.streaming.kafka.consumer.poll.ms
>>
>> but half a second is fairly generous.  I'd try to take a look at
>> what's going on with your network or kafka broker during that time.
>>
>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote:
>> > Hello,
>> >
>> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
>> >
>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
>> >> spark-executor-example mt_event 0 15782114
>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
>> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
>> >> spark-executor-example.
>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0
>> (TID
>> >> 6)
>> >> java.lang.AssertionError: assertion failed: Failed to get records for
>> >> spark-executor-example mt_event 0 15782114 after polling for 512
>> >> at scala.Predef$.assert(Predef.scala:170)
>> >> at
>> >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(
>> CachedKafkaConsumer.scala:74)
>> >> at
>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato
>> r.next(KafkaRDD.scala:227)
>> >> at
>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato
>> r.next(KafkaRDD.scala:193)
>> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >
>> >
>> > I get this error intermittently. Sometimes a few batches are scheduled
>> and
>> > run fine. Then I get this error.
>> > kafkacat is able to fetch from this topic continuously.
>> >
>> > Full exception is here --
>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>> >
>> > Srikanth
>>
>
>


Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Srikanth
Yes that's right.

I understand this is data loss. The restart doesn't have to be all that
silent; it requires us to set a flag, and I thought auto.offset.reset was
that flag.
But there isn't much I can do at this point given that retention has
cleaned things up.
The app has to start. Let admins address the data loss on the side.
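
(For completeness, a rough sketch of the restart-from-fresh-state workaround
Cody describes below: use a new consumer group so there are no committed
offsets and auto.offset.reset takes effect. The broker address and group name
are placeholders.)

import org.apache.kafka.common.serialization.ByteArrayDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  // A brand-new group id means there is no committed offset to resume from...
  "group.id" -> "my-app-v2",
  // ...so this policy decides where consumption starts: "earliest" or "latest".
  "auto.offset.reset" -> "latest"
)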

On Wed, Sep 7, 2016 at 12:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Just so I'm clear on what's happening...
>
> - you're running a job that auto-commits offsets to kafka.
> - you stop that job for longer than your retention
> - you start that job back up, and it errors because the last committed
> offset is no longer available
> - you think that instead of erroring, the job should silently restart
> based on the value of auto.offset.reset
>
> Is that accurate?
>
>
> On Wed, Sep 7, 2016 at 10:44 AM, Srikanth <srikanth...@gmail.com> wrote:
> > My retention is 1d which isn't terribly low. The problem is every time I
> > restart after retention expiry, I get this exception instead of honoring
> > auto.offset.reset.
> > It isn't a corner case where retention expired after driver created a
> batch.
> > Its easily reproducible and consistent.
> >
> > On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> You don't want auto.offset.reset on executors, you want executors to
> >> do what the driver told them to do.  Otherwise you're going to get
> >> really horrible data inconsistency issues if the executors silently
> >> reset.
> >>
> >> If your retention is so low that retention gets expired in between
> >> when the driver created a batch with a given starting offset, and when
> >> an executor starts to process that batch, you're going to have
> >> problems.
> >>
> >> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> > This isn't a production setup. We kept retention low intentionally.
> >> > My original question was why I got the exception instead of it using
> >> > auto.offset.reset on restart?
> >> >
> >> >
> >> >
> >> >
> >> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> If you leave enable.auto.commit set to true, it will commit offsets
> to
> >> >> kafka, but you will get undefined delivery semantics.
> >> >>
> >> >> If you just want to restart from a fresh state, the easiest thing to
> >> >> do is use a new consumer group name.
> >> >>
> >> >> But if that keeps happening, you should look into why your retention
> >> >> is not sufficient.
> >> >>
> >> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com>
> wrote:
> >> >> > You are right. I got confused as its all part of same log when
> >> >> > running
> >> >> > from
> >> >> > IDE.
> >> >> > I was looking for a good guide to read to understand the this
> integ.
> >> >> >
> >> >> > I'm not managing offset on my own. I've not enabled checkpoint for
> my
> >> >> > tests.
> >> >> > I assumed offsets will be stored in kafka by default.
> >> >> >
> >> >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> >> >> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
> >> >> > Array[Byte]](pattern, kafkaParams) )
> >> >> >
> >> >> >* @param offsets: offsets to begin at on initial startup.  If no
> >> >> > offset
> >> >> > is given for a
> >> >> >* TopicPartition, the committed offset (if applicable) or kafka
> >> >> > param
> >> >> >* auto.offset.reset will be used.
> >> >> >
> >> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> >> >> > enable.auto.commit = true
> >> >> > auto.offset.reset = latest
> >> >> >
> >> >> > Srikanth
> >> >> >
> >> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org
> >
> >> >> > wrote:
> >> >> >>
> >> >> >> Seems like you're confused about the purpose of that line of code,
> >> >> >> it
> >> >> >

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Srikanth
My retention is 1d, which isn't terribly low. The problem is that every time
I restart after retention expiry, I get this exception instead of it honoring
auto.offset.reset.
It isn't a corner case where retention expired after the driver created a
batch. It's easily reproducible and consistent.

On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You don't want auto.offset.reset on executors, you want executors to
> do what the driver told them to do.  Otherwise you're going to get
> really horrible data inconsistency issues if the executors silently
> reset.
>
> If your retention is so low that retention gets expired in between
> when the driver created a batch with a given starting offset, and when
> an executor starts to process that batch, you're going to have
> problems.
>
> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote:
> > This isn't a production setup. We kept retention low intentionally.
> > My original question was why I got the exception instead of it using
> > auto.offset.reset on restart?
> >
> >
> >
> >
> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> If you leave enable.auto.commit set to true, it will commit offsets to
> >> kafka, but you will get undefined delivery semantics.
> >>
> >> If you just want to restart from a fresh state, the easiest thing to
> >> do is use a new consumer group name.
> >>
> >> But if that keeps happening, you should look into why your retention
> >> is not sufficient.
> >>
> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:
> >> > You are right. I got confused as its all part of same log when running
> >> > from
> >> > IDE.
> >> > I was looking for a good guide to read to understand the this integ.
> >> >
> >> > I'm not managing offset on my own. I've not enabled checkpoint for my
> >> > tests.
> >> > I assumed offsets will be stored in kafka by default.
> >> >
> >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> >> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
> >> > Array[Byte]](pattern, kafkaParams) )
> >> >
> >> >* @param offsets: offsets to begin at on initial startup.  If no
> >> > offset
> >> > is given for a
> >> >* TopicPartition, the committed offset (if applicable) or kafka
> param
> >> >* auto.offset.reset will be used.
> >> >
> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> >> > enable.auto.commit = true
> >> > auto.offset.reset = latest
> >> >
> >> > Srikanth
> >> >
> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> Seems like you're confused about the purpose of that line of code, it
> >> >> applies to executors, not the driver. The driver is responsible for
> >> >> determining offsets.
> >> >>
> >> >> Where are you storing offsets, in Kafka, checkpoints, or your own
> >> >> store?
> >> >> Auto offset reset won't be used if there are stored offsets.
> >> >>
> >> >>
> >> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
> >> >>>
> >> >>> Hi,
> >> >>>
> >> >>> Upon restarting my Spark Streaming app it is failing with error
> >> >>>
> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job
> >> >>> aborted
> >> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most
> recent
> >> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> Offsets
> >> >>> out of
> >> >>> range with no configured reset policy for partitions:
> >> >>> {mt-event-2=1710706}
> >> >>>
> >> >>> It is correct that the last read offset was deleted by kafka due to
> >> >>> retention period expiry.
> >> >>> I've set auto.offset.reset in my app but it is getting reset here
> >> >>>
> >> >>>
> >> >>> https://github.com/apache/spark/blob/master/external/
> kafka-0-10/src/main/scala/org/apache/spark/streaming/
> kafka010/KafkaUtils.scala#L160
> >> >>>
> >> >>> How to force it to restart in this case (fully aware of potential
> data
> >> >>> loss)?
> >> >>>
> >> >>> Srikanth
> >> >
> >> >
> >
> >
>


Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-06 Thread Srikanth
This isn't a production setup. We kept retention low intentionally.
My original question was why I got the exception instead of it using
auto.offset.reset on restart.




On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org> wrote:

> If you leave enable.auto.commit set to true, it will commit offsets to
> kafka, but you will get undefined delivery semantics.
>
> If you just want to restart from a fresh state, the easiest thing to
> do is use a new consumer group name.
>
> But if that keeps happening, you should look into why your retention
> is not sufficient.
>
> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:
> > You are right. I got confused as its all part of same log when running
> from
> > IDE.
> > I was looking for a good guide to read to understand the this integ.
> >
> > I'm not managing offset on my own. I've not enabled checkpoint for my
> tests.
> > I assumed offsets will be stored in kafka by default.
> >
> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
> > Array[Byte]](pattern, kafkaParams) )
> >
> >* @param offsets: offsets to begin at on initial startup.  If no
> offset
> > is given for a
> >* TopicPartition, the committed offset (if applicable) or kafka param
> >* auto.offset.reset will be used.
> >
> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> > enable.auto.commit = true
> > auto.offset.reset = latest
> >
> > Srikanth
> >
> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Seems like you're confused about the purpose of that line of code, it
> >> applies to executors, not the driver. The driver is responsible for
> >> determining offsets.
> >>
> >> Where are you storing offsets, in Kafka, checkpoints, or your own store?
> >> Auto offset reset won't be used if there are stored offsets.
> >>
> >>
> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> Upon restarting my Spark Streaming app it is failing with error
> >>>
> >>> Exception in thread "main" org.apache.spark.SparkException: Job
> aborted
> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
> out of
> >>> range with no configured reset policy for partitions:
> {mt-event-2=1710706}
> >>>
> >>> It is correct that the last read offset was deleted by kafka due to
> >>> retention period expiry.
> >>> I've set auto.offset.reset in my app but it is getting reset here
> >>>
> >>> https://github.com/apache/spark/blob/master/external/
> kafka-0-10/src/main/scala/org/apache/spark/streaming/
> kafka010/KafkaUtils.scala#L160
> >>>
> >>> How to force it to restart in this case (fully aware of potential data
> >>> loss)?
> >>>
> >>> Srikanth
> >
> >
>


Reset auto.offset.reset in Kafka 0.10 integ

2016-09-02 Thread Srikanth
Hi,

Upon restarting my Spark Streaming app it is failing with error

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 1.0 (TID 6, localhost):
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of
range with no configured reset policy for partitions: {mt-event-2=1710706}

It is correct that the last read offset was deleted by Kafka due to
retention period expiry.
I've set auto.offset.reset in my app, but it is getting reset here:

https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160

How to force it to restart in this case (fully aware of potential data
loss)?

Srikanth


Re: Spark 2.0 with Kafka 0.10 exception

2016-08-24 Thread Srikanth
Thanks Cody. Setting the poll timeout helped.
Our network is fine, but the brokers are not fully provisioned in the test
cluster. There isn't enough load to max out broker capacity, though.
Curious that kafkacat running on the same node doesn't have any issues.
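
(For anyone hitting the same assertion failure, a minimal sketch of raising
that poll timeout; the value below is only an illustration.)

import org.apache.spark.SparkConf

// Hypothetical: give the executor-side Kafka consumer more time per poll()
// before the "Failed to get records ... after polling" assertion fires.
val conf = new SparkConf()
  .setAppName("StreamingExample")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")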

Srikanth

On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You can set that poll timeout higher with
>
> spark.streaming.kafka.consumer.poll.ms
>
> but half a second is fairly generous.  I'd try to take a look at
> what's going on with your network or kafka broker during that time.
>
> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Hello,
> >
> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
> >
> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
> >> spark-executor-example mt_event 0 15782114
> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
> >> spark-executor-example.
> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0
> (TID
> >> 6)
> >> java.lang.AssertionError: assertion failed: Failed to get records for
> >> spark-executor-example mt_event 0 15782114 after polling for 512
> >> at scala.Predef$.assert(Predef.scala:170)
> >> at
> >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >> at
> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> >> at
> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >
> >
> > I get this error intermittently. Sometimes a few batches are scheduled
> and
> > run fine. Then I get this error.
> > kafkacat is able to fetch from this topic continuously.
> >
> > Full exception is here --
> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
> >
> > Srikanth
>


Spark 2.0 with Kafka 0.10 exception

2016-08-23 Thread Srikanth
Hello,

I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.

16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-example mt_event 0 15782114
> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
> spark-executor-example.
> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
> 6)
> java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-example mt_event 0 15782114 after polling for 512
> at scala.Predef$.assert(Predef.scala:170)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>

I get this error intermittently. Sometimes a few batches are scheduled and
run fine. Then I get this error.
kafkacat is able to fetch from this topic continuously.

Full exception is here --
https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767

Srikanth


Re: Rebalancing when adding kafka partitions

2016-08-16 Thread Srikanth
Yes, SubscribePattern detects new partitions. Also, it has a comment saying

Subscribe to all topics matching specified pattern to get dynamically
> assigned partitions.
>  * The pattern matching will be done periodically against topics existing
> at the time of check.
>  * @param pattern pattern to subscribe to
>  * @param kafkaParams Kafka


Who discovers the new partitions? The underlying Kafka consumer or
spark-streaming-kafka-0-10-assembly?
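
For reference, the pattern-based variant I tested looks roughly like this (the
regex and key/value types are placeholders):

import java.util.regex.Pattern

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  ssc, PreferConsistent,
  SubscribePattern[Array[Byte], Array[Byte]](Pattern.compile("mt-event-.*"), kafkaParams))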

Srikanth

On Fri, Aug 12, 2016 at 5:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Hrrm, that's interesting. Did you try with subscribe pattern, out of
> curiosity?
>
> I haven't tested repartitioning on the  underlying new Kafka consumer, so
> its possible I misunderstood something.
> On Aug 12, 2016 2:47 PM, "Srikanth" <srikanth...@gmail.com> wrote:
>
>> I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
>> Partition was increased using "bin/kafka-topics.sh --alter" after spark
>> job was started.
>> I don't see messages from new partitions in the DStream.
>>
>> KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>>> ssc, PreferConsistent, Subscribe[Array[Byte],
>>> Array[Byte]](topics, kafkaParams) )
>>> .map(r => (r.key(), r.value()))
>>
>>
>> Also, the no. of partitions did not increase either.
>>
>>> dataStream.foreachRDD( (rdd, curTime) => {
>>>      logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")
>>
>>
>> Should I be setting some parameter/config? Is the doc for new integ
>> available?
>>
>> Thanks,
>> Srikanth
>>
>> On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> No, restarting from a checkpoint won't do it, you need to re-define the
>>> stream.
>>>
>>> Here's the jira for the 0.10 integration
>>>
>>> https://issues.apache.org/jira/browse/SPARK-12177
>>>
>>> I haven't gotten docs completed yet, but there are examples at
>>>
>>> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>>>
>>> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <srikanth...@gmail.com> wrote:
>>> > In Spark 1.x, if we restart from a checkpoint, will it read from new
>>> > partitions?
>>> >
>>> > If you can, pls point us to some doc/link that talks about Kafka 0.10
>>> integ
>>> > in Spark 2.0.
>>> >
>>> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>> >>
>>> >> For the integration for kafka 0.8, you are literally starting a
>>> >> streaming job against a fixed set of topicapartitions,  It will not
>>> >> change throughout the job, so you'll need to restart the spark job if
>>> >> you change kafka partitions.
>>> >>
>>> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
>>> >> or subscribepattern, it should pick up new partitions as they are
>>> >> added.
>>> >>
>>> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com>
>>> wrote:
>>> >> > Hello,
>>> >> >
>>> >> > I'd like to understand how Spark Streaming(direct) would handle
>>> Kafka
>>> >> > partition addition?
>>> >> > Will a running job be aware of new partitions and read from it?
>>> >> > Since it uses Kafka APIs to query offsets and offsets are handled
>>> >> > internally.
>>> >> >
>>> >> > Srikanth
>>> >
>>> >
>>>
>>
>>


Re: Rebalancing when adding kafka partitions

2016-08-12 Thread Srikanth
I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
Partition was increased using "bin/kafka-topics.sh --alter" after spark job
was started.
I don't see messages from new partitions in the DStream.

KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics,
> kafkaParams) )
> .map(r => (r.key(), r.value()))


Also, the no. of partitions did not increase either.

> dataStream.foreachRDD( (rdd, curTime) => {
>  logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")


Should I be setting some parameter/config? Is the doc for new integ
available?

Thanks,
Srikanth

On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

> No, restarting from a checkpoint won't do it, you need to re-define the
> stream.
>
> Here's the jira for the 0.10 integration
>
> https://issues.apache.org/jira/browse/SPARK-12177
>
> I haven't gotten docs completed yet, but there are examples at
>
> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>
> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <srikanth...@gmail.com> wrote:
> > In Spark 1.x, if we restart from a checkpoint, will it read from new
> > partitions?
> >
> > If you can, pls point us to some doc/link that talks about Kafka 0.10
> integ
> > in Spark 2.0.
> >
> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> For the integration for kafka 0.8, you are literally starting a
> >> streaming job against a fixed set of topicapartitions,  It will not
> >> change throughout the job, so you'll need to restart the spark job if
> >> you change kafka partitions.
> >>
> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
> >> or subscribepattern, it should pick up new partitions as they are
> >> added.
> >>
> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com>
> wrote:
> >> > Hello,
> >> >
> >> > I'd like to understand how Spark Streaming(direct) would handle Kafka
> >> > partition addition?
> >> > Will a running job be aware of new partitions and read from it?
> >> > Since it uses Kafka APIs to query offsets and offsets are handled
> >> > internally.
> >> >
> >> > Srikanth
> >
> >
>


Re: Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
Yeah, that's what I thought. We need to redefine the stream, not just restart it.
Thanks for the info!

I do see the usage of subscribe[K,V] in your DStreams example.
Looks simple but it's not very obvious how it works :-)
I'll watch out for the docs and ScalaDoc.

Srikanth

On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

> No, restarting from a checkpoint won't do it, you need to re-define the
> stream.
>
> Here's the jira for the 0.10 integration
>
> https://issues.apache.org/jira/browse/SPARK-12177
>
> I haven't gotten docs completed yet, but there are examples at
>
> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>
> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <srikanth...@gmail.com> wrote:
> > In Spark 1.x, if we restart from a checkpoint, will it read from new
> > partitions?
> >
> > If you can, pls point us to some doc/link that talks about Kafka 0.10
> integ
> > in Spark 2.0.
> >
> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> For the integration for kafka 0.8, you are literally starting a
> >> streaming job against a fixed set of topicapartitions,  It will not
> >> change throughout the job, so you'll need to restart the spark job if
> >> you change kafka partitions.
> >>
> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
> >> or subscribepattern, it should pick up new partitions as they are
> >> added.
> >>
> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com>
> wrote:
> >> > Hello,
> >> >
> >> > I'd like to understand how Spark Streaming(direct) would handle Kafka
> >> > partition addition?
> >> > Will a running job be aware of new partitions and read from it?
> >> > Since it uses Kafka APIs to query offsets and offsets are handled
> >> > internally.
> >> >
> >> > Srikanth
> >
> >
>


Re: Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
In Spark 1.x, if we restart from a checkpoint, will it read from new
partitions?

If you can, pls point us to some doc/link that talks about Kafka 0.10 integ
in Spark 2.0.

On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <c...@koeninger.org> wrote:

> For the integration for kafka 0.8, you are literally starting a
> streaming job against a fixed set of topicapartitions,  It will not
> change throughout the job, so you'll need to restart the spark job if
> you change kafka partitions.
>
> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
> or subscribepattern, it should pick up new partitions as they are
> added.
>
> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com> wrote:
> > Hello,
> >
> > I'd like to understand how Spark Streaming(direct) would handle Kafka
> > partition addition?
> > Will a running job be aware of new partitions and read from it?
> > Since it uses Kafka APIs to query offsets and offsets are handled
> > internally.
> >
> > Srikanth
>


Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
Hello,

I'd like to understand how Spark Streaming(direct) would handle Kafka
partition addition?
Will a running job be aware of new partitions and read from it?
Since it uses Kafka APIs to query offsets and offsets are handled
internally.

Srikanth


Re: Streaming with broadcast joins

2016-02-20 Thread Srikanth
Sebastian,

*Update:-*  This is not possible. Probably will remain this way for the
foreseeable future.
https://issues.apache.org/jira/browse/SPARK-3863
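
For anyone landing here later, the workaround Sebastian describes below
(broadcasting a plain Map once instead of a DataFrame) looks roughly like this.
A sketch for Spark 1.x, where DataFrame.map still returns an RDD; the column
positions and the lookup key are placeholders.

val metaMap = sc.broadcast(
  sqlContext.read.format("com.databricks.spark.csv")
    .schema(schema).load("file:///shared/data/test-data.txt")
    .map(r => r.getInt(0) -> r.getString(1))   // e.g. Age -> attribute
    .collectAsMap())

lines.foreachRDD { (rdd, timestamp) =>
  rdd.map(_.split(","))
     .map(l => (l(0).toInt, l(1), metaMap.value.getOrElse(l(0).toInt, "unknown")))
     .saveAsTextFile(s"file:///shared/data/output/streaming/$timestamp")
}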

Srikanth

On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <sebastian@gmail.com>
wrote:

> I don't have the code with me now, and I ended moving everything to RDD in
> the end and using map operations to do some lookups, i.e. instead of
> broadcasting a Dataframe I ended broadcasting a Map
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> If I remember, foreachRDD is executed in the driver's context. Not sure
>> how we'll be able to achieve broadcast in this approach(unless we use SQL
>> broadcast hint again)
>>
>> When you say "it worked before",  was it with an older version of spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI
>> that broadcast join is being used. Also, if the files are read and
>> broadcasted each batch??
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian@gmail.com>
>> wrote:
>>
>>> I don't see anything obviously wrong on your second approach, I've done
>>> it like that before and it worked. When you say that it didn't work what do
>>> you mean? did it fail? it didnt broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Code with SQL broadcast hint. This worked and I was able to see that
>>>> broadcastjoin was performed.
>>>>
>>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", )
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>val resultDF = recordDF.join(testDF, "Age")
>>>>
>>>>  
>>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>> }
>>>>
>>>> But for every batch this file was read and broadcast was performed.
>>>> Evaluating the entire DAG I guess.
>>>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>
>>>> Then I changed code to broadcast the dataframe. This didn't work
>>>> either. Not sure if this is what you meant by broadcasting a dataframe.
>>>>
>>>> val testDF =
>>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>  )
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", )
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>> val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>> val resultDF = recordDF.join(testDF.value, "Age")
>>>>
>>>>  
>>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>> }
>>>>
>>>>
>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <
>>>> sebastian@gmail.com> wrote:

Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
Sure. These may be unrelated.

On Fri, Feb 19, 2016 at 10:39 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi guys,
>
> I also encounter broadcast dataframe issue not for steaming jobs but
> regular dataframe join. In my case, the executors died probably due to OOM
> which I don't think it should use that much memory. Anyway, I'm going to
> craft an example and send it here to see if it is a bug or something I've
> misunderstood.
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 19 Feb, 2016, at 10:20 am, Sebastian Piu <sebastian@gmail.com>
> wrote:
>
> I don't have the code with me now, and I ended moving everything to RDD in
> the end and using map operations to do some lookups, i.e. instead of
> broadcasting a Dataframe I ended broadcasting a Map
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> If I remember, foreachRDD is executed in the driver's context. Not sure
>> how we'll be able to achieve broadcast in this approach(unless we use SQL
>> broadcast hint again)
>>
>> When you say "it worked before",  was it with an older version of spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI
>> that broadcast join is being used. Also, if the files are read and
>> broadcasted each batch??
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com>
>> wrote:
>>
>>> I don't see anything obviously wrong on your second approach, I've done
>>> it like that before and it worked. When you say that it didn't work what do
>>> you mean? did it fail? it didnt broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Code with SQL broadcast hint. This worked and I was able to see that
>>>> broadcastjoin was performed.
>>>>
>>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", )
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>val resultDF = recordDF.join(testDF, "Age")
>>>>
>>>>  
>>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>> }
>>>>
>>>> But for every batch this file was read and broadcast was performed.
>>>> Evaluating the entire DAG I guess.
>>>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>
>>>> Then I changed code to broadcast the dataframe. This didn't work
>>>> either. Not sure if this is what you meant by broadcasting a dataframe.
>>>>
>>>> val testDF =
>>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>  )
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", )
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>> val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>> val resultDF = recordDF.join(testDF

Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
Hmmm..OK.

Srikanth

On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <sebastian@gmail.com>
wrote:

> I don't have the code with me now, and I ended moving everything to RDD in
> the end and using map operations to do some lookups, i.e. instead of
> broadcasting a Dataframe I ended broadcasting a Map
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> If I remember, foreachRDD is executed in the driver's context. Not sure
>> how we'll be able to achieve broadcast in this approach(unless we use SQL
>> broadcast hint again)
>>
>> When you say "it worked before",  was it with an older version of spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI
>> that broadcast join is being used. Also, if the files are read and
>> broadcasted each batch??
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian@gmail.com>
>> wrote:
>>
>>> I don't see anything obviously wrong on your second approach, I've done
>>> it like that before and it worked. When you say that it didn't work what do
>>> you mean? did it fail? it didnt broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Code with SQL broadcast hint. This worked and I was able to see that
>>>> broadcastjoin was performed.
>>>>
>>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", )
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>val resultDF = recordDF.join(testDF, "Age")
>>>>
>>>>  
>>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>> }
>>>>
>>>> But for every batch this file was read and broadcast was performed.
>>>> Evaluating the entire DAG I guess.
>>>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>
>>>> Then I changed code to broadcast the dataframe. This didn't work
>>>> either. Not sure if this is what you meant by broadcasting a dataframe.
>>>>
>>>> val testDF =
>>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>  )
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", )
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>> val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>> val resultDF = recordDF.join(testDF.value, "Age")
>>>>
>>>>  
>>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>> }
>>>>
>>>>
>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <
>>>> sebastian@gmail.com> wrote:
>>>>
>>>>> Can you paste the code where you use sc.broadcast ?
>>>>>
>>>>> O

Re: listening to recursive folder structures in s3 using pyspark streaming (textFileStream)

2016-02-19 Thread Srikanth
Apparently you can pass comma separated folders.
Try the suggestion given here -->
http://stackoverflow.com/questions/29426246/spark-streaming-textfilestream-not-supporting-wildcards
Let me know if this helps

Srikanth

On Wed, Feb 17, 2016 at 5:47 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> textFileStream doesn't support that. It only supports monitoring one
> folder.
>
> On Wed, Feb 17, 2016 at 7:20 AM, in4maniac <sa...@skimlinks.com> wrote:
>
>> Hi all,
>>
>> I am new to pyspark streaming and I was following a tutorial I saw in the
>> internet
>> (
>> https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/network_wordcount.py
>> ).
>> But I replaced the data input with an s3 directory path as:
>>
>> lines = ssc.textFileStream("s3n://bucket/first/second/third1/")
>>
>> When I run the code and upload a file to s3n://bucket/first/second/third1/
>> (such as s3n://bucket/first/second/third1/test1.txt), the file gets
>> processed as expected.
>>
>> Now I want it to listen to multiple directories and process files if they
>> get uploaded to any of the directories:
>> for example : [s3n://bucket/first/second/third1/,
>> s3n://bucket/first/second/third2/ and s3n://bucket/first/second/third3/]
>>
>> I tried to use the pattern similar to sc.TextFile as :
>>
>> lines = ssc.textFileStream("s3n://bucket/first/second/*/")
>>
>> But this actually didn't work. Can someone please explain to me how I
>> could
>> achieve my objective?
>>
>> thanks in advance !!!
>>
>> in4maniac
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/listening-to-recursive-folder-structures-in-s3-using-pyspark-streaming-textFileStream-tp26247.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
It didn't fail. It wasn't broadcasting. I just ran the test again and here
are the logs.
Every batch is reading the metadata file.

16/02/19 06:27:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27
16/02/19 06:27:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28

16/02/19 06:27:40 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
16/02/19 06:27:40 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27

If I remember, foreachRDD is executed in the driver's context. Not sure how
we'll be able to achieve broadcast in this approach(unless we use SQL
broadcast hint again)

When you say "it worked before",  was it with an older version of spark?
I'm trying this on 1.6.
If you still have the streaming job running can you verify in spark UI that
broadcast join is being used. Also, if the files are read and broadcasted
each batch??

Thanks for the help!


On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian@gmail.com>
wrote:

> I don't see anything obviously wrong on your second approach, I've done it
> like that before and it worked. When you say that it didn't work what do
> you mean? did it fail? it didnt broadcast?
>
> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>
>> Code with SQL broadcast hint. This worked and I was able to see that
>> broadcastjoin was performed.
>>
>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>.schema(schema).load("file:///shared/data/test-data.txt")
>>
>> val lines = ssc.socketTextStream("DevNode", )
>>
>> lines.foreachRDD((rdd, timestamp) => {
>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>> l(1))).toDF()
>>val resultDF = recordDF.join(testDF, "Age")
>>
>>  
>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>> }
>>
>> But for every batch this file was read and broadcast was performed.
>> Evaluating the entire DAG I guess.
>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/18 12:24:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>>
>> Then I changed code to broadcast the dataframe. This didn't work either.
>> Not sure if this is what you meant by broadcasting a dataframe.
>>
>> val testDF =
>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>> .schema(schema).load("file:///shared/data/test-data.txt")
>>  )
>>
>> val lines = ssc.socketTextStream("DevNode", )
>>
>> lines.foreachRDD((rdd, timestamp) => {
>> val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>> l(1))).toDF()
>> val resultDF = recordDF.join(testDF.value, "Age")
>>
>>  
>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>> }
>>
>>
>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian@gmail.com>
>> wrote:
>>
>>> Can you paste the code where you use sc.broadcast ?
>>>
>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Sebastian,
>>>>
>>>> I was able to broadcast using sql broadcast hint. Question is how to
>>>> prevent this broadcast for each RDD.
>>>> Is there a way where it can be broadcast once and used locally for each
>>>> RDD?
>>>> Right now every batch the metadata file is read and the DF is
>>>> broadcasted.
>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>
>>>> Srikanth
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian@gmail.com
>>>> > wrote:
>>>>
>>>>> You should be able to broadcast that data frame using sc.broadcast and
>>>>> join against it.
>>>>>
>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a streaming use case where I plan to keep a 

Re: Streaming with broadcast joins

2016-02-18 Thread Srikanth
Code with SQL broadcast hint. This worked and I was able to see that
broadcastjoin was performed.

val testDF = sqlContext.read.format("com.databricks.spark.csv")
   .schema(schema).load("file:///shared/data/test-data.txt")

val lines = ssc.socketTextStream("DevNode", )

lines.foreachRDD((rdd, timestamp) => {
   val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
l(1))).toDF()
   val resultDF = recordDF.join(testDF, "Age")

 
resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
}

But for every batch this file was read and broadcast was performed.
Evaluating the entire DAG I guess.
  16/02/18 12:24:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27

16/02/18 12:25:00 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27


Then I changed code to broadcast the dataframe. This didn't work either.
Not sure if this is what you meant by broadcasting a dataframe.

val testDF = sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
.schema(schema).load("file:///shared/data/test-data.txt")
 )

val lines = ssc.socketTextStream("DevNode", )

lines.foreachRDD((rdd, timestamp) => {
val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
l(1))).toDF()
val resultDF = recordDF.join(testDF.value, "Age")

 
resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
}


On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian@gmail.com>
wrote:

> Can you paste the code where you use sc.broadcast ?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using sql broadcast hint. Question is how to
>> prevent this broadcast for each RDD.
>> Is there a way where it can be broadcast once and used locally for each
>> RDD?
>> Right now every batch the metadata file is read and the DF is broadcasted.
>> I tried sc.broadcast and that did not provide this behavior.
>>
>> Srikanth
>>
>>
>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian@gmail.com>
>> wrote:
>>
>>> You should be able to broadcast that data frame using sc.broadcast and
>>> join against it.
>>>
>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a streaming use case where I plan to keep a dataset broadcasted
>>>> and cached on each executor.
>>>> Every micro batch in streaming will create a DF out of the RDD and join
>>>> the batch.
>>>> The below code will perform the broadcast operation for each RDD. Is
>>>> there a way to broadcast it just once?
>>>>
>>>> Alternate approachs are also welcome.
>>>>
>>>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>
>>>> val metaDF =
>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>   .join(DF1, "id")
>>>> metaDF.cache
>>>>
>>>>
>>>>   val lines = streamingcontext.textFileStream(path)
>>>>
>>>>   lines.foreachRDD( rdd => {
>>>>   val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>
>>>>   joinedDF.rdd.foreachPartition ( partition => {
>>>> partition.foreach( row => {
>>>>  ...
>>>>  ...
>>>> })
>>>>   })
>>>>   })
>>>>
>>>>  streamingcontext.start
>>>>
>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>> This way I can avoid shuffling metaDF each time.
>>>>
>>>> Let me know you thoughts.
>>>>
>>>> Thanks
>>>>
>>>>
>>


Re: Streaming with broadcast joins

2016-02-18 Thread Srikanth
Sebastian,

I was able to broadcast using sql broadcast hint. Question is how to
prevent this broadcast for each RDD.
Is there a way where it can be broadcast once and used locally for each RDD?
Right now every batch the metadata file is read and the DF is broadcasted.
I tried sc.broadcast and that did not provide this behavior.

Srikanth


On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian@gmail.com>
wrote:

> You should be able to broadcast that data frame using sc.broadcast and
> join against it.
>
> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a streaming use case where I plan to keep a dataset broadcasted
>> and cached on each executor.
>> Every micro batch in streaming will create a DF out of the RDD and join
>> the batch.
>> The below code will perform the broadcast operation for each RDD. Is
>> there a way to broadcast it just once?
>>
>> Alternate approachs are also welcome.
>>
>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>
>> val metaDF =
>> sqlContext.read.format("json").schema(schema1).load(file2)
>>   .join(DF1, "id")
>> metaDF.cache
>>
>>
>>   val lines = streamingcontext.textFileStream(path)
>>
>>   lines.foreachRDD( rdd => {
>>   val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>
>>   joinedDF.rdd.foreachPartition ( partition => {
>> partition.foreach( row => {
>>  ...
>>  ...
>> })
>>   })
>>   })
>>
>>  streamingcontext.start
>>
>> On a similar note, if the metaDF is too big for broadcast, can I
>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>> This way I can avoid shuffling metaDF each time.
>>
>> Let me know you thoughts.
>>
>> Thanks
>>
>>


Streaming with broadcast joins

2016-02-17 Thread Srikanth
Hello,

I have a streaming use case where I plan to keep a dataset broadcasted and
cached on each executor.
Every micro batch in streaming will create a DF out of the RDD and join the
batch.
The below code will perform the broadcast operation for each RDD. Is there
a way to broadcast it just once?

Alternate approaches are also welcome.

val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)

val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
  .join(DF1, "id")
metaDF.cache


  val lines = streamingcontext.textFileStream(path)

  lines.foreachRDD( rdd => {
  val recordDF = rdd.flatMap(r => Record(r)).toDF()
  val joinedDF = recordDF.join(broadcast(metaDF), "id")

  joinedDF.rdd.foreachPartition ( partition => {
partition.foreach( row => {
 ...
 ...
})
  })
  })

 streamingcontext.start

On a similar note, if the metaDF is too big for broadcast, can I partition
it(df.repartition($"col")) and also partition each streaming RDD?
This way I can avoid shuffling metaDF each time.

Let me know your thoughts.

Thanks


spark-csv partitionBy

2016-02-09 Thread Srikanth
Hello,

I want to save a Spark job's result as LZO-compressed CSV files partitioned by
one or more columns.
Given that partitionBy is not supported by spark-csv, is there any
recommendation for achieving this in user code?

One quick option is to
  i) cache the result dataframe
  ii) get unique partition keys
  iii) Iterate over keys and filter the result for that key

   rawDF.cache
   val idList =
rawDF.select($"ID").distinct.collect.toList.map(_.getLong(0))
  idList.foreach( id => {
val rows = rawDF.filter($"ID" === id)

rows.write.format("com.databricks.spark.csv").save(s"hdfs:///output/id=$id/")
  })

This approach doesn't scale well, especially since the no. of unique IDs can be
between 500 and 700.
And adding a second partition column will make this even worse.

Wondering if anyone has an efficient work around?
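
For the record, the closest thing to a single-pass workaround I know of is
dropping to the RDD API with Hadoop's MultipleTextOutputFormat. A sketch,
assuming the ID is the first column and hadoop-lzo is on the classpath; the
output class name and paths are placeholders.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import com.hadoop.compression.lzo.LzopCodec

class IdPartitionedOutput extends MultipleTextOutputFormat[Any, Any] {
  // route each record into a sub-directory named after its key
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    s"id=$key/$name"
  // drop the key from the written line; only the CSV text remains
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
}

rawDF.rdd
  .map(r => (r.getLong(0), r.mkString(",")))   // (ID, csv line)
  .saveAsHadoopFile("hdfs:///output", classOf[Long], classOf[String],
    classOf[IdPartitionedOutput], classOf[LzopCodec])

This writes every partition in one job instead of one filter-and-save pass per ID.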

Srikanth


Re: Broadcast join on multiple dataframes

2016-02-04 Thread Srikanth
Hello,

Any pointers on what is causing the optimizer to convert broadcast to
shuffle join?
This join is with a file that is just 4kb in size.

Complete plan -->
https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0
DAG from UI -->
https://www.dropbox.com/s/4xc9d0rdkx2fun8/DAG_with_select.PNG?dl=0


== Optimized Logical Plan ==
Project [...]
+- Join LeftOuter, Some((start_ip#48L = start_ip_num#144L))
   :- Project [...]
   :  +- Join Inner, Some((cast(creative_id#9 as bigint) =
creative_id#130L))
   : :- Project [...]
   : :  +- Join Inner, Some((cast(strategy_id#10 as bigint) =
strategy_id#126L))
   : : :- Project [...]
   : : :  +- Join LeftOuter, Some((cast(exchange_id#13 as bigint) =
id#142L))
   : : : :- Project [...]
   : : : :  +- Join LeftOuter, Some((browser_id#59 =
technology_key#169))
   : : : : :- Project [...]
   : : : : :  +- Join LeftOuter,
Some((primary_browser_language#61 = id#166))
   : : : : : :- Project [...]
   : : : : : :  +- Filter ((NOT (campaign_id#12 = 0) &&
(mm_int_cost#36 < 100.0)) && ((cost_sum#41 < 100.0) &&
(total_spend#42 < 100.0)))
   : : : : : : +- Relation[...)
   : : : : : +- Project [id#166,two_letter_code#167]
   : : : : :+- BroadcastHint
   : : : : :   +- Relation[...
   : : : : +- BroadcastHint
   : : : :+- Relation[...
   : : : +- Project [description#141,id#142L]
   : : :+- BroadcastHint
   : : :   +- Relation[description#141,id#142L,name#143]
JSONRelation

== Physical Plan ==
Project [...]
+- SortMergeOuterJoin [start_ip#48L], [start_ip_num#144L], LeftOuter, None
   :- Sort [start_ip#48L ASC], false, 0
   :  +- TungstenExchange hashpartitioning(start_ip#48L,480), None
   : +- Project [...]
   :+- BroadcastHashJoin [cast(creative_id#9 as bigint)],
[creative_id#130L], BuildRight
   :   :- Project [...]
   :   :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)],
[strategy_id#126L], BuildRight
   :   : :- Project [...]
   :   : :  +- BroadcastHashOuterJoin [cast(exchange_id#13 as
bigint)], [id#142L], LeftOuter, None
   :   : : :- Project [...]
   :   : : :  +- BroadcastHashOuterJoin [browser_id#59],
[technology_key#169], LeftOuter, None
   :   : : : :- Project [...]
   :   : : : :  +- SortMergeOuterJoin
[primary_browser_language#61], [id#166], LeftOuter, None
   :   : : : : :- Sort [primary_browser_language#61
ASC], false, 0
   :   : : : : :  +- TungstenExchange
hashpartitioning(primary_browser_language#61,480), None
   :   : : : : : +- Project [...]
   :   : : : : :+- Filter (((NOT
(campaign_id#12 = 0) && (mm_int_cost#36 < 100.0)) && (cost_sum#41 <
100.0)) && (total_spend#42 < 100.0))
   :   : : : : :   +- Scan
CsvRelation(,Some(s3://
   :   : : : : +- Sort [id#166 ASC], false, 0
   :   : : : :+- TungstenExchange
hashpartitioning(id#166,480), None
   :   : : : :   +- Project
[id#166,two_letter_code#167]
   :   : : : :  +- Scan
CsvRelation(,Some(s3
   :   : : : +- ConvertToUnsafe
   :   : : :+- Scan
CsvRelation(,Some(s3://
   :   : : +- Project [description#141,id#142L]
   :   : :+- Scan
JSONRelation[description#141,id#142L,name#143] InputPaths: s3://
   :   : +- Project


Re: Broadcast join on multiple dataframes

2016-01-29 Thread Srikanth
Michael,

Output of DF.queryExecution is saved to
https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0
I don't see anything in this to suggest a switch in strategy. Hopefully you
find this helpful.

Srikanth

On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Can you provide the analyzed and optimized plans (explain(true))
>
> On Thu, Jan 28, 2016 at 12:26 PM, Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a use case where one large table has to be joined with several
>> smaller tables.
>> I've added broadcast hint for all small tables in the joins.
>>
>> val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val metaActionDF = sqlContext.read.format("json")
>> val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
>> val metaLocationDF =
>> sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
>>.join(broadcast(metaActionDF),
>> "campaign_id")
>>.join(broadcast(cidOrgDF),
>> List("organization_id"), "left_outer")
>>
>> val metaCreativeDF = sqlContext.read.format("json")
>> val metaExchangeDF = sqlContext.read.format("json")
>> val localizationDF =
>> sqlContext.read.format("com.databricks.spark.csv")
>> val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val joinedBidderDF = largeTableDF.as("BID")
>> .join(broadcast(metaLocationDF),
>> "strategy_id")
>> .join(broadcast(metaCreativeDF),
>> "creative_id")
>> .join(broadcast(metaExchangeDF),
>> $"exchange_id" === $"id" , "left_outer")
>> .join(broadcast(techKeyDF).as("TK"),
>> $"BID.tech_id" === $"TK.tech_key" , "left_outer")
>> .join(broadcast(localizationDF).as("BL"),
>> $"BID.language" === $"BL.id" , "left_outer")
>>
>> When I look at the execution plan, all the joins are marked as
>> broadcastjoin.
>> But when I look at the spark job UI, the DAG visualization shows that
>> some joins are sortmerged with shuffle involved.
>> The ones that I've highlighted in yellow were shuffled.
>> DAG can be viewed here -
>> https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0
>>
>> Why is the actual execution as seen in the DAG different from the
>> physical plan pasted below.
>> I'm trying not to shuffle my largeTable. Any idea what is causing this?
>>
>> == Physical Plan ==
>>
>> BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
>>
>> :- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
>>
>> :  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L],
>> LeftOuter, None
>>
>> :  :  :- Project [...]
>>
>> :  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)],
>> [creative_id#131L], BuildRight
>>
>> :  :  : :- Project [...]
>>
>> :  :  : :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)],
>> [strategy_id#127L], BuildRight
>>
>> :  :  : : :- ConvertToUnsafe
>>
>> :  :  : : :  +- Scan
>> CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,
>>
>> :  :  : : +- Project [...]
>>
>> :  :  : :+- BroadcastHashOuterJoin [organization_id#90L],
>> [cast(organization_id#102 as bigint)], LeftOuter, None
>>
>> :  :  : :   :- Project [...]
>>
>> :  :  : :   :  +- BroadcastHashJoin [campaign_id#105L],
>> [campaign_id#75L], BuildRight
>>
>> :  :  : :   : :- Project [...]
>>
>> :  :  : :   : :  +- Scan
>> JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths:
>> file:/shared/data/t1_meta/t1_meta_strategy.jsonl
>>
>> :  :  : :   : +- Scan JSONRelation[] InputPaths:
>> file:/shared/data/t1_meta/t1_meta_campaign.jsonl
>>
>> :  :  : :   +- ConvertToUnsafe
>>
>> :  :  : :  +- Scan
>> CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
>>
>> :  :  : +- Scan
>> JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130]
>> InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
>>
>> :  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths:
>> file:/shared/data/t1_meta/t1_meta_exchange.jsonl
>>
>> :  +- ConvertToUnsafe
>>
>> : +- Scan
>> CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
>>
>>
>> +- ConvertToUnsafe
>>
>>+- Scan
>> CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false
>>
>>
>>
>> Srikanth
>>
>
>


Broadcast join on multiple dataframes

2016-01-28 Thread Srikanth
Hello,

I have a use case where one large table has to be joined with several
smaller tables.
I've added broadcast hint for all small tables in the joins.

val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")

val metaActionDF = sqlContext.read.format("json")
val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
val metaLocationDF =
sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
   .join(broadcast(metaActionDF),
"campaign_id")
   .join(broadcast(cidOrgDF),
List("organization_id"), "left_outer")

val metaCreativeDF = sqlContext.read.format("json")
val metaExchangeDF = sqlContext.read.format("json")
val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")

val joinedBidderDF = largeTableDF.as("BID")
.join(broadcast(metaLocationDF), "strategy_id")
.join(broadcast(metaCreativeDF), "creative_id")
.join(broadcast(metaExchangeDF), $"exchange_id"
=== $"id" , "left_outer")
.join(broadcast(techKeyDF).as("TK"),
$"BID.tech_id" === $"TK.tech_key" , "left_outer")
.join(broadcast(localizationDF).as("BL"),
$"BID.language" === $"BL.id" , "left_outer")

When I look at the execution plan, all the joins are marked as
broadcastjoin.
But when I look at the spark job UI, the DAG visualization shows that some
joins are sortmerged with shuffle involved.
The ones that I've highlighted in yellow were shuffled.
DAG can be viewed here -
https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0

Why is the actual execution as seen in the DAG different from the physical
plan pasted below?
I'm trying not to shuffle my largeTable. Any idea what is causing this?

== Physical Plan ==

BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None

:- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None

:  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L],
LeftOuter, None

:  :  :- Project [...]

:  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)],
[creative_id#131L], BuildRight

:  :  : :- Project [...]

:  :  : :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)],
[strategy_id#127L], BuildRight

:  :  : : :- ConvertToUnsafe

:  :  : : :  +- Scan
CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,

:  :  : : +- Project [...]

:  :  : :+- BroadcastHashOuterJoin [organization_id#90L],
[cast(organization_id#102 as bigint)], LeftOuter, None

:  :  : :   :- Project [...]

:  :  : :   :  +- BroadcastHashJoin [campaign_id#105L],
[campaign_id#75L], BuildRight

:  :  : :   : :- Project [...]

:  :  : :   : :  +- Scan
JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths:
file:/shared/data/t1_meta/t1_meta_strategy.jsonl

:  :  : :   : +- Scan JSONRelation[] InputPaths:
file:/shared/data/t1_meta/t1_meta_campaign.jsonl

:  :  : :   +- ConvertToUnsafe

:  :  : :  +- Scan
CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,

:  :  : +- Scan
JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130]
InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl

:  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths:
file:/shared/data/t1_meta/t1_meta_exchange.jsonl

:  +- ConvertToUnsafe

: +- Scan
CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,


+- ConvertToUnsafe

   +- Scan
CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false



Srikanth


Adding additional jars to distributed cache (yarn-client)

2015-09-07 Thread Srikanth Sundarrajan
Hi,
I'm trying to use JavaSparkContext() to create a new SparkContext and
attempted to pass the requisite jars. But it looks like they aren't getting added
to the distributed cache automatically. Looking into
YarnClientSchedulerBackend::start() and ClientArguments, it did seem like they
would just add the SPARK_JAR and APP_JAR. I'm wondering what the best way is to
add additional files to the distributed cache and also have them appear in the
classpath for ExecutorLauncher.
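
What I was attempting, roughly (a sketch; paths are placeholders, and my
assumption is that spark.jars ships jars through the distributed cache and onto
the executor classpath, while spark.yarn.dist.files only ships plain files to
the container working directory):

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("my-app")
  .set("spark.jars", "/path/to/dep1.jar,/path/to/dep2.jar")
  .set("spark.yarn.dist.files", "/path/to/extra.conf")
val jsc = new JavaSparkContext(conf)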

Thanks
Srikanth Sundarrajan
  

Dataframe collect() work but count() fails

2015-08-26 Thread Srikanth
Hello,

I'm seeing a strange behavior where count() on a DataFrame errors as shown
below but collect() works fine.
This is what I tried from spark-shell. solrRDD.queryShards() returns a
JavaRDD.

val rdd = solrRDD.queryShards(sc, query, _version_, 2).rdd
 rdd: org.apache.spark.rdd.RDD[org.apache.solr.common.SolrDocument] =
 MapPartitionsRDD[3] at flatMap at SolrRDD.java:335



 scala val schema = solrRDD.getQuerySchema(query)
 schema: org.apache.spark.sql.types.StructType =
 StructType(StructField(ApplicationType,StringType,true),
 StructField(Language,StringType,true),
 StructField(MfgCode,StringType,true),
 StructField(OpSystemCode,StringType,true),
 StructField(ProductCode,StringType,true),
 StructField(ProductName,StringType,true),
 StructField(ProductVersion,StringType,true),
 StructField(_version_,LongType,true), StructField(id,StringType,true))



 scala> val rows = rdd.map(doc => RowFactory.create(schema.fieldNames.map(f
 => doc.getFirstValue(f))) ) //Convert RDD[SolrDocument] to RDD[Row]
 scala> val df = sqlContext.createDataFrame(rows, schema)


scala val data = df.collect
 data: Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@2135773a],
 [[Ljava.lang.Object;@3d2691de], [[Ljava.lang.Object;@2f32a52f],
 [[Ljava.lang.Object;@25fac8de]



 scala df.count
 15/08/26 14:53:28 WARN TaskSetManager: Lost task 1.3 in stage 6.0 (TID 42,
 172.19.110.1): java.lang.AssertionError: assertion failed: Row column
 number mismatch, expected 9 columns, but got 1.
 Row content: [[Ljava.lang.Object;@1d962eb2]
 at scala.Predef$.assert(Predef.scala:179)
 at
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140)
 at
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)


Any idea what is wrong here?
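
Looking at the row content in the error ([Ljava.lang.Object; as a single
element), my guess is that the Array is being passed to RowFactory.create() as
one varargs element, so each Row really does have 1 column. A sketch of the fix:

import org.apache.spark.sql.Row

// expand the array so the Row gets 9 columns instead of one Object[]
val rows = rdd.map(doc =>
  Row.fromSeq(schema.fieldNames.map(f => doc.getFirstValue(f)).toSeq))
// or: RowFactory.create(schema.fieldNames.map(f => doc.getFirstValue(f)): _*)
val df = sqlContext.createDataFrame(rows, schema)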

Srikanth


Re: Estimate size of Dataframe programatically

2015-08-10 Thread Srikanth
SizeEstimator.estimate(df) will not give the size of the dataframe, right? I
think it will give the size of the df object.

With RDD, I sample() and collect() and sum size of each row. If I do the
same with dataframe it will no longer be size when represented in columnar
format.

I'd also like to know how spark.sql.autoBroadcastJoinThreshold estimates the
size of a dataframe. Is it going to broadcast when the columnar storage size is
less than 10 MB?
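
The closest thing I've found to a programmatic answer is Spark's own plan
statistics (a sketch; the exact accessor is my assumption for the 1.x API, and
sources that don't report a size fall back to a very large default):

// Spark's size estimate for the plan, in bytes (BigInt)
val estimatedBytes = df.queryExecution.analyzed.statistics.sizeInBytes
val threshold = sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold",
  (10L * 1024 * 1024).toString).toLong
val wouldBroadcast = estimatedBytes <= BigInt(threshold)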

Srikanth


On Fri, Aug 7, 2015 at 2:51 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you tried calling SizeEstimator.estimate() on a DataFrame ?

 I did the following in REPL:

 scala SizeEstimator.estimate(df)
 res1: Long = 17769680

 FYI

 On Fri, Aug 7, 2015 at 6:48 AM, Srikanth srikanth...@gmail.com wrote:

 Hello,

 Is there a way to estimate the approximate size of a dataframe? I know we
 can cache and look at the size in UI but I'm trying to do this
 programatically. With RDD, I can sample and sum up size using
 SizeEstimator. Then extrapolate it to the entire RDD. That will give me
 approx size of RDD. With dataframes, its tricky due to columnar storage.
 How do we do it?

 On a related note, I see size of RDD object to be ~60MB. Is that the
 footprint of RDD in driver JVM?

 scala val temp = sc.parallelize(Array(1,2,3,4,5,6))
 scala SizeEstimator.estimate(temp)
 res13: Long = 69507320

 Srikanth





Estimate size of Dataframe programatically

2015-08-07 Thread Srikanth
Hello,

Is there a way to estimate the approximate size of a dataframe? I know we
can cache and look at the size in the UI, but I'm trying to do this
programmatically. With an RDD, I can sample and sum up size using
SizeEstimator, then extrapolate it to the entire RDD. That will give me the
approx size of the RDD. With dataframes, it's tricky due to columnar storage.
How do we do it?

On a related note, I see size of RDD object to be ~60MB. Is that the
footprint of RDD in driver JVM?

scala val temp = sc.parallelize(Array(1,2,3,4,5,6))
scala SizeEstimator.estimate(temp)
res13: Long = 69507320

Srikanth


How does DataFrame except work?

2015-08-03 Thread Srikanth
Hello,

I'm planning to use DF1.except(DF2) to get the difference between two
dataframes. I'd like to know how exactly this API works.
Both explain() and the Spark UI show except as an operation on its own.
Internally, does it do a hash partition of both dataframes?
If so, will it auto-broadcast if the second dataframe is small enough?
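
For what it's worth, the plan itself answers most of this (names below are
placeholders):

val diff = DF1.except(DF2)   // set-difference semantics, duplicates removed
diff.explain(true)           // the physical plan shows whether the right side
                             // is shuffled or broadcast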

Srikanth


spark-csv number of partitions

2015-07-28 Thread Srikanth
Hello,

I'm using spark-csv instead of sc.textfile() to work with CSV files.
How can I set the no. of partitions that will be created when reading a CSV?
Basically, an equivalent of minPartitions in textFile():

val myrdd = sc.textFile("my.csv", 24)
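
The closest equivalent I've found with spark-csv is to repartition right after
the load (a sketch; it costs an extra shuffle but gives explicit control):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("my.csv")
  .repartition(24)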


Srikanth


spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
Hello,

I've set spark.deploy.spreadOut=false in spark-env.sh.

 export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false"


There are 3 workers, each with 4 cores. Spark-shell was started with the no. of
cores = 6.
The Spark UI shows that one executor was used with 6 cores.

Is this a bug? This is with Spark 1.4.

[image: Inline image 1]

Srikanth


Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
Cool. Thanks!

Srikanth

On Wed, Jul 22, 2015 at 3:12 PM, Andrew Or and...@databricks.com wrote:

 Hi Srikanth,

 I was able to reproduce the issue by setting `spark.cores.max` to a number
 greater than the number of cores on a worker. I've filed SPARK-9260 which I
 believe is already being fixed in
 https://github.com/apache/spark/pull/7274.

 Thanks for reporting the issue!
 -Andrew

 2015-07-22 11:49 GMT-07:00 Andrew Or and...@databricks.com:

 Hi Srikanth,

 It does look like a bug. Did you set `spark.executor.cores` in your
 application by any chance?

 -Andrew

 2015-07-22 8:05 GMT-07:00 Srikanth srikanth...@gmail.com:

 Hello,

 I've set spark.deploy.spreadOut=false in spark-env.sh.

 export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=4
 -Dspark.deploy.spreadOut=false


 There are 3 workers each with 4 cores. Spark-shell was started with noof
 cores = 6.
 Spark UI show that one executor was used with 6 cores.

 Is this a bug? This is with Spark 1.4.

 [image: Inline image 1]

 Srikanth






ShuffledHashJoin instead of CartesianProduct

2015-07-22 Thread Srikanth
Hello,

I'm trying to link records from two large data sources. Both datasets have
almost same number of rows.
Goal is to match records based on multiple columns.

val matchId =
 SFAccountDF.as("SF").join(ELAccountDF.as("EL")).where($"SF.Email" ===
 $"EL.EmailAddress" || $"SF.Phone" === $"EL.Phone")


Joining with a OR(||) will result in a CartesianProduct. I'm trying to
avoid that.
One way to do this is to join on each column and UNION the results.


 val phoneMatch = SFAccountDF.as("SF").filter("Phone != ''")
   .join(ELAccountDF.as("EL").filter("BusinessPhone != ''"))
   .where($"SF.Phone" === $"EL.BusinessPhone")
 val emailMatch = SFAccountDF.as("SF").filter("Email != ''")
   .join(ELAccountDF.as("EL").filter("EmailAddress != ''"))
   .where($"SF.Email" === $"EL.EmailAddress")

 val matchId =
   phoneMatch.unionAll(emailMatch.unionAll(faxMatch.unionAll(mobileMatch)))
 matchId.cache().registerTempTable("matchId")


Is there a more elegant way to do this?
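
One way I can think of to tidy this up is to fold over the candidate key pairs
and de-duplicate the matches. A sketch, assuming import sqlContext.implicits._
is in scope; the Id columns and the extra key pairs are placeholders:

import org.apache.spark.sql.functions.col

val keyPairs = Seq(("Phone", "BusinessPhone"), ("Email", "EmailAddress"),
                   ("Fax", "Fax"), ("Mobile", "MobilePhone"))
val matchId = keyPairs.map { case (sfCol, elCol) =>
    SFAccountDF.as("SF").filter(s"$sfCol != ''")
      .join(ELAccountDF.as("EL").filter(s"$elCol != ''"),
            col(s"SF.$sfCol") === col(s"EL.$elCol"))
      .select($"SF.Id".as("sfId"), $"EL.Id".as("elId"))   // placeholder id columns
  }.reduce(_ unionAll _)
   .distinct()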

On a related note, has anyone worked on record linkage using Bloom Filters,
Levenshtein distance, etc in Spark?

Srikanth


Re: RowId unique key for Dataframes

2015-07-21 Thread Srikanth
Will work. Thanks!
zipWithUniqueId() doesn't guarantee continuous ID either.
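
For the archive, the usage I have in mind looks roughly like this (the column
name is arbitrary; the IDs are unique and increasing, but not consecutive):

import org.apache.spark.sql.functions.monotonicallyIncreasingId   // 1.x name

val withId = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true").load("file1.csv")
  .withColumn("rowId", monotonicallyIncreasingId())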

Srikanth

On Tue, Jul 21, 2015 at 9:48 PM, Burak Yavuz brk...@gmail.com wrote:

 Would monotonicallyIncreasingId
 https://github.com/apache/spark/blob/d4c7a7a3642a74ad40093c96c4bf45a62a470605/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L637
 work for you?

 Best,
 Burak



 On Tue, Jul 21, 2015 at 4:55 PM, Srikanth srikanth...@gmail.com wrote:

 Hello,

 I'm creating dataframes from three CSV files using spark-csv package. I
 want to add a unique ID for each row in dataframe.
 Not sure how withColumn() can be used to achieve this. I need a Long
 value not an UUID.

 One option I found was to create a RDD and use zipWithUniqueId.

 sqlContext.textFile(file).
 zipWithUniqueId().
 map(case(d, i)=i.toString + delimiter + d).
 map(_.split(delimiter)).
 map(s=caseclass(...))

 .toDF().select(field1, field2)


 Its a bit hacky. Is there an easier way to do this on dataframes and use
 spark-csv?

 Srikanth





Re: HiveThriftServer2.startWithContext error with registerTempTable

2015-07-16 Thread Srikanth
Cheng,

Yes, "select * from temp_table" was working. I was able to perform some
transformation+action on the dataframe and print it on the console.
HiveThriftServer2.startWithContext was being run in the same session.

When you say try the --jars option, are you asking me to pass the spark-csv jar?
I'm already doing this with --packages com.databricks:spark-csv_2.10:1.0.3.
Not sure if I'm missing your point here.

Anyways, I gave it a shot. I downloaded spark-csv_2.10-0.1.jar and started
spark-shell with --jars.
I still get the same exception. I'm pasting the exception below.

scala 15/07/16 11:29:22 ERROR SparkExecuteStatementOperation: Error
executing query:
java.lang.ClassNotFoundException:
com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

15/07/16 11:29:22 WARN ThriftCLIService: Error executing statement:
org.apache.hive.service.cli.HiveSQLException:
java.lang.ClassNotFoundException:
com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1
at
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
at
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
at
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

Srikanth




On Thu, Jul 16, 2015 at 12:44 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Have you ever tried querying "select * from temp_table" from the spark
 shell? Or can you try the option --jars while starting the spark shell?



 *From:* Srikanth [mailto:srikanth...@gmail.com]
 *Sent:* Thursday, July 16, 2015 9:36 AM
 *To:* user
 *Subject:* Re: HiveThriftServer2.startWithContext error with
 registerTempTable



 Hello,



 Re-sending this to see if I'm second time lucky!

 I've not managed to move past this error.



 Srikanth



 On Mon, Jul 13, 2015 at 9:14 PM, Srikanth srikanth...@gmail.com wrote:

  Hello,



 I want to expose result of Spark computation to external tools. I plan to
 do this with Thrift server JDBC interface by registering result Dataframe
 as temp table.

 I wrote a sample program in spark-shell to test this.



 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 import hiveContext.implicits._
 HiveThriftServer2.startWithContext(hiveContext)
 val myDF = hiveContext.read.format("com.databricks.spark.csv")
   .option("header", "true").load("/datafolder/weblog/pages.csv")
 myDF.registerTempTable("temp_table")



 I'm able to see the temp table in Beeline



 +-------------+--------------+
 |  tableName  | isTemporary  |
 +-------------+--------------+
 | temp_table  | true         |
 | my_table    | false        |
 +-------------+--------------+



 Now when I issue "select * from temp_table" from Beeline, I see the below
 exception in spark-shell



 15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement:

 org.apache.hive.service.cli.HiveSQLException: 
 *java.lang.ClassNotFoundException:
 com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1*

 at
 org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)

 at
 org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)

 at
 org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)



 I'm able to read the other table(my_table) from Beeline though.

 Any suggestions on how to overcome this?



 This is with the Spark 1.4 pre-built version. Spark-shell was started with
 --packages to pass spark-csv.



 Srikanth





Re: HiveThriftServer2.startWithContext error with registerTempTable

2015-07-15 Thread Srikanth
Hello,

Re-sending this to see if I'm second time lucky!
I've not managed to move past this error.

Srikanth

On Mon, Jul 13, 2015 at 9:14 PM, Srikanth srikanth...@gmail.com wrote:

 Hello,

 I want to expose result of Spark computation to external tools. I plan to
 do this with Thrift server JDBC interface by registering result Dataframe
 as temp table.
 I wrote a sample program in spark-shell to test this.

 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 import hiveContext.implicits._
 HiveThriftServer2.startWithContext(hiveContext)
 val myDF = hiveContext.read.format("com.databricks.spark.csv")
   .option("header", "true").load("/datafolder/weblog/pages.csv")
 myDF.registerTempTable("temp_table")


 I'm able to see the temp table in Beeline

 +-------------+--------------+
 |  tableName  | isTemporary  |
 +-------------+--------------+
 | temp_table  | true         |
 | my_table    | false        |
 +-------------+--------------+


 Now when I issue "select * from temp_table" from Beeline, I see the below
 exception in spark-shell

 15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement:
 org.apache.hive.service.cli.HiveSQLException: 
 *java.lang.ClassNotFoundException:
 com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1*
 at
 org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
 at
 org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
 at
 org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 I'm able to read the other table(my_table) from Beeline though.
 Any suggestions on how to overcome this?

 This is with the Spark 1.4 pre-built version. Spark-shell was started with
 --packages to pass spark-csv.

 Srikanth



cache() VS cacheTable()

2015-07-13 Thread Srikanth
Hello,

I was reading the Learning Spark book and saw a tip in chapter 9 that reads:
   "In Spark 1.2, the regular cache() method on RDDs also results in a
cacheTable()"

Is that true? When I cache an RDD and cache the same data as a dataframe, I see
that memory usage for the dataframe cache is far less than for the RDD cache. I
thought this difference is due to the columnar format used by dataframes. As per
the statement in the book, the cache sizes should be similar.
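
For reference, a minimal sketch of the two caching paths being compared
(assuming an sqlContext with implicits imported and an RDD of some case class;
the names here are made up for illustration):

    // RDD cache: stores deserialized Java objects per partition
    rdd.cache()
    rdd.count()          // action to materialize the cache

    // DataFrame cache: uses the in-memory columnar format, compressed per column
    val df = rdd.toDF()
    df.cache()
    df.count()

    // equivalent, via the table name
    df.registerTempTable("recs")
    sqlContext.cacheTable("recs")
    sqlContext.sql("select count(*) from recs").show()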

Srikanth


HiveThriftServer2.startWithContext error with registerTempTable

2015-07-13 Thread Srikanth
Hello,

I want to expose the result of a Spark computation to external tools. I plan to
do this with the Thrift server JDBC interface by registering the result Dataframe
as a temp table.
I wrote a sample program in spark-shell to test this.

 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 import hiveContext.implicits._
 HiveThriftServer2.startWithContext(hiveContext)
 val myDF = hiveContext.read.format("com.databricks.spark.csv")
   .option("header", "true").load("/datafolder/weblog/pages.csv")
 myDF.registerTempTable("temp_table")


I'm able to see the temp table in Beeline

 +-------------+--------------+
 |  tableName  | isTemporary  |
 +-------------+--------------+
 | temp_table  | true         |
 | my_table    | false        |
 +-------------+--------------+


Now when I issue "select * from temp_table" from Beeline, I see the below
exception in spark-shell

15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement:
org.apache.hive.service.cli.HiveSQLException:
*java.lang.ClassNotFoundException:
com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1*
at
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
at
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
at
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I'm able to read the other table(my_table) from Beeline though.
Any suggestions on how to overcome this?

This is with the Spark 1.4 pre-built version. Spark-shell was started with
--packages to pass spark-csv.

Srikanth


Re: How do we control output part files created by Spark job?

2015-07-11 Thread Srikanth
Reducing the number of partitions may have an impact on memory consumption,
especially if there is an uneven distribution of the key used in groupBy.
It depends on your dataset.
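
As a rough illustration of the trade-off (df is a hypothetical DataFrame):

    // coalesce(n) narrows partitions without a shuffle, so each of the n tasks
    // may end up holding much more data (possible memory pressure / skew)
    val fewerFilesNoShuffle = df.coalesce(6)

    // repartition(n) does a full shuffle and redistributes rows more evenly,
    // at the cost of the extra shuffle
    val fewerFilesReshuffled = df.repartition(6)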

On Sat, Jul 11, 2015 at 5:06 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi Sriknath, thanks much, it worked when I set spark.sql.shuffle.partitions=10.
 I think reducing shuffle partitions will slow down my group-by query in
 hiveContext, or won't it slow it down? Please guide.

 On Sat, Jul 11, 2015 at 7:41 AM, Srikanth srikanth...@gmail.com wrote:

 Is there a join involved in your sql?
 Have a look at spark.sql.shuffle.partitions?

 Srikanth

 On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Srikant, thanks for the response. I have the following code:

 hiveContext.sql("insert into ...").coalesce(6)

 The above code does not create 6 part files; it creates around 200 small
 files.

 Please guide. Thanks.
 On Jul 8, 2015 4:07 AM, Srikanth srikanth...@gmail.com wrote:

 Did you do

 yourRdd.coalesce(6).saveAsTextFile()

 or

 yourRdd.coalesce(6)
 yourRdd.saveAsTextFile()
 ?

 Srikanth

 On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi, I tried both approaches using df.repartition(6) and df.coalesce(6);
 it doesn't reduce the part-x files. Even after calling the above methods I
 still see around 200 small part files of size 20 MB each, which are again orc
 files.


 On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 Try coalesce function to limit no of part files
 On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote:

 Hi, I have a couple of Spark jobs which process thousands of files every
 day. File size may vary from MBs to GBs. After finishing a job I usually save
 using the following code

 finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR
 dataFrame.write.format("orc").save("/path/in/hdfs") //storing as ORC file as
 of Spark 1.4

 The Spark job creates plenty of small part files in the final output
 directory. As far as I understand, Spark creates a part file for each
 partition/task; please correct me if I am wrong. How do we control the number
 of part files Spark creates? Finally, I would like to create a Hive table
 using these parquet/orc directories, and I heard Hive is slow when we have a
 large number of small files.
 Please guide; I am new to Spark. Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: How do we control output part files created by Spark job?

2015-07-10 Thread Srikanth
Is there a join involved in your sql?
Have a look at spark.sql.shuffle.partitions?
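
For a sketch of what I mean (assuming a HiveContext named hiveContext and that
the insert is driven by a DataFrame; the query and path below are placeholders):

    // fewer shuffle partitions => fewer reducer tasks => fewer part files
    // for shuffle-heavy (join / group by) queries
    hiveContext.setConf("spark.sql.shuffle.partitions", "10")

    // alternatively, coalesce the result DataFrame itself before writing it out
    val result = hiveContext.sql("select ...")          // placeholder query
    result.coalesce(6).write.format("orc").save("/path/in/hdfs")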

Srikanth

On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi Srikant, thanks for the response. I have the following code:

 hiveContext.sql("insert into ...").coalesce(6)

 The above code does not create 6 part files; it creates around 200 small files.
 Please guide. Thanks.
 On Jul 8, 2015 4:07 AM, Srikanth srikanth...@gmail.com wrote:

 Did you do

 yourRdd.coalesce(6).saveAsTextFile()

 or

 yourRdd.coalesce(6)
 yourRdd.saveAsTextFile()
 ?

 Srikanth

 On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi, I tried both approaches using df.repartition(6) and df.coalesce(6); it
 doesn't reduce the part-x files. Even after calling the above methods I still
 see around 200 small part files of size 20 MB each, which are again orc files.


 On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 Try coalesce function to limit no of part files
 On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote:

 Hi, I have a couple of Spark jobs which process thousands of files every
 day. File size may vary from MBs to GBs. After finishing a job I usually save
 using the following code

 finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR
 dataFrame.write.format("orc").save("/path/in/hdfs") //storing as ORC file as
 of Spark 1.4

 The Spark job creates plenty of small part files in the final output
 directory. As far as I understand, Spark creates a part file for each
 partition/task; please correct me if I am wrong. How do we control the number
 of part files Spark creates? Finally, I would like to create a Hive table
 using these parquet/orc directories, and I heard Hive is slow when we have a
 large number of small files.
 Please guide; I am new to Spark. Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Parallelizing multiple RDD / DataFrame creation in Spark

2015-07-08 Thread Srikanth
Your loadTable() calls are not actions. The files will be read fully only when an
action is performed.
If the action is something like table1.join(table2), then I think both
files will be read in parallel.
Can you try that and look at the execution plan? In 1.4 this is also shown in
the Spark UI.
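
If you do want to force both tables to be read and cached up front, a rough
sketch is to kick off the loads from separate threads so Spark can schedule the
two jobs concurrently (the paths and table names below are placeholders, and
this assumes an existing sqlContext):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    def loadAndCache(path: String, name: String): Long = {
      val df = sqlContext.jsonFile(path)   // lazy: nothing is read yet
      df.cache().registerTempTable(name)
      df.count()                            // action: materializes the cache
    }

    val futures = Seq(
      Future(loadAndCache("s3://textfiledirectory/", "table1")),
      Future(loadAndCache("s3://testfiledirectory2/", "table2")))

    Await.result(Future.sequence(futures), 1.hour)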

Srikanth

On Wed, Jul 8, 2015 at 11:06 AM, Brandon White bwwintheho...@gmail.com
wrote:

 The point of running them in parallel would be faster creation of the
 tables. Has anybody been able to efficiently parallelize something like
 this in Spark?
 On Jul 8, 2015 12:29 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 What's the point of creating them in parallel? You can multi-thread it and run
 it in parallel, though.

 Thanks
 Best Regards

 On Wed, Jul 8, 2015 at 5:34 AM, Brandon White bwwintheho...@gmail.com
 wrote:

 Say I have a spark job that looks like the following:

 def loadTable1() {
   val table1 = sqlContext.jsonFile("ss3://textfiledirectory/")
   table1.cache().registerTempTable("table1")
 }
 def loadTable2() {
   val table2 = sqlContext.jsonFile("ss3://testfiledirectory2/")
   table2.cache().registerTempTable("table2")
 }

 def loadAllTables() {
   loadTable1()
   loadTable2()
 }

 loadAllTables()

 How do I parallelize this Spark job so that both tables are created at
 the same time or in parallel?





Re: How do we control output part files created by Spark job?

2015-07-07 Thread Srikanth
Did you do

yourRdd.coalesce(6).saveAsTextFile()

or

yourRdd.coalesce(6)
yourRdd.saveAsTextFile()
?

Srikanth

On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi, I tried both approaches using df.repartition(6) and df.coalesce(6); it
 doesn't reduce the part-x files. Even after calling the above methods I still
 see around 200 small part files of size 20 MB each, which are again orc files.


 On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 Try coalesce function to limit no of part files
 On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote:

 Hi, I have a couple of Spark jobs which process thousands of files every
 day. File size may vary from MBs to GBs. After finishing a job I usually save
 using the following code

 finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR
 dataFrame.write.format("orc").save("/path/in/hdfs") //storing as ORC file as
 of Spark 1.4

 The Spark job creates plenty of small part files in the final output directory.
 As far as I understand, Spark creates a part file for each partition/task;
 please correct me if I am wrong. How do we control the number of part files
 Spark creates? Finally, I would like to create a Hive table using these
 parquet/orc directories, and I heard Hive is slow when we have a large number
 of small files.
 Please guide; I am new to Spark. Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: BroadcastHashJoin when RDD is not cached

2015-07-02 Thread Srikanth
Good to know this will be in the next release. Thanks.
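
For anyone finding this later, the hint Michael refers to looks roughly like the
following once it is available (largeDF and smallDF are placeholder names):

    import org.apache.spark.sql.functions.broadcast

    // marks smallDF for broadcast so the planner can pick BroadcastHashJoin
    val joined = largeDF.join(broadcast(smallDF), largeDF("ip") === smallDF("ip"))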

On Wed, Jul 1, 2015 at 3:13 PM, Michael Armbrust mich...@databricks.com
wrote:

 We don't know that the table is small unless you cache it.  In Spark 1.5
 you'll be able to give us a hint though (
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581
 )

 On Wed, Jul 1, 2015 at 8:30 AM, Srikanth srikanth...@gmail.com wrote:

 Hello,



 I have a straightforward use case of joining a large table with a
 smaller table. The small table is within the limit I set for
 spark.sql.autoBroadcastJoinThreshold.

 I notice that ShuffledHashJoin is used to perform the join.
 BroadcastHashJoin was used only when I pre-fetched and cached the small
 table.

 I understand that for typical broadcast we would have to read and
 collect() the small table in driver before broadcasting.

 Why not do this automatically for joins? That way stage1(read large
 table) and stage2(read small table) can still be run in parallel.





 Sort [emailId#19 ASC,date#0 ASC], true

  Exchange (RangePartitioning 24)

   Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]

 Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))

 *ShuffledHashJoin* [ip#7], [ip#18], BuildRight

  Exchange (HashPartitioning 24)

   Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]

PhysicalRDD
 [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6],
 MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25

  Exchange (HashPartitioning 24)

   Project [emailId#19,scalaUDF(date#20) AS
 upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]

PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at
 rddToDataFrameHolder at DataSourceReader.scala:41


 Srikanth





BroadcastHashJoin when RDD is not cached

2015-07-01 Thread Srikanth
Hello,



I have a straightforward use case of joining a large table with a smaller
table. The small table is within the limit I set for
spark.sql.autoBroadcastJoinThreshold.

I notice that ShuffledHashJoin is used to perform the join.
BroadcastHashJoin was used only when I pre-fetched and cached the small
table.

I understand that for typical broadcast we would have to read and collect()
the small table in driver before broadcasting.

Why not do this automatically for joins? That way stage1(read large table)
and stage2(read small table) can still be run in parallel.





Sort [emailId#19 ASC,date#0 ASC], true

 Exchange (RangePartitioning 24)

  Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]

   Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))

*ShuffledHashJoin* [ip#7], [ip#18], BuildRight

 Exchange (HashPartitioning 24)

  Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]

   PhysicalRDD
[date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6],
MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25

 Exchange (HashPartitioning 24)

  Project [emailId#19,scalaUDF(date#20) AS
upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]

   PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at
rddToDataFrameHolder at DataSourceReader.scala:41


Srikanth


Re: Spark 1.4 RDD to DF fails with toDF()

2015-06-29 Thread Srikanth
My error was related to the Scala version. Upon further reading, I realized
that it takes some effort to get Spark working with Scala 2.11.
I've reverted to using 2.10 and moved past that error. Now I hit the issue
you mentioned. Waiting for 1.4.1.
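
For anyone hitting the same mismatch, a rough sketch of a 2.10-consistent
build.sbt is below (the exact Scala patch version and the use of %% for
spark-csv are my assumptions):

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
      "org.apache.spark" %% "spark-sql"  % "1.4.0",
      // %% resolves to the _2.10 artifact, matching the pre-built Spark binaries
      "com.databricks"   %% "spark-csv"  % "1.0.3"
    )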

Srikanth

On Fri, Jun 26, 2015 at 9:10 AM, Roberto Coluccio 
roberto.coluc...@gmail.com wrote:

 I got a similar issue. Might your as well be related to this
 https://issues.apache.org/jira/browse/SPARK-8368 ?

 On Fri, Jun 26, 2015 at 2:00 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Those provided spark libraries are compatible with scala 2.11?

 Thanks
 Best Regards

 On Fri, Jun 26, 2015 at 4:48 PM, Srikanth srikanth...@gmail.com wrote:

 Thanks Akhil for checking this out. Here is my build.sbt.

 name := "Weblog Analysis"

 version := "1.0"

 scalaVersion := "2.11.5"

 javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
   "org.apache.spark" %% "spark-sql" % "1.4.0",
   "org.apache.spark" %% "spark-streaming" % "1.4.0",
   "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0",
   "org.apache.spark" %% "spark-mllib" % "1.4.0",
   "org.apache.commons" % "commons-lang3" % "3.0",
   "org.eclipse.jetty" % "jetty-client" % "8.1.14.v20131031",
   "org.scalatest" %% "scalatest" % "2.2.1" % "test",
   "com.databricks" % "spark-csv_2.11" % "1.0.3",
   "joda-time" % "joda-time" % "2.8.1",
   "org.joda" % "joda-convert" % "1.7"
 )

 resolvers ++= Seq(
   "Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/",
   "Sonatype public" at "http://oss.sonatype.org/content/groups/public/",
   "Sonatype" at "http://nexus.scala-tools.org/content/repositories/public",
   "Scala Tools" at "http://scala-tools.org/repo-snapshots/",
   "Typesafe" at "http://repo.typesafe.com/typesafe/releases/",
   "Akka" at "http://akka.io/repository/",
   "JBoss" at "http://repository.jboss.org/nexus/content/groups/public/",
   "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
 )

 On Fri, Jun 26, 2015 at 4:13 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Its a scala version conflict, can you paste your build.sbt file?

 Thanks
 Best Regards

 On Fri, Jun 26, 2015 at 7:05 AM, stati srikanth...@gmail.com wrote:

 Hello,

 When I run a spark job with spark-submit it fails with the below exception
 for the code line
 /*val webLogDF = webLogRec.toDF().select("ip", "date", "name")*/

 I had a similar issue running from spark-shell, then realized that I
 needed
 sqlContext.implicits._
 Now my code has the following imports
 /*
  import org.apache.spark._
  import org.apache.spark.sql._
  import org.apache.spark.sql.functions._
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
 */

 Code works fine from spark-shell REPL. It also runs fine when run in
 local
 mode from Eclipse. I get this
 error only when I submit to cluster using spark-submit.
 bin/spark-submit /local/weblog-analysis_2.11-1.0.jar --class
 WebLogAnalysis
 --master spark://machu:7077

 I'm testing with spark 1.4. My code was built using scala 2.11 and
 spark+sparkSQL 1.4.0 as dependency in build.sbt

 Exception in thread main java.lang.NoSuchMethodError:

 scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
 at WebLogAnalysis$.readWebLogFiles(WebLogAnalysis.scala:38)
 at WebLogAnalysis$.main(WebLogAnalysis.scala:62)
 at WebLogAnalysis.main(WebLogAnalysis.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 I can provide more code or log if that will help. Let me know.

 Srikanth



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-RDD-to-DF-fails-with-toDF-tp23499.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Spark 1.4 RDD to DF fails with toDF()

2015-06-26 Thread Srikanth
Thanks Akhil for checking this out. Here is my build.sbt.

name := "Weblog Analysis"

version := "1.0"

scalaVersion := "2.11.5"

javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-streaming" % "1.4.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  "org.apache.commons" % "commons-lang3" % "3.0",
  "org.eclipse.jetty" % "jetty-client" % "8.1.14.v20131031",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
  "com.databricks" % "spark-csv_2.11" % "1.0.3",
  "joda-time" % "joda-time" % "2.8.1",
  "org.joda" % "joda-convert" % "1.7"
)

resolvers ++= Seq(
  "Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/",
  "Sonatype public" at "http://oss.sonatype.org/content/groups/public/",
  "Sonatype" at "http://nexus.scala-tools.org/content/repositories/public",
  "Scala Tools" at "http://scala-tools.org/repo-snapshots/",
  "Typesafe" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka" at "http://akka.io/repository/",
  "JBoss" at "http://repository.jboss.org/nexus/content/groups/public/",
  "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
)

On Fri, Jun 26, 2015 at 4:13 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Its a scala version conflict, can you paste your build.sbt file?

 Thanks
 Best Regards

 On Fri, Jun 26, 2015 at 7:05 AM, stati srikanth...@gmail.com wrote:

 Hello,

 When I run a spark job with spark-submit it fails with the below exception for
 the code line
 /*val webLogDF = webLogRec.toDF().select("ip", "date", "name")*/

 I had a similar issue running from spark-shell, then realized that I needed
 sqlContext.implicits._
 Now my code has the following imports
 /*
  import org.apache.spark._
  import org.apache.spark.sql._
  import org.apache.spark.sql.functions._
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
 */

 Code works fine from spark-shell REPL. It also runs fine when run in local
 mode from Eclipse. I get this
 error only when I submit to cluster using spark-submit.
 bin/spark-submit /local/weblog-analysis_2.11-1.0.jar --class
 WebLogAnalysis
 --master spark://machu:7077

 I'm testing with spark 1.4. My code was built using scala 2.11 and
 spark+sparkSQL 1.4.0 as dependency in build.sbt

 Exception in thread main java.lang.NoSuchMethodError:

 scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
 at WebLogAnalysis$.readWebLogFiles(WebLogAnalysis.scala:38)
 at WebLogAnalysis$.main(WebLogAnalysis.scala:62)
 at WebLogAnalysis.main(WebLogAnalysis.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 I can provide more code or log if that will help. Let me know.

 Srikanth



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-RDD-to-DF-fails-with-toDF-tp23499.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: java.lang.OutOfMemoryError: PermGen space

2015-06-24 Thread Srikanth
That worked. Thanks!

I wonder what changed in 1.4 to cause this. It wouldn't work with anything
less than 256m for a simple piece of code.
1.3.1 used to work with the default (64m, I think).

Srikanth

On Wed, Jun 24, 2015 at 12:47 PM, Roberto Coluccio 
roberto.coluc...@gmail.com wrote:

 Did you try to pass it with

 --driver-java-options -XX:MaxPermSize=256m

 as spark-shell input argument?


 Roberto


 On Wed, Jun 24, 2015 at 5:57 PM, stati srikanth...@gmail.com wrote:

 Hello,

 I moved from 1.3.1 to 1.4.0 and started receiving
 "java.lang.OutOfMemoryError: PermGen space" when I use spark-shell.
 The same Scala code works fine in the 1.3.1 spark-shell. I was loading the same
 set of external JARs and have the same imports in 1.3.1.

 I tried increasing perm size to 256m. I still got OOM.

 SPARK_REPL_OPTS=-XX:MaxPermSize=256m bin/spark-shell --master
 spark://machu:7077 --total-executor-cores 12 --packages
 com.databricks:spark-csv_2.10:1.0.3 --packages joda-time:joda-time:2.8.1

 Spark UI Environment tab didn't show -XX:MaxPermSize. I'm not sure if
 this config was picked up.
 This is standalone mode.

 Any pointers to next step?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-PermGen-space-tp23472.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org