Regarding structured streaming windows on older data

2019-12-23 Thread Hemant Bhanawat
For demonstration purposes, I was using data with older timestamps in
structured streaming. The data was for the year 2018, the window was 24
hours, and the watermark was 0 seconds. A few things that I saw and could not
explain are:
1. The initial batch of streaming had around 60 windows. It processed all
but the last one.
2. The data for a window is not sent to the writer immediately.
3. If I ingest data for 2019 midway, it is not processed. In fact, Spark
didn't output the 2019 data at all.

Can someone point me to some documentation or an explanation of how
structured streaming works with data that has non-current timestamps?
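
For reference, a minimal sketch of the setup described above ("events" and the
"eventTime" column are placeholders for the actual streaming DataFrame and its
timestamp column):

import org.apache.spark.sql.functions.{col, window}

// 24-hour tumbling windows keyed on event time, with a 0-second watermark,
// as in the experiment described above.
val windowedCounts = events
  .withWatermark("eventTime", "0 seconds")
  .groupBy(window(col("eventTime"), "24 hours"))
  .count()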

Thanks in advance,
Hemant


Re: How to specify file

2016-09-23 Thread Hemant Bhanawat
Check out the README on the following page. This is the CSV connector that
you are using. I think you need to specify the delimiter option.

https://github.com/databricks/spark-csv
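
For reference, a minimal sketch of passing the delimiter through the DataFrame
reader (this assumes the spark-csv package from the link above is on the
classpath; "\u0001" is the '\001' separator mentioned in the question):

// Read a file whose columns are separated by the ^A (\u0001) character.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\u0001")
  .load("/path/to/file")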

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Fri, Sep 23, 2016 at 12:26 PM, Sea <261810...@qq.com> wrote:

> Hi, I want to run sql directly on files, I find that spark has supported
> sql like select * from csv.`/path/to/file`, but files may not be split by
> ','. Maybe it is split by '\001', how can I specify delimiter?
>
> Thank you!
>
>
>


Memory usage by Spark jobs

2016-09-22 Thread Hemant Bhanawat
I am working on profiling TPC-H queries for Spark 2.0. I see a lot of
temporary object creation (sometimes as large as the data itself), which is
justified for the kind of processing Spark does. But, from a production
perspective, is there a guideline on how much memory should be allocated for
processing a specific data size of, let's say, Parquet data? Also, has
someone investigated memory usage for the individual SQL operators like
Filter, GroupBy, OrderBy, Exchange, etc.?

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io


Re: How to process one partition at a time?

2016-04-06 Thread Hemant Bhanawat
Apparently, there is another way to do it. You can try creating a
PartitionPruningRDD and pass a partition filter function to it. This RDD
will do the same thing that I suggested in my mail and you will not have to
create a new RDD.
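
A minimal sketch of that approach (the RDD here is a toy example and
targetPartition is a hypothetical index); note that PartitionPruningRDD is a
developer API:

import org.apache.spark.rdd.PartitionPruningRDD

val rdd = sc.parallelize(1 to 100, 10)   // 10 partitions
val targetPartition = 3
// Tasks are created only for partitions whose index passes the filter;
// the remaining partitions are never scheduled.
val pruned = PartitionPruningRDD.create(rdd, idx => idx == targetPartition)
pruned.count()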

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Wed, Apr 6, 2016 at 5:35 PM, Sun, Rui <rui@intel.com> wrote:

> Maybe you can try SparkContext.submitJob:
>
> def submitJob[T, U, R](rdd: RDD[T], processPartition: (Iterator[T]) => U,
>   partitions: Seq[Int], resultHandler: (Int, U) => Unit,
>   resultFunc: => R): SimpleFutureAction[R]
> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html>
> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/SimpleFutureAction.html>
>
>
>
>
>
> *From:* Hemant Bhanawat [mailto:hemant9...@gmail.com]
> *Sent:* Wednesday, April 6, 2016 7:16 PM
> *To:* Andrei <faithlessfri...@gmail.com>
> *Cc:* user <user@spark.apache.org>
> *Subject:* Re: How to process one partition at a time?
>
>
>
> Instead of doing it in compute, you could instead override the getPartitions
> method of your RDD and return only the target partitions. This way, tasks
> will be created only for the target partitions. Currently, in your case,
> tasks for all the partitions are getting created.
>
> I hope it helps. I would like to hear if you take some other approach.
>
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
>
> www.snappydata.io
>
>
>
> On Wed, Apr 6, 2016 at 3:49 PM, Andrei <faithlessfri...@gmail.com> wrote:
>
> I'm writing a kind of sampler which in most cases will require only 1
> partition, sometimes 2 and very rarely more. So it doesn't make sense to
> process all partitions in parallel. What is the easiest way to limit
> computations to one partition only?
>
>
>
> So far the best idea I came to is to create a custom partition whose
> `compute` method looks something like:
>
>
>
> def compute(split: Partition, context: TaskContext) = {
>
> if (split.index == targetPartition) {
>
> // do computation
>
> } else {
>
>// return empty iterator
>
> }
>
> }
>
>
>
>
>
> But it's quite ugly and I'm unlikely to be the first person with such a
> need. Is there easier way to do it?
>
>
>
>
>
>
>


Re: Executor shutdown hooks?

2016-04-06 Thread Hemant Bhanawat
As part of PR https://github.com/apache/spark/pull/11723, I have added a
killAllTasks function that can be used to kill (or rather, interrupt)
individual tasks before an executor exits. If this PR is accepted, we can add
a call to this function before the executor exits to do task-level cleanups.
The exit thread will wait for a certain period of time before the executor
JVM exits to allow proper cleanup of the tasks.

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Thu, Apr 7, 2016 at 6:08 AM, Reynold Xin <r...@databricks.com> wrote:

>
> On Wed, Apr 6, 2016 at 4:39 PM, Sung Hwan Chung <coded...@cs.stanford.edu>
> wrote:
>
>> My option so far seems to be using JVM's shutdown hook, but I was
>> wondering if Spark itself had an API for tasks.
>>
>
> Spark would be using that under the hood anyway, so you might as well just
> use the jvm shutdown hook directly.
>
>


Re: How to process one partition at a time?

2016-04-06 Thread Hemant Bhanawat
Instead of doing it in compute, you could instead override the getPartitions
method of your RDD and return only the target partitions. This way, tasks
will be created only for the target partitions. Currently, in your case,
tasks for all the partitions are getting created.

I hope it helps. I would like to hear if you take some other approach.


Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Wed, Apr 6, 2016 at 3:49 PM, Andrei <faithlessfri...@gmail.com> wrote:

> I'm writing a kind of sampler which in most cases will require only 1
> partition, sometimes 2 and very rarely more. So it doesn't make sense to
> process all partitions in parallel. What is the easiest way to limit
> computations to one partition only?
>
> So far the best idea I came to is to create a custom partition whose
> `compute` method looks something like:
>
> def compute(split: Partition, context: TaskContext) = {
> if (split.index == targetPartition) {
> // do computation
> } else {
>// return empty iterator
> }
> }
>
>
>
> But it's quite ugly and I'm unlikely to be the first person with such a
> need. Is there easier way to do it?
>
>
>


Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-01 Thread Hemant Bhanawat
As Mich has already noticed, Spark defaults to a nested loop (NL) join if
there is more than one OR'ed condition. Oracle is probably doing cost-based
optimization in this scenario. You can call it a bug, but in my opinion it is
an area where Spark is still evolving.

>> Hemant has mentioned the nested loop time will be very little.
I had mentioned that the NL time will *vary* little with a larger number of
conditions. What I meant was that with 15 conditions instead of 3, the NL
join would still take 13-15 mins while the hash join approach would take more
than that.
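
For illustration, a hedged sketch of the union-of-equi-joins rewrite discussed
in this thread, using the dummy/dummy2 tables and columns from Mich's example
quoted below. Note that distinct() also removes genuinely duplicate rows, so
this is only equivalent to the OR join when the joined rows are unique:

val d  = sqlContext.sql("select * from test.dummy")
val d1 = sqlContext.sql("select * from test.dummy2")
// Each equi-join can be planned as a (broadcast) hash join; the de-duplicated
// union of the two results approximates the single OR join.
val byString  = d.join(d1, d("random_string") === d1("random_string"))
val bySmallVc = d.join(d1, d("small_vc") === d1("small_vc"))
val combined  = byString.unionAll(bySmallVc).distinct()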

Hemant

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Fri, Apr 1, 2016 at 3:08 PM, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi Mich,
>
> Thanks for the input.
>
> Yes, it seems to be a bug. Is it possible to fix this in next release?
>
> Regards
> Ashok
>
> On Fri, Apr 1, 2016 at 2:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
>> hm.
>>
>> Sounds like it ends up in Nested Loop Join (NLJ) as opposed to Hash Join
>> (HJ) when OR  is used for more than one predicate comparison.
>>
>> In below I have a table dummy created as ORC with 1 billion rows. Just
>> created another one called dummy1 with 60K rows
>>
>> A simple join results in Hash Join good!
>>
>> scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
>> d.random_string = d1.random_string").explain(true)
>>
>> == Physical Plan ==
>> Project [id#212,id#219]
>>
>> +- BroadcastHashJoin [random_string#216], [random_string#223], BuildRight
>>    :- ConvertToUnsafe
>>    :  +- HiveTableScan [id#212,random_string#216], MetastoreRelation test, dummy, Some(d)
>>    +- ConvertToUnsafe
>>       +- HiveTableScan [id#219,random_string#223], MetastoreRelation test, dummy2, Some(d1)
>>
>> When the join is done using OR on other predicates I see it starts doing
>> NLJ
>>
>> scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
>> d.random_string = d1.random_string OR d.small_vc =
>> d1.small_vc").explain(true)
>>
>> == Physical Plan ==
>> Project [id#241,id#248]
>> +- BroadcastNestedLoopJoin BuildRight, Inner, Some(((random_string#245 = random_string#252) || (small_vc#246 = small_vc#253)))
>>    :- HiveTableScan [small_vc#246,id#241,random_string#245], MetastoreRelation test, dummy, Some(d)
>>    +- HiveTableScan [id#248,random_string#252,small_vc#253], MetastoreRelation test, dummy2, Some(d1)
>>
>> in contrast the same identical tables in Oracle use Hash Join with OR
>> which is expected
>>
>> scratch...@mydb.mich.LOCAL> select d.id, d1.id from dummy d, dummy2 d1
>> where d.random_string = d1.random_string OR d.small_vc = d1.small_vc;
>>
>> Execution Plan
>> --
>> Plan hash value: 4163534687
>>
>> ----------------------------------------------------------------------------------------
>> | Id  | Operation           | Name   | Rows  | Bytes |TempSpc| Cost (%CPU)| Time     |
>> ----------------------------------------------------------------------------------------
>> |   0 | SELECT STATEMENT    |        | 63207 |  8332K|       |  1280K  (1)| 04:16:05 |
>> |   1 |  CONCATENATION      |        |       |       |       |            |          |
>> |*  2 |   HASH JOIN         |        | 60183 |  7934K|  4632K|   640K  (1)| 02:08:03 |
>> |   3 |    TABLE ACCESS FULL| DUMMY2 |     6 |  3925K|       |   157   (1)| 00:00:02 |
>> |   4 |    TABLE ACCESS FULL| DUMMY  |   100M|  6484M|       |   261K  (1)| 00:52:13 |
>> |*  5 |   HASH JOIN         |        |  3024 |   398K|  4632K|   640K  (1)| 02:08:03 |
>> |   6 |    TABLE ACCESS FULL| DUMMY2 |     6 |  3925K|       |   157   (1)| 00:00:02 |
>> |   7 |    TABLE ACCESS FULL| DUMMY  |   100M|  6484M|       |   261K  (1)| 00:52:13 |
>> ----------------------------------------------------------------------------------------
>>
>> So this looks like a bug!
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 1 April 2016 at 04:53, ashokkumar rajendran <
>> ashokkumar.rajend...@gmail.com> wrote:
>>
>>>

Re: SPARK-13900 - Join with simple OR conditions take too long

2016-03-31 Thread Hemant Bhanawat
Hi Ashok,

That's interesting.

As I understand it, a nested loop join (that will produce m x n rows) is
performed on tables A and B, and then each row is evaluated to see if any of
the conditions is met. You are asking that Spark should instead do a
BroadcastHashJoin on the equality conditions in parallel and then union the
results, like you are doing in a different query.

If we leave aside parallelism for a moment, theoretically, the time taken for
a nested loop join would vary little as the number of conditions increases,
while the time taken for the solution that you are suggesting would increase
linearly with the number of conditions. So, when there are too many
conditions, the nested loop join would be faster than the solution that you
suggest. Now the question is, how should Spark decide when to do what?


Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Thu, Mar 31, 2016 at 2:28 PM, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi,
>
> I have filed ticket SPARK-13900. There was an initial reply from a
> developer but did not get any reply on this. How can we do multiple hash
> joins together for OR conditions based joins? Could someone please guide on
> how can we fix this?
>
> Regards
> Ashok
>


Re: Can we use spark inside a web service?

2016-03-11 Thread Hemant Bhanawat
Spark-jobserver is an elegant product that builds concurrency on top of
Spark. But the current design of the DAGScheduler prevents Spark from
becoming a truly concurrent solution for low-latency queries: the
DAGScheduler will turn out to be a bottleneck for low-latency queries. The
Sparrow project was an effort to make Spark more suitable for such scenarios,
but it never made it into the Spark codebase. If Spark has to become a highly
concurrent solution, scheduling has to be distributed.

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Fri, Mar 11, 2016 at 7:02 AM, Chris Fregly <ch...@fregly.com> wrote:

> great discussion, indeed.
>
> Mark Hamstra and i spoke offline just now.
>
> Below is a quick recap of our discussion on how they've achieved
> acceptable performance from Spark on the user request/response path (@mark-
> feel free to correct/comment).
>
> 1) there is a big difference in request/response latency between
> submitting a full Spark Application (heavy weight) versus having a
> long-running Spark Application (like Spark Job Server) that submits
> lighter-weight Jobs using a shared SparkContext.  mark is obviously using
> the latter - a long-running Spark App.
>
> 2) there are some enhancements to Spark that are required to achieve
> acceptable user request/response times.  some links that Mark provided are
> as follows:
>
>- https://issues.apache.org/jira/browse/SPARK-11838
>- https://github.com/apache/spark/pull/11036
>- https://github.com/apache/spark/pull/11403
>- https://issues.apache.org/jira/browse/SPARK-13523
>- https://issues.apache.org/jira/browse/SPARK-13756
>
> Essentially, a deeper level of caching at the shuffle file layer to reduce
> compute and memory between queries.
>
> Note that Mark is running a slightly-modified version of stock Spark.
>  (He's mentioned this in prior posts, as well.)
>
> And I have to say that I'm, personally, seeing more and more
> slightly-modified versions of Spark being deployed to production to
> workaround outstanding PR's and Jiras.
>
> this may not be what people want to hear, but it's a trend that i'm seeing
> lately as more and more customize Spark to their specific use cases.
>
> Anyway, thanks for the good discussion, everyone!  This is why we have
> these lists, right!  :)
>
>
> On Thu, Mar 10, 2016 at 7:51 PM, Evan Chan <velvia.git...@gmail.com>
> wrote:
>
>> One of the premises here is that if you can restrict your workload to
>> fewer cores - which is easier with FiloDB and careful data modeling -
>> you can make this work for much higher concurrency and lower latency
>> than most typical Spark use cases.
>>
>> The reason why it typically does not work in production is that most
>> people are using HDFS and files.  These data sources are designed for
>> running queries and workloads on all your cores across many workers,
>> and not for filtering your workload down to only one or two cores.
>>
>> There is actually nothing inherent in Spark that prevents people from
>> using it as an app server.   However, the insistence on using it with
>> HDFS is what kills concurrency.   This is why FiloDB is important.
>>
>> I agree there are more optimized stacks for running app servers, but
>> the choices that you mentioned:  ES is targeted at text search;  Cass
>> and HBase by themselves are not fast enough for analytical queries
>> that the OP wants;  and MySQL is great but not scalable.   Probably
>> something like VectorWise, HANA, Vertica would work well, but those
>> are mostly not free solutions.   Druid could work too if the use case
>> is right.
>>
>> Anyways, great discussion!
>>
>> On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly <ch...@fregly.com> wrote:
>> > you are correct, mark.  i misspoke.  apologies for the confusion.
>> >
>> > so the problem is even worse given that a typical job requires multiple
>> > tasks/cores.
>> >
>> > i have yet to see this particular architecture work in production.  i
>> would
>> > love for someone to prove otherwise.
>> >
>> > On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra <m...@clearstorydata.com>
>> > wrote:
>> >>>
>> >>> For example, if you're looking to scale out to 1000 concurrent
>> requests,
>> >>> this is 1000 concurrent Spark jobs.  This would require a cluster
>> with 1000
>> >>> cores.
>> >>
>> >>
>> >> This doesn't make sense.  A Spark Job is a driver/DAGScheduler concept
>> >> without any 1:1 correspondence between Worker cores and J

Re: S3 Zip File Loading Advice

2016-03-08 Thread Hemant Bhanawat
https://issues.apache.org/jira/browse/SPARK-3586 talks about creating a file
DStream that can monitor for new files recursively, but this functionality
has not been added yet.

I don't see an easy way out. You will have to create your folders based on a
timeline (it looks like you are already doing that) and run a new job over
the new folders created in each interval. This will have to be automated
using an external script.
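
For what it's worth, a sketch of such a per-interval job, under the assumption
that the archives are gzip (.gz) rather than true .zip files; Hadoop's input
formats decompress .gz transparently, while .zip would need a custom reader.
The folder path follows the layout from the question, and spark-csv is assumed
to be on the classpath:

// The hour folder would be chosen by the external script driving each run.
val hourFolder = "s3n://events/2016/03/01/00"
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")          // assumption: each file carries a header row
  .load(s"$hourFolder/*.gz")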

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim <bbuil...@gmail.com> wrote:

> I am wondering if anyone can help.
>
> Our company stores zipped CSV files in S3, which has been a big headache
> from the start. I was wondering if anyone has created a way to iterate
> through several subdirectories (s3n://events/2016/03/01/00,
> s3n://2016/03/01/01, etc.) in S3 to find the newest files and load them. It
> would be a big bonus to include the unzipping of the file in the process so
> that the CSV can be loaded directly into a dataframe for further
> processing. I’m pretty sure that the S3 part of this request is not
> uncommon. I would think the file being zipped is uncommon. If anyone can
> help, I would truly be grateful for I am new to Scala and Spark. This would
> be a great help in learning.
>
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Hemant Bhanawat
A guess - parseRecord is returning None in some cases (probably empty
lines), and then entry.get is throwing the exception.

You may want to filter the None values from accessLogDStream before you run
the map function over it.

Hemant

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Which line is line 42 in your code ?
>
> When variable lines becomes empty, you can stop your program.
>
> Cheers
>
> On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
>
> I am working on Spark Streaming API and I wish to stream a set of
> pre-downloaded web log files continuously to simulate a real-time stream. I
> wrote a script that gunzips the compressed logs and pipes the output to nc
> on port .
>
> The script looks like this:
>
> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> zipped_files=`find $BASEDIR -name "*.gz"`
>
> for zfile in $zipped_files
>  do
>   echo "Unzipping $zfile..."
>   gunzip -c $zfile  | nc -l -p  -q 20
>
>  done
>
> I have streaming code written in Scala that processes the streams. It
> works well for the most part, but when its run out of files to stream I get
> the following error in Spark:
>
> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
> Restarting receiver with delay 2000 ms: Socket data stream had no more data
> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
> Restarting receiver with delay 2000ms: Socket data stream had no more data
> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
> to only 0 peer(s) instead of 1 peers
> 
> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>
> How to I implement a graceful shutdown so that the program exits
> gracefully when it no longer detects any data in the stream ?
>
> My Spark Streaming code looks like this:
>
> object StreamingLogEnhanced {
>  def main(args: Array[String]) {
>   val master = args(0)
>   val conf = new
>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>  // Create a StreamingContext with a n second batch size
>   val ssc = new StreamingContext(conf, Seconds(10))
>  // Create a DStream from all the input on port 
>   val log = Logger.getLogger(getClass.getName)
>
>   sys.ShutdownHookThread {
>   log.info("Gracefully stopping Spark Streaming Application")
>   ssc.stop(true, true)
>   log.info("Application stopped")
>   }
>   val lines = ssc.socketTextStream("localhost", )
>   // Create a count of log hits by ip
>   var ipCounts=countByIp(lines)
>   ipCounts.print()
>
>   // start our streaming context and wait for it to "finish"
>   ssc.start()
>   // Wait for 600 seconds then exit
>   ssc.awaitTermination(1*600)
>   ssc.stop()
>   }
>
>  def countByIp(lines: DStream[String]) = {
>val parser = new AccessLogParser
>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>val ipDStream = accessLogDStream.map(entry =>
> (entry.get.clientIpAddress, 1))
>ipDStream.reduceByKey((x, y) => x + y)
>  }
>
> }
>
> Thanks for any suggestions in advance.
>
>


Re: Specify number of executors in standalone cluster mode

2016-02-21 Thread Hemant Bhanawat
The maximum number of cores per executor can be controlled using
spark.executor.cores, and the maximum number of executors on a single worker
can be controlled by the environment variable SPARK_WORKER_INSTANCES.

However, to ensure that all available cores are used, you will have to take
care of how the stream is partitioned. Copy-pasting the relevant help text
from the Spark documentation:



The number of tasks per receiver per batch will be approximately (batch
interval / block interval). For example, a block interval of 200 ms will
create 10 tasks per 2 second batch. If the number of tasks is too low
(that is, less than the number of cores per machine), then it will be
inefficient as all available cores will not be used to process the data. To
increase the number of tasks for a given batch interval, reduce the block
interval. However, the recommended minimum value of block interval is about
50 ms, below which the task launching overheads may be a problem.

An alternative to receiving data with multiple input streams / receivers is
to explicitly repartition the input data stream (using
inputStream.repartition()). This distributes the received batches of data
across the specified number of machines in the cluster before further
processing.
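
Putting those settings together, a minimal sketch for the standalone setup
asked about below (all values are illustrative, and the master is assumed to
be supplied via spark-submit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap the application at 12 cores with 2 cores per executor, so each of the
// 6 workers runs one 2-core executor.
val conf = new SparkConf()
  .setAppName("StreamingApp")
  .set("spark.cores.max", "12")
  .set("spark.executor.cores", "2")
// SPARK_WORKER_INSTANCES is an environment variable (conf/spark-env.sh),
// not a SparkConf key.
val ssc = new StreamingContext(conf, Seconds(2))
// Repartition the received stream so that all 12 cores get work.
val lines = ssc.socketTextStream("localhost", 9999).repartition(12)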

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Sun, Feb 21, 2016 at 11:01 PM, Saiph Kappa <saiph.ka...@gmail.com> wrote:

> Hi,
>
> I'm running a spark streaming application onto a spark cluster that spans
> 6 machines/workers. I'm using spark cluster standalone mode. Each machine
> has 8 cores. Is there any way to specify that I want to run my application
> on all 6 machines and just use 2 cores on each machine?
>
> Thanks
>


Re: Behind the scene of RDD to DataFrame

2016-02-20 Thread Hemant Bhanawat
toDF internally calls sqlContext.createDataFrame, which transforms the RDD
to an RDD[InternalRow]. This RDD[InternalRow] is then mapped to a DataFrame.

Type conversions (from Scala types to Catalyst types) are involved, but no
shuffling.
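
For illustration, a small sketch of that flow (hypothetical case class,
assuming a spark-shell session where sc and sqlContext are in scope):

import sqlContext.implicits._

case class Person(name: String, age: Int)
val rdd = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
// toDF converts each record to an InternalRow partition-by-partition using the
// schema derived from the case class; it is a narrow transformation, no shuffle.
val df = rdd.toDF()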

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Sun, Feb 21, 2016 at 11:48 AM, Weiwei Zhang <wzhan...@dons.usfca.edu>
wrote:

> Hi there,
>
> Could someone explain to me what is behind the scene of rdd.toDF()? More
> importantly, will this step involve a lot of shuffles and cause the surge
> of the size of intermediate files? Thank you.
>
> Best Regards,
> Vivian
>


Re: spark stages in parallel

2016-02-20 Thread Hemant Bhanawat
Not possible as of today. See
https://issues.apache.org/jira/browse/SPARK-2387

Hemant Bhanawat
https://www.linkedin.com/in/hemant-bhanawat-92a3811
www.snappydata.io

On Thu, Feb 18, 2016 at 1:19 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> can two stages of single job run in parallel in spark?
>
> e.g one stage is ,map transformation and another is repartition on mapped
> rdd.
>
> rdd.map(function,100).repartition(30);
>
> can it happen that map transformation which is running 100 tasks after few
> of them say (10 )  are finished and spark started another stage repartition
> which started copying data from mapped stage nodes in parallel.
>
> Thanks
>


Re: Optimal way to re-partition from a single partition

2016-02-09 Thread Hemant Bhanawat
For SQL shuffle operations like groupBy, the number of output partitions is
controlled by spark.sql.shuffle.partitions. But it seems orderBy does not
honour this.

In my small test, I could see that the number of partitions in the DF
returned by orderBy was equal to the total number of distinct keys. Are you
observing the same, i.e. do you have a single value for all rows in the
column on which you are running orderBy? If yes, you are better off not
running the orderBy clause at all.

Maybe someone from the Spark SQL team could answer how the partitioning of
the output DF should be handled when doing an orderBy?
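
For reference, a quick way to reproduce that observation (the column name is
hypothetical):

sqlContext.setConf("spark.sql.shuffle.partitions", "100")
val sorted = df.orderBy("key")
// If "key" has only a few distinct values, this prints far fewer than 100.
println(sorted.rdd.partitions.length)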

Hemant
www.snappydata.io
https://github.com/SnappyDataInc/snappydata




On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores  wrote:

>
> I have a data frame which I sort using orderBy function. This operation
> causes my data frame to go to a single partition. After using those
> results, I would like to re-partition to a larger number of partitions.
> Currently I am just doing:
>
> val rdd = df.rdd.coalesce(100, true) //df is a dataframe with a single
> partition and around 14 million records
> val newDF =  hc.createDataFrame(rdd, df.schema)
>
> This process is really slow. Is there any other way of achieving this
> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>
>
> Thanks a lot
> --
> Cesar Flores
>


Re: Optimal way to re-partition from a single partition

2016-02-09 Thread Hemant Bhanawat
OK, I was comparing groupBy with orderBy and now I realize that they use
different partitioning schemes.

Thanks Takeshi.



On Tue, Feb 9, 2016 at 9:09 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
> `HashPartitioning`.
> `RangePartitioning` roughly samples input data and internally computes
> partition bounds
> to split given rows into `spark.sql.shuffle.partitions` partitions.
> Therefore, when sort keys are highly skewed, I think some partitions could
> end up being empty
> (that is, # of result partitions is lower than `spark.sql.shuffle.partitions`
> .
>
>
> On Tue, Feb 9, 2016 at 9:35 PM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> For sql shuffle operations like groupby, the number of output partitions
>> is controlled by spark.sql.shuffle.partitions. But, it seems orderBy does
>> not honour this.
>>
>> In my small test, I could see that the number of partitions  in DF
>> returned by orderBy was equal to the total number of distinct keys. Are you
>> observing the same, I mean do you have a single value for all rows in the
>> column on which you are running orderBy? If yes, you are better off not
>> running the orderBy clause.
>>
>> May be someone from spark sql team could answer that how should the
>> partitioning of the output DF be handled when doing an orderBy?
>>
>> Hemant
>> www.snappydata.io
>> https://github.com/SnappyDataInc/snappydata
>>
>>
>>
>>
>> On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores <ces...@gmail.com> wrote:
>>
>>>
>>> I have a data frame which I sort using orderBy function. This operation
>>> causes my data frame to go to a single partition. After using those
>>> results, I would like to re-partition to a larger number of partitions.
>>> Currently I am just doing:
>>>
>>> val rdd = df.rdd.coalesce(100, true) //df is a dataframe with a single
>>> partition and around 14 million records
>>> val newDF =  hc.createDataFrame(rdd, df.schema)
>>>
>>> This process is really slow. Is there any other way of achieving this
>>> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>>>
>>>
>>> Thanks a lot
>>> --
>>> Cesar Flores
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Spark Streaming with Druid?

2016-02-08 Thread Hemant Bhanawat
SnappyData's deployment is different from how Spark is deployed. See
http://snappydatainc.github.io/snappydata/deployment/ and
http://snappydatainc.github.io/snappydata/jobs/.

For further questions, you can join us on stackoverflow
http://stackoverflow.com/questions/tagged/snappydata.

Hemant


On Mon, Feb 8, 2016 at 10:04 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> Hi Hemant, thanks much can we use SnappyData on YARN. My Spark jobs run
> using yarn client mode. Please guide.
>
> On Mon, Feb 8, 2016 at 9:46 AM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> You may want to have a look at spark druid project already in progress:
>> https://github.com/SparklineData/spark-druid-olap
>>
>> You can also have a look at SnappyData
>> <https://github.com/SnappyDataInc/snappydata>, which is a low latency
>> store tightly integrated with Spark, Spark SQL and Spark Streaming. You can
>> find the 0.1 Preview release's documentation here.
>> <http://snappydatainc.github.io/snappydata/>
>>
>> Disclaimer: I am a SnappyData engineer.
>>
>> Hemant
>> www.snappydata.io
>>
>>
>> On Sun, Feb 7, 2016 at 12:47 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>
>>> Hi did anybody tried Spark Streaming with Druid as low latency store?
>>> Combination seems powerful is it worth trying both together? Please guide
>>> and share your experience. I am after creating the best low latency
>>> streaming analytics.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Druid-tp26164.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Spark Streaming with Druid?

2016-02-07 Thread Hemant Bhanawat
You may want to have a look at the Spark-Druid project, already in progress:
https://github.com/SparklineData/spark-druid-olap

You can also have a look at SnappyData, which is a low-latency store
tightly integrated with Spark, Spark SQL and Spark Streaming. You can find
the 0.1 Preview release's documentation here.


Disclaimer: I am a SnappyData engineer.

Hemant
www.snappydata.io


On Sun, Feb 7, 2016 at 12:47 AM, unk1102  wrote:

> Hi did anybody tried Spark Streaming with Druid as low latency store?
> Combination seems powerful is it worth trying both together? Please guide
> and share your experience. I am after creating the best low latency
> streaming analytics.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Druid-tp26164.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrame First method is resulting different results in each iteration

2016-02-03 Thread Hemant Bhanawat
Missing order by?

Hemant Bhanawat
SnappyData (http://snappydata.io/)

On Wed, Feb 3, 2016 at 3:45 PM, satish chandra j <jsatishchan...@gmail.com>
wrote:

> HI All,
> I have data in a emp_df (DataFrame) as mentioned below:
>
> EmpId   Sal   DeptNo
> 001   100   10
> 002   120   20
> 003   130   10
> 004   140   20
> 005   150   10
>
> ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as
> below:
>
> DeptNo  Sal   EmpId
> 10 150   005
> 10 130   003
> 10 100   001
> 20 140   004
> 20 120   002
>
> Now I want to pick highest paid EmpId of each DeptNo.,hence applied agg
> First method as below
>
>
> ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")
>
> Expected output is DeptNo  TopSal
>   10005
>20   004
> But my output varies for each iteration such as
>
> First Iteration results as  Dept  TopSal
>   10 003
>20 004
>
> Secnd Iteration results as Dept  TopSal
>   10 005
>   20 004
>
> Third Iteration results as  Dept  TopSal
>   10 003
>   20 002
>
> Not sure why output varies on each iteration as no change in code and
> values in DataFrame
>
> Please let me know if any inputs on this
>
> Regards,
> Satish Chandra J
>


Re: DataFrame First method is resulting different results in each iteration

2016-02-03 Thread Hemant Bhanawat
Ahh, missed that.

I see that you have used the "first" function. 'first' returns the first row
it has found. On a single executor it may return the right results, but with
multiple executors it will return the first row of whichever executor
finishes first, which may not be the first row when the results are combined.

I believe that if you change your query like this, you will get the right
results:

ordrd_emp_df.groupBy("DeptNo").
  agg($"DeptNo", max("Sal").as("HighestSal"))

But as you can see, you get the highest Sal and not the EmpId with the
highest Sal. For getting the EmpId with the highest Sal, you will have to
change your query to add filters or add subqueries. See the following thread:

http://stackoverflow.com/questions/6841605/get-top-1-row-of-each-group
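
For completeness, a sketch of one of the approaches from that thread, using a
window function (this assumes Spark 1.6+, where row_number() is available); it
returns the EmpId with the highest Sal per DeptNo deterministically:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows within each DeptNo by Sal descending and keep only the top row.
val w = Window.partitionBy("DeptNo").orderBy(col("Sal").desc)
val topPerDept = ordrd_emp_df
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .select(col("DeptNo"), col("EmpId").as("TopSal"))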

Hemant Bhanawat
SnappyData (http://snappydata.io/)


On Wed, Feb 3, 2016 at 4:33 PM, satish chandra j <jsatishchan...@gmail.com>
wrote:

> Hi Hemant,
> My dataframe "ordrd_emd_df" consist data in order as I have applied oderBy
> in the first step
> And also tried having "orderBy" method before "groupBy" than also getting
> different results in each iteration
>
> Regards,
> Satish Chandra
>
>
> On Wed, Feb 3, 2016 at 4:28 PM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Missing order by?
>>
>> Hemant Bhanawat
>> SnappyData (http://snappydata.io/)
>>
>>
>> On Wed, Feb 3, 2016 at 3:45 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI All,
>>> I have data in a emp_df (DataFrame) as mentioned below:
>>>
>>> EmpId   Sal   DeptNo
>>> 001   100   10
>>> 002   120   20
>>> 003   130   10
>>> 004   140   20
>>> 005   150   10
>>>
>>> ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as
>>> below:
>>>
>>> DeptNo  Sal   EmpId
>>> 10 150   005
>>> 10 130   003
>>> 10 100   001
>>> 20 140   004
>>> 20 120   002
>>>
>>> Now I want to pick highest paid EmpId of each DeptNo.,hence applied agg
>>> First method as below
>>>
>>>
>>> ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")
>>>
>>> Expected output is DeptNo  TopSal
>>>   10005
>>>20   004
>>> But my output varies for each iteration such as
>>>
>>> First Iteration results as  Dept  TopSal
>>>   10 003
>>>20 004
>>>
>>> Secnd Iteration results as Dept  TopSal
>>>   10 005
>>>   20 004
>>>
>>> Third Iteration results as  Dept  TopSal
>>>   10 003
>>>   20 002
>>>
>>> Not sure why output varies on each iteration as no change in code and
>>> values in DataFrame
>>>
>>> Please let me know if any inputs on this
>>>
>>> Regards,
>>> Satish Chandra J
>>>
>>
>>
>


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Please find attached.



On Wed, Oct 7, 2015 at 7:36 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Hemant:
> Can you post the code snippet to the mailing list - other people would be
> interested.
>
> On Wed, Oct 7, 2015 at 5:50 AM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Will send you the code on your email id.
>>
>> On Wed, Oct 7, 2015 at 4:37 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>
>>> Thanks!
>>> Can you check if you can provide example of the conversion?
>>>
>>>
>>> On Wed, Oct 7, 2015 at 2:05 PM, Hemant Bhanawat <hemant9...@gmail.com>
>>> wrote:
>>>
>>>> Oh, this is an internal class of our project and I had used it without
>>>> realizing the source.
>>>>
>>>> Anyway, the idea is to  wrap the InternalRow in a class that derives
>>>> from Row. When you implement the functions of the trait 'Row ', the type
>>>> conversions from Row types to InternalRow types has to be done for each of
>>>> the types. But, as I can see, the primitive types (apart from String) don't
>>>> need conversions. Map and Array would need some handling.
>>>>
>>>> I will check with the author of this code, I think this code can be
>>>> contributed to Spark.
>>>>
>>>> Hemant
>>>> www.snappydata.io
>>>> linkedin.com/company/snappydata
>>>>
>>>> On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>>>
>>>>> From which jar WrappedInternalRow comes from?
>>>>> It seems that I can't find it.
>>>>>
>>>>> BTW
>>>>> What I'm trying to do now is to create scala array from the fields and
>>>>> than create Row out of that array.
>>>>> The problem is that I get types mismatches...
>>>>>
>>>>> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat <hemant9...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> An approach can be to wrap your MutableRow in WrappedInternalRow
>>>>>> which is a child class of Row.
>>>>>>
>>>>>> Hemant
>>>>>> www.snappydata.io
>>>>>> linkedin.com/company/snappydata
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guys,
>>>>>>> I'm upgrading to Spark 1.5.
>>>>>>>
>>>>>>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>>>>>>> created GenericMutableRow
>>>>>>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and 
>>>>>>> return it
>>>>>>> as org.apache.spark.sql.Row
>>>>>>>
>>>>>>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>>>>>>
>>>>>>> What do you suggest to do?
>>>>>>> How can I convert GenericMutableRow to Row?
>>>>>>>
>>>>>>> Prompt answer will be highly appreciated!
>>>>>>> Thanks,
>>>>>>> Ophir
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.collection

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
 * Wraps an `InternalRow` to expose a `Row`
 */
final class WrappedInternalRow(override val schema: StructType,
val converters: Array[(InternalRow, Int)

Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Oh, this is an internal class of our project and I had used it without
realizing the source.

Anyway, the idea is to wrap the InternalRow in a class that derives from
Row. When you implement the functions of the trait 'Row', the type
conversions from Row types to InternalRow types have to be done for each of
the types. But, as far as I can see, the primitive types (apart from String)
don't need conversions. Map and Array would need some handling.

I will check with the author of this code; I think this code can be
contributed to Spark.

Hemant
www.snappydata.io
linkedin.com/company/snappydata

On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen <oph...@gmail.com> wrote:

> From which jar WrappedInternalRow comes from?
> It seems that I can't find it.
>
> BTW
> What I'm trying to do now is to create scala array from the fields and
> than create Row out of that array.
> The problem is that I get types mismatches...
>
> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> An approach can be to wrap your MutableRow in WrappedInternalRow which is
>> a child class of Row.
>>
>> Hemant
>> www.snappydata.io
>> linkedin.com/company/snappydata
>>
>>
>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>
>>> Hi Guys,
>>> I'm upgrading to Spark 1.5.
>>>
>>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>>> created GenericMutableRow
>>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return it
>>> as org.apache.spark.sql.Row
>>>
>>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>>
>>> What do you suggest to do?
>>> How can I convert GenericMutableRow to Row?
>>>
>>> Prompt answer will be highly appreciated!
>>> Thanks,
>>> Ophir
>>>
>>
>>
>


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Will send you the code on your email id.

On Wed, Oct 7, 2015 at 4:37 PM, Ophir Cohen <oph...@gmail.com> wrote:

> Thanks!
> Can you check if you can provide example of the conversion?
>
>
> On Wed, Oct 7, 2015 at 2:05 PM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Oh, this is an internal class of our project and I had used it without
>> realizing the source.
>>
>> Anyway, the idea is to  wrap the InternalRow in a class that derives from
>> Row. When you implement the functions of the trait 'Row ', the type
>> conversions from Row types to InternalRow types has to be done for each of
>> the types. But, as I can see, the primitive types (apart from String) don't
>> need conversions. Map and Array would need some handling.
>>
>> I will check with the author of this code, I think this code can be
>> contributed to Spark.
>>
>> Hemant
>> www.snappydata.io
>> linkedin.com/company/snappydata
>>
>> On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>
>>> From which jar WrappedInternalRow comes from?
>>> It seems that I can't find it.
>>>
>>> BTW
>>> What I'm trying to do now is to create scala array from the fields and
>>> than create Row out of that array.
>>> The problem is that I get types mismatches...
>>>
>>> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat <hemant9...@gmail.com>
>>> wrote:
>>>
>>>> An approach can be to wrap your MutableRow in WrappedInternalRow which
>>>> is a child class of Row.
>>>>
>>>> Hemant
>>>> www.snappydata.io
>>>> linkedin.com/company/snappydata
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>>>
>>>>> Hi Guys,
>>>>> I'm upgrading to Spark 1.5.
>>>>>
>>>>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>>>>> created GenericMutableRow
>>>>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return 
>>>>> it
>>>>> as org.apache.spark.sql.Row
>>>>>
>>>>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>>>>
>>>>> What do you suggest to do?
>>>>> How can I convert GenericMutableRow to Row?
>>>>>
>>>>> Prompt answer will be highly appreciated!
>>>>> Thanks,
>>>>> Ophir
>>>>>
>>>>
>>>>
>>>
>>
>


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-06 Thread Hemant Bhanawat
An approach can be to wrap your MutableRow in WrappedInternalRow which is a
child class of Row.

Hemant
www.snappydata.io
linkedin.com/company/snappydata


On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:

> Hi Guys,
> I'm upgrading to Spark 1.5.
>
> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
> created GenericMutableRow
> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return it
> as org.apache.spark.sql.Row
>
> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>
> What do you suggest to do?
> How can I convert GenericMutableRow to Row?
>
> Prompt answer will be highly appreciated!
> Thanks,
> Ophir
>


Re: [cache eviction] partition recomputation in big lineage RDDs

2015-10-01 Thread Hemant Bhanawat
As I understand it, you don't need a merge of your historical data RDD with
your RDD_inc; what you need is a merge of the computation results of your
historical RDD with RDD_inc, and so on.

IMO, you should consider having an external row store to hold your
computations. I say this because you need to update the rows of the prior
computation based on the new data. Spark cached batches are column-oriented,
and any update to a Spark cached batch is a costly operation.


On Wed, Sep 30, 2015 at 10:59 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> An equivalent question would be: can the memory cache be selectively
> evicted from within a component run in the driver? I know it is breaking
> some abstraction/encapsulation, but clearly I need to evict part of the
> cache so that it is reloaded with newer values from DB.
>
>
> Because what I basically need is invalidating some portions of the data
> which have newer values. The "compute" method should be the same (read with
> TableInputFormat).
>
> Thanks
> Nicu
> --
> *From:* Nicolae Marasoiu 
> *Sent:* Wednesday, September 30, 2015 4:07 PM
> *To:* user@spark.apache.org
> *Subject:* Re: partition recomputation in big lineage RDDs
>
>
> Hi,
>
> In fact, my RDD will get a new version (a new RDD assigned to the same
> var) quite frequently, by merging bulks of 1000 events of events of last
> 10s.
>
> But recomputation would be more efficient to do not by reading initial RDD
> partition(s) and reapplying deltas, but by reading from HBase the latest
> data, and just compute on top of that if anything.
>
> Basically I guess I need to write my own RDD and implement compute method
> by sliding on hbase.
>
> Thanks,
> Nicu
> --
> *From:* Nicolae Marasoiu 
> *Sent:* Wednesday, September 30, 2015 3:05 PM
> *To:* user@spark.apache.org
> *Subject:* partition recomputation in big lineage RDDs
>
>
> Hi,
>
>
> If I implement a manner to have an up-to-date version of my RDD by
> ingesting some new events, called RDD_inc (from increment), and I provide a
> "merge" function m(RDD, RDD_inc), which returns the RDD_new, it looks like
> I can evolve the state of my RDD by constructing new RDDs all the time, and
> doing it in a manner that hopes to reuse as much data from the past RDD and
> make the rest garbage collectable. An example merge function would be a
> join on some ids, and creating a merged state for each element. The type of
> the result of m(RDD, RDD_inc) is the same type as that of RDD.
>
>
> My question on this is how does the recomputation work for such an RDD,
> which is not the direct result of hdfs load, but is the result of a long
> lineage of such functions/transformations:
>
>
> Lets say my RDD is now after 2 merge iterations like this:
>
> RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)
>
>
> When recomputing a part of RDD_new here are my assumptions:
>
> - only full partitions are recomputed, nothing more granular?
>
> - the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed
>
> - the function are applied
>
>
> And this seems more simplistic, since the partitions do not fully align in
> the general case between all these RDDs. The other aspect is the
> potentially redundant load of data which is in fact not required anymore
> (the data ruled out in the merge).
>
>
> A more detailed version of this question is at
> https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/
>
>
> Thanks,
>
> Nicu
>


Re: flatmap() and spark performance

2015-09-28 Thread Hemant Bhanawat
You can use spark.executor.memory to specify the memory of the executors
which will hold these intermediate results.

You may want to look at the section "Understanding Memory Management in
Spark" of this link:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
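
For completeness, a minimal sketch of setting this (the 4g figure is purely
illustrative; the right value depends on your data and on the expansion factor
of the flatMap):

import org.apache.spark.{SparkConf, SparkContext}

// Reserve executor memory up front, since the size of the flatMap output
// cannot be declared to Spark ahead of time.
val conf = new SparkConf()
  .setAppName("FlatMapJob")
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)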


On Tue, Sep 29, 2015 at 10:51 AM, jeff saremi 
wrote:

> Is there anyway to let spark know ahead of time what size of RDD to expect
> as a result of a flatmap() operation?
> And would that help in terms of performance?
> For instance, if I have an RDD of 1million rows and I know that my
> flatMap() will produce 100million rows, is there a way to indicate that to
> Spark? to say "reserve" space for the resulting RDD?
>
> thanks
> Jeff
>


Re: caching DataFrames

2015-09-23 Thread Hemant Bhanawat
Two DataFrames do not share cache storage in Spark. Hence it's immaterial
how the two DataFrames are related to each other; both of them are going to
consume memory based on the data that they hold. So for your A1 and B1 you
would need extra memory equivalent to half the memory of A/B.

You can check the storage that a dataFrame is consuming in the Spark UI's
Storage tab. http://host:4040/storage/
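
For example, a sketch using the column names from the question; each cached
projection occupies its own storage, separate from its parent DataFrame:

val A1 = A.select("a11", "a12")
A1.cache()
// An action materializes the cache; its size then shows up as a separate
// entry under http://host:4040/storage/
A1.count()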



On Thu, Sep 24, 2015 at 5:37 AM, Zhang, Jingyu 
wrote:

> I have A and B DataFrames
> A has columns a11,a12, a21,a22
> B has columns b11,b12, b21,b22
>
> I persistent them in cache
> 1. A.Cache(),
> 2.  B.Cache()
>
> Then, I persistent the subset in cache later
>
> 3. DataFrame A1 (a11,a12).cache()
>
> 4. DataFrame B1 (b11,b12).cache()
>
> 5. DataFrame AB1 (a11,a12,b11,b12).cahce()
>
> Can you please tell me what happen for caching case (3,4, and 5) after A
> and B cached?
> How much  more memory do I need compare with Caching 1 and 2 only?
>
> Thanks
>
> Jingyu
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.


Re: DataGenerator for streaming application

2015-09-21 Thread Hemant Bhanawat
Why are you using rawSocketStream to read the data? I believe
rawSocketStream waits for a big chunk of data before it can start processing
it. I think what you are writing is a String, so you should use
socketTextStream, which reads the data on a per-line basis.
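
A sketch of the suggested change against the benchmark code quoted below (only
the stream construction changes):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext

// socketTextStream decodes the socket as UTF-8 text and emits one record per
// newline-delimited line, which matches the println-based generator.
def buildStreams(ssc: StreamingContext, numStreams: Int, host: String, port: Int): Unit = {
  val streams = (1 to numStreams).map(_ =>
    ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER))
  ssc.union(streams).count().map(c => s"Received $c records").print()
}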

On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa  wrote:

> Hi,
>
> I am trying to build a data generator that feeds a streaming application.
> This data generator just reads a file and send its lines through a socket.
> I get no errors on the logs, and the benchmark bellow always prints
> "Received 0 records". Am I doing something wrong?
>
>
> object MyDataGenerator {
>
>   def main(args: Array[String]) {
> if (args.length != 3) {
>   System.err.println("Usage: RawTextSender   ")
>   System.exit(1)
> }
> // Parse the arguments using a pattern match
> val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>
> val serverSocket = new ServerSocket(port)
> println("Listening on port " + port)
>
>
> while (true) {
>   val socket = serverSocket.accept()
>   println("Got a new connection")
>
>
>   val out = new PrintWriter(socket.getOutputStream)
>   try {
> var count = 0
> var startTimestamp = -1
> for (line <- Source.fromFile(file).getLines()) {
>   val ts = line.substring(2, line.indexOf(',',2)).toInt
>   if(startTimestamp < 0)
> startTimestamp = ts
>
>   if(ts - startTimestamp <= 30) {
> out.println(line)
> count += 1
>   } else {
> println(s"Emmited reports: $count")
> count = 0
> out.flush()
> startTimestamp = ts
> Thread.sleep(sleepMillis)
>   }
> }
>   } catch {
> case e: IOException =>
>   println("Client disconnected")
>   socket.close()
>   }
> }
> }
> }
>
>
>
> object Benchmark {
>   def main(args: Array[String]) {
> if (args.length != 4) {
>   System.err.println("Usage: RawNetworkGrep
> ")
>   System.exit(1)
> }
>
> val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), 
> args(2).toInt, args(3).toInt)
> val sparkConf = new SparkConf()
> sparkConf.setAppName("BenchMark")
> 
> sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
> sparkConf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops 
> -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 
> -XX:MaxInlineSize=300 ")
> if (sparkConf.getOption("spark.master") == None) {
>   // Master not set, as this was not launched through Spark-submit. 
> Setting master as local."
>   sparkConf.setMaster("local[*]")
> }
>
> // Create the context
> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>
> val rawStreams = (1 to numStreams).map(_ =>
>   ssc.rawSocketStream[String](host, port, 
> StorageLevel.MEMORY_ONLY_SER)).toArray
> val union = ssc.union(rawStreams)
> union.count().map(c => s"Received $c records").print()
> ssc.start()
> ssc.awaitTermination()
>   }
> }
>
> Thanks.
>
>


Re: Why are executors on slave never used?

2015-09-21 Thread Hemant Bhanawat
When you specify the master as local[2], Spark starts all of its components
in a single JVM. You need to specify the master correctly (on EMR, Spark runs
on YARN, so the master should be "yarn-client" or "yarn-cluster" rather than
"local[2]").

I have a default AWS EMR cluster (1 master, 1 slave) with Spark. When I run
a Spark process, it works fine -- but only on the master, as if it were
standalone.

The web-UI and logging code shows only 1 executor, the localhost.

How can I diagnose this?

(I create *SparkConf, *in Python, with *setMaster('local[2]'). )*

(Strangely, though I don't think that this causes the problem, there is
almost nothing spark-related on the slave machine:* /usr/lib/spark *has a
few jars, but that's it:  *datanucleus-api-jdo.jar  datanucleus-core.jar
 datanucleus-rdbms.jar  spark-yarn-shuffle.jar. *But this is an AWS EMR
cluster as created by* create-cluster*, so I would assume that the slave
and master are configured OK out-of the box.)

Joshua


Re: A way to timeout and terminate a laggard 'Stage' ?

2015-09-17 Thread Hemant Bhanawat
Having the driver time out laggards seems like a reasonable way of handling
them. Are there any challenges because of which the driver does not do it
today? Is there a JIRA for this? I couldn't find one.





On Tue, Sep 15, 2015 at 12:07 PM, Akhil Das 
wrote:

> As of now i think its a no. Not sure if its a naive approach, but yes you
> can have a separate program to keep an eye in the webui (possibly parsing
> the content) and make it trigger the kill task/job once it detects a lag.
> (Again you will have to figure out the correct numbers before killing any
> job)
>
> Thanks
> Best Regards
>
> On Mon, Sep 14, 2015 at 10:40 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Is there a way in Spark to automatically terminate laggard "stage's",
>> ones that appear to be hanging?   In other words, is there a timeout for
>> processing of a given RDD?
>>
>> In the Spark GUI, I see the "kill" function for a given Stage under
>> 'Details for Job <...>".
>>
>> Is there something in Spark that would identify and kill laggards
>> proactively?
>>
>> Thanks.
>>
>
>


Re: Difference between sparkDriver and "executor ID driver"

2015-09-16 Thread Hemant Bhanawat
1. When you call new SparkContext(), the Spark driver is started, which
internally creates an Akka ActorSystem that registers on this port.

2. Since you are running in local mode, starting of the executor is
short-circuited and an Executor object is created in the same process (see
LocalEndpoint). This Executor object logs this message with the executor ID
"driver".

On Wed, Sep 16, 2015 at 9:44 AM, Muler  wrote:

> I'm running Spark in local mode and getting these two log messages who
> appear to be similar. I want to understand what each is doing:
>
>
>1. [main] util.Utils (Logging.scala:logInfo(59)) - Successfully
>started service 'sparkDriver' on port 60782.
>2. [main] executor.Executor (Logging.scala:logInfo(59)) - Starting
>executor ID driver on host localhost
>
> 1. is created using:
>
> val actorSystemName = if (isDriver) driverActorSystemName else
> executorActorSystemName
>
> val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf,
> securityManager)
> val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
>
> 2. is created when:
>
>  _taskScheduler.start()
>
>
> What is the difference and what does each do?
>
>
>


Re: How to Serialize and Reconstruct JavaRDD later?

2015-09-02 Thread Hemant Bhanawat
You want to persist state between the execution of two RDDs. So, I believe
what you need is serialization of your model and not of the JavaRDD. If you
can serialize your model, you can persist it in HDFS or some other datastore
to be used by the subsequent RDDs.

If you are using Spark Streaming, doing this would be easy.
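
If the model can be saved directly, a rough sketch (assuming Spark 1.5+ where
MLlib LDA models expose save/load; k, the paths, and the cast to the
EM-optimizer model type are illustrative assumptions):

import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}

val model = new LDA().setK(10).run(corpus)     // corpus: RDD[(Long, Vector)] built from batch 1
model.asInstanceOf[DistributedLDAModel].save(sc, "hdfs:///models/lda-batch-1")
// in a later run, possibly with a different SparkContext:
val reloaded = DistributedLDAModel.load(sc, "hdfs:///models/lda-batch-1")
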

On Wed, Sep 2, 2015 at 4:54 PM, Raja Reddy  wrote:

> Hi All,
>
> *Context:*
> I am exploring topic modelling with LDA with Spark MLLib. However, I need
> my model to enhance as more batches of documents come in.
>
> As of now I see no way of doing something like this, which gensim
>  does:
>
> lda.update(other_corpus)
>
> The only way I can enhance my model is essentially to recompute the
> LDAModel over all the documents accumulated after a new batch arrives.
>
> *Question:*
> One of the time consuming steps before performing topic modelling would be
> to construct the corpus as JavaRDD object, while reading through the actual
> documents.
>
> Capability to serialize a JavaRDD instance and reconstructing JavaRDD from
> the serialized snapshot would be helpful in this case. Suppose say I
> construct and serialize JavaRDD after reading Batch-1 of documents. When
> the Batch-2 arrives, I would like to deserialize the previously serialized
> RDD and mutate it with contents of new batch of documents. Could someone
> please let me know if serialization and deserialization of a JavaRDD
> instance is possible? I will have more questions if serialization is
> possible, mostly to do with changing spark configuration in between a
> serialization operation and deserialization operation.
>
> Thanks and Regards,
> Raja.
>


Re: Performance issue with Spark join

2015-08-26 Thread Hemant Bhanawat
Spark joins are different from traditional database joins because of the lack
of index support. Spark has to shuffle data between nodes to perform a join, so
joins are bound to be much slower than count, which is just a parallel scan of
the data.

Still, to ensure that nothing is wrong with the setup, you may want to look at
the Spark task UI, in particular at the shuffle read and shuffle write metrics.
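
As an aside, if one of the two sides is small enough to fit in memory, a
map-side (broadcast) join avoids the shuffle entirely. A rough sketch using the
names from this thread, assuming elabTab2 is the small side and both RDDs are
already in (key, record) form:

val small = sc.broadcast(elabTab2.collectAsMap())      // ships the small side to every executor
val joined = elabTab1.map { case (k, v) =>
  (k, (v, small.value.get(k)))                         // Option mimics leftOuterJoin's output
}
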

On Wed, Aug 26, 2015 at 3:08 PM, lucap luca-pi...@hotmail.it wrote:

 Hi,

 I'm trying to perform an ETL using Spark, but as soon as I start performing
 joins performance degrades a lot. Let me explain what I'm doing and what I
 found out until now.

 First of all, I'm reading avro files that are on a Cloudera cluster, using
 commands like this:
 /val tab1 = sc.hadoopFile(hdfs:///path/to/file,
 classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
 classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
 classOf[org.apache.hadoop.io.NullWritable], 10)/

 After this, I'm applying some filter functions to data (to reproduce
 where
 clauses of the original query) and then I'm using one map for each table in
 order to translate RDD elements in (key,record) format. Let's say I'm doing
 this:
 /val elabTab1 = tab1.filter(...).map()/

 It is important to notice that if I do something like /elabTab1.first/ or
 /elabTab1.count/ the task is performed in a short time, let's say around
 impala's time. Now I need to do the following:
 /val joined = elabTab1.leftOuterJoin(elabTab2)/
 Then I tried something like /joined.count/ to test performance, but it
 degraded really a lot (let's say that a count on a single table takes like
 4
 seconds and the count on the joined table takes 12 minutes). I think
 there's
 a problem with the configuration, but what might it be?

 I'll give you some more information:
 1] Spark is running on YARN on a Cloudera cluster
 2] I'm starting spark-shell with a command like /spark-shell
 --executor-cores 5 --executor-memory 10G/ that gives the shell approx 10
 vcores and 25 GB of memory
 3] The task seems still for a lot of time after the map tasks, with the
 following message in console: /Asked to send map output locations for
 shuffle ... to .../
 4] If I open the stderr of the executors, I can read plenty of messages
 like
 the following: /Thread ... spilling in-memory map of ... MB to disk/, where
 MBs are in the order of 300-400
 5] I tried to raise the number of executors, but the situation didn't seem
 to change much. I also tried to change the number of splits of the avro
 files (currently set to 10), but it didn't seem to change much as well
 6] Tables aren't particularly big, the bigger one should be few GBs

 Regards,
 Luca



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-Spark-join-tp24458.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Exception throws when running spark pi in Intellij Idea that scala.collection.Seq is not found

2015-08-25 Thread Hemant Bhanawat
Go to the module settings of the project and, in the dependencies section,
check the scope of the Scala jars. It will be either Test or Provided. Change
it to Compile and it should work. Check the following link to understand more
about the scope of dependencies:

https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html



On Tue, Aug 25, 2015 at 12:18 PM, Todd bit1...@163.com wrote:

 I cloned the code from https://github.com/apache/spark to my machine. It
 can compile successfully,
 But when I run the sparkpi, it throws an exception below complaining the
 scala.collection.Seq is not found.
 I have installed scala2.10.4 in my machine, and use the default profiles:
 window,scala2.10,maven-3,test-java-home.
 In Idea, I can find that the Seq class is on my classpath:





 Exception in thread main java.lang.NoClassDefFoundError:
 scala/collection/Seq
 at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
 Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 6 more




Re: How to set environment of worker applications

2015-08-25 Thread Hemant Bhanawat
Ok, I went in the direction of system properties from the beginning, probably
because the question was about passing variables to a particular job.

Anyway, the decision to use either system properties or environment variables
depends solely on whether you want to make them available to all the Spark
processes on a node or only to a particular job.

Are there any other reasons why one would prefer one over the other?


On Mon, Aug 24, 2015 at 8:48 PM, Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 System properties and environment variables are two different things.. One
 can use spark.executor.extraJavaOptions to pass system properties and
 spark-env.sh to pass environment variables.

 -raghav

 On Mon, Aug 24, 2015 at 1:00 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 That's surprising. Passing the environment variables using
 spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then
 fetching them using System.getProperty(myenvvar) has worked for me.

 What is the error that you guys got?

 On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 spark-env.sh works for me in Spark 1.4 but not
 spark.executor.extraJavaOptions.

 On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think the only way to pass on environment variables to worker node is
 to write it in spark-env.sh file on each worker node.

 On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with
 spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Joining using mulitimap or array

2015-08-24 Thread Hemant Bhanawat
In your example, a.attributes.name is a list, not a string. Run this to see
it:

a.select($"a.attributes.name").show()


On Mon, Aug 24, 2015 at 2:51 PM, Ilya Karpov i.kar...@cleverdata.ru wrote:

 Hi, guys
 I'm confused about joining columns in SparkSQL and need your advice.
 I want to join 2 datasets of profiles. Each profile has name and array of
 attributes(age, gender, email etc).
 There can be mutliple instances of attribute with the same name, e.g.
 profile has 2 emails - so 2 attributes with name = 'email' in
 array. Now I want to join 2 datasets using 'email' attribute. I cant find
 the way to do it :(

 The code is below. Now result of join is empty, while I expect to see 1
 row with all Alice emails.

 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.{SparkConf, SparkContext}

 case class Attribute(name: String, value: String, weight: Float)
 case class Profile(name: String, attributes: Seq[Attribute])

 object SparkJoinArrayColumn {
   def main(args: Array[String]) {
 val sc: SparkContext = new SparkContext(new
 SparkConf().setMaster(local).setAppName(getClass.getSimpleName))
 val sqlContext: SQLContext = new SQLContext(sc)

 import sqlContext.implicits._

 val a: DataFrame = sc.parallelize(Seq(
   Profile(Alice, Seq(Attribute(email, al...@mail.com, 1.0f),
 Attribute(email, a.jo...@mail.com, 1.0f)))
 )).toDF.as(a)

 val b: DataFrame = sc.parallelize(Seq(
   Profile(Alice, Seq(Attribute(email, al...@mail.com, 1.0f),
 Attribute(age, 29, 0.2f)))
 )).toDF.as(b)


 a.where($a.attributes.name === email)
   .join(
 b.where($b.attributes.name === email),
 $a.attributes.value === $b.attributes.value
   )
 .show()
   }
 }

 Thanks forward!
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to set environment of worker applications

2015-08-24 Thread Hemant Bhanawat
That's surprising. Passing the environment variables using
spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then
fetching them using System.getProperty(myenvvar) has worked for me.

What is the error that you guys got?
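
For reference, a minimal sketch of that approach (the property name myenvvar
and the rdd are illustrative):

// submit-time part, shown as a comment:
//   spark-submit ... --conf "spark.executor.extraJavaOptions=-Dmyenvvar=xxx"
val tagged = rdd.map { x =>
  (x, System.getProperty("myenvvar"))   // read inside the executor JVM
}
tagged.take(1).foreach(println)
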

On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu 
vsathishkuma...@gmail.com wrote:

 spark-env.sh works for me in Spark 1.4 but not
 spark.executor.extraJavaOptions.

 On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think the only way to pass on environment variables to worker node is
 to write it in spark-env.sh file on each worker node.

 On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How to set environment of worker applications

2015-08-23 Thread Hemant Bhanawat
Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions
in the following article. I think you can use -D to pass system vars:

spark.apache.org/docs/latest/configuration.html#runtime-environment
Hi,

I am starting a spark streaming job in standalone mode with spark-submit.

Is there a way to make the UNIX environment variables with which
spark-submit is started available to the processes started on the worker
nodes?

Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


Re: PySpark concurrent jobs using single SparkContext

2015-08-21 Thread Hemant Bhanawat
It seems like you want simultaneous processing of multiple jobs but, at the
same time, serialization of a few tasks within those jobs. I don't know how to
achieve that in Spark.

But why would you bother about the interleaved processing when the data being
aggregated in the different jobs is per customer per day? Is it that
save_aggregate depends on the results of other customers and/or other days?

I also don't understand how you would achieve that with YARN, because
interleaving of tasks of separately submitted jobs may happen with dynamic
executor allocation as well.

Hemant


On Thu, Aug 20, 2015 at 7:04 PM, Mike Sukmanowsky 
mike.sukmanow...@gmail.com wrote:

 Hi all,

 We're using Spark 1.3.0 via a small YARN cluster to do some log
 processing. The jobs are pretty simple, for a number of customers and a
 number of days, fetch some event log data, build aggregates and store those
 aggregates into a data store.

 The way our script is written right now does something akin to:

 with SparkContext() as sc:
 for customer in customers:
 for day in days:
 logs = sc.textFile(get_logs(customer, day))
 aggregate = make_aggregate(logs)
 # This function contains the action saveAsNewAPIHadoopFile which
 # triggers a save
 save_aggregate(aggregate)

 ​
 So we have a Spark job per customer, per day.

 I tried doing some parallel job submission with something similar to:

 def make_and_save_aggregate(customer, day, spark_context):
 # Without a separate threading.Lock() here or better yet, one guarding the
 # Spark context, multiple customer/day transformations and actions could
 # be interweaved
 sc = spark_context
 logs = sc.textFile(get_logs(customer, day))
 aggregate = make_aggregate(logs)
 save_aggregate(aggregate)
 with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
 for customer in customers:
 for day in days:
 executor.submit(make_and_save_aggregate, customer, day, sc)

 ​
 The problem is, with no locks on a SparkContext except during
 initialization
 https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241
  and
 shutdown
 https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307,
 operations on the context could (if I understand correctly) be interweaved
 leading to DAG which contains transformations out of order and from
 different customer, day periods.

 One solution is instead to launch multiple Spark jobs via spark-submit and
 let YARN/Spark's dynamic executor allocation take care of fair scheduling.
 In practice, this doesn't seem to yield very fast computation perhaps due
 to some additional overhead with YARN.

 Is there any safe way to launch concurrent jobs like this using a single
 PySpark context?

 --
 Mike Sukmanowsky
 Aspiring Digital Carpenter

 *e*: mike.sukmanow...@gmail.com

 LinkedIn http://www.linkedin.com/profile/view?id=10897143 | github
 https://github.com/msukmanowsky




Re: persist for DStream

2015-08-20 Thread Hemant Bhanawat
Are you asking for something more than this?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#caching--persistence
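
For reference, a minimal sketch (kafkaStream, parseRecord and isInteresting are
assumed names; the storage level is illustrative):

import org.apache.spark.storage.StorageLevel

val parsed = kafkaStream.map(parseRecord _)      // derived DStream you want to reuse
parsed.persist(StorageLevel.MEMORY_ONLY_SER)     // or simply parsed.cache()
parsed.count().print()
parsed.filter(isInteresting _).print()           // second use hits the cached RDDs of the batch
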



On Thu, Aug 20, 2015 at 2:09 PM, Deepesh Maheshwari 
deepesh.maheshwar...@gmail.com wrote:

 Hi,

 there are function available tp cache() or persist() RDD in memory but i
 am reading data from kafka in form of DStream and applying operation it and
 i want to persist that DStream in memory for further.

 Please suggest method how i can persist DStream in memory.

 Regards,
 Deepesh



Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data

2015-08-20 Thread Hemant Bhanawat
Sorry, I misread your mail. Thanks for pointing that out.

BTW, are the 800,000 files shuffle intermediate output and not the final
output? I assume yes. I didn't know that you can keep intermediate output on
HDFS, and I don't think that is recommended.
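
If the goal is simply fewer, larger final output files, a middle ground is a
moderate shuffle-partition count combined with coalescing before the write. A
rough sketch assuming Spark 1.4+, where result is the DataFrame being written;
all values and paths are illustrative:

sqlContext.setConf("spark.sql.shuffle.partitions", "32")               // moderate number of reducers
result.coalesce(8).write.parquet("hdfs:///out/tab1/day=2015-08-20")    // fewer, larger files per partition
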




On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Looks like you are using hash-based shuffle and not sort-based shuffle,
 which creates a single file per map task.

 On Thu, Aug 20, 2015 at 12:43 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, I have a Spark job which deals with a large skewed dataset. I have around
 1000 Hive partitions to process in four different tables every day. So if I
 go with the 200 default spark.sql.shuffle.partitions created by Spark, I end
 up with 4 * 1000 * 200 = 800,000 small files in HDFS, which won't be good for
 the HDFS namenode. I have been told that if you keep on creating such a large
 number of small files the namenode will crash; is it true? Please help me
 understand. Anyway, to avoid creating small files I did set
 spark.sql.shuffle.partitions=1. It seems to create 1 output file, and as per
 my understanding, because there is only one output there is so much shuffling
 to do to bring all data to one reducer; please correct me if I am wrong. This
 is causing memory/timeout issues. How do I deal with it?

 I tried to give spark.shuffle.storage=0.7 also, but still the memory seems not
 enough for it. I have a 25 GB executor with 4 cores and 20 such executors and
 still the Spark job fails. Please guide.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Regarding rdd.collect()

2015-08-18 Thread Hemant Bhanawat
On Tue, Aug 18, 2015 at 1:16 PM, Dawid Wysakowicz 
wysakowicz.da...@gmail.com wrote:

 No, the data is not stored between two jobs. But it is stored for a
 lifetime of a job. Job can have multiple actions run.

I too thought so but wanted to confirm. Thanks.


 For a matter of sharing an rdd between jobs you can have a look at Spark
 Job Server(spark-jobserver https://github.com/ooyala/spark-jobserver)
 or some In-Memory storages: Tachyon(http://tachyon-project.org/) or
 Ignite(https://ignite.incubator.apache.org/)

 2015-08-18 9:37 GMT+02:00 Hemant Bhanawat hemant9...@gmail.com:

 It is still in memory for future rdd transformations and actions.

 This is interesting. You mean Spark holds the data in memory between two
 job executions.  How does the second job get the handle of the data in
 memory? I am interested in knowing more about it. Can you forward me a
 spark article or JIRA that talks about it?

 On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 It is still in memory for future rdd transformations and actions. What
 you get in driver is a copy of the data.

 Regards
 Sab

 On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++






Re: global variable in spark streaming with no dependency on key

2015-08-18 Thread Hemant Bhanawat
See if SparkContext.accumulator helps.
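
Accumulators are add-only and their value is readable only on the driver, so
this may or may not fit a clock-like variable, but a minimal sketch (names and
the counting logic are illustrative, Spark 1.x API):

val eventsSoFar = sc.accumulator(0L, "eventsSoFar")
stream.foreachRDD { rdd =>
  rdd.foreach { _ => eventsSoFar += 1L }             // updated inside tasks
  println(s"events so far: ${eventsSoFar.value}")    // readable only on the driver
}
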

On Tue, Aug 18, 2015 at 2:27 PM, Joanne Contact joannenetw...@gmail.com
wrote:

 Hi Gurus,

 Please help.

 But please don't tell me to use updateStateByKey because I need a
 global variable (something like the clock time) across the micro
 batches but not depending on key. For my case, it is not acceptable to
 maintain a state for each key since each key comes in different times.
 Yes my global variable is related to time but cannot use machine
 clock.

 Any hint? Or is this lack of global variable by design?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Regarding rdd.collect()

2015-08-18 Thread Hemant Bhanawat
It is still in memory for future rdd transformations and actions.

This is interesting. You mean Spark holds the data in memory between two
job executions.  How does the second job get the handle of the data in
memory? I am interested in knowing more about it. Can you forward me a
spark article or JIRA that talks about it?

On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

 It is still in memory for future rdd transformations and actions. What you
 get in driver is a copy of the data.

 Regards
 Sab

 On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++



Re: registering an empty RDD as a temp table in a PySpark SQL context

2015-08-18 Thread Hemant Bhanawat
It is definitely not the case for Spark SQL. A temporary table (much like a
DataFrame) is just a logical plan with a name, and it is not iterated unless a
query is fired against it.

I am not sure that using rdd.take in the Python code to verify the schema is
the right approach, as it creates a Spark job.

BTW, why would you want to update the Spark code? rdd.take in the Python code
is the problem. All you want is to avoid the schema verification in
createDataFrame. I do not see any issue on the Spark side in the way it
handles an RDD that has no data.
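
For reference, a sketch in Scala of registering a possibly-empty RDD with an
explicit schema, so that no rows need to be sampled (the schema, names and the
RDD[Row] are illustrative); passing an explicit StructType in PySpark may
similarly sidestep the inference:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))
val df = sqlContext.createDataFrame(maybeEmptyRdd, schema)   // maybeEmptyRdd: RDD[Row], possibly empty
df.registerTempTable("scanned")
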


On Tue, Aug 18, 2015 at 1:23 AM, Eric Walker e...@node.io wrote:

 I have an RDD queried from a scan of a data source.  Sometimes the RDD has
 rows and at other times it has none.  I would like to register this RDD as
 a temporary table in a SQL context.  I suspect this will work in Scala, but
 in PySpark some code assumes that the RDD has rows in it, which are used to
 verify the schema:


 https://github.com/apache/spark/blob/branch-1.3/python/pyspark/sql/context.py#L299

 Before I attempt to extend the Scala code to handle an empty RDD or
 provide an empty DataFrame that can be registered, I was wondering what
 people recommend in this case.  Perhaps there's a simple way of registering
 an empty RDD as a temporary table in a PySpark SQL context that I'm
 overlooking.

 An alternative is to add special case logic in the client code to deal
 with an RDD backed by an empty table scan.  But since the SQL will already
 handle that, I was hoping to avoid special case logic.

 Eric




Re: Apache Spark - Parallel Processing of messages from Kafka - Java

2015-08-16 Thread Hemant Bhanawat
In Spark, every action (foreach, collect, etc.) gets converted into a Spark
job, and those jobs are executed sequentially.

You may want to refactor your code so that the calculateUseCase* methods only
apply transformations (map, flatMap) and a single action is called at the end.
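
A rough sketch of that refactoring, in Scala for brevity (the output sink is
illustrative; the use-case streams are the ones from the question):

val combined = useCase1.union(useCase2).union(useCase3).union(useCase4)
combined.foreachRDD { rdd =>
  rdd.saveAsTextFile(s"hdfs:///out/usecases-${System.currentTimeMillis}")  // single output operation per batch
}
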

On Sun, Aug 16, 2015 at 3:19 PM, mohanaugust mohanaug...@gmail.com wrote:

 JavaPairReceiverInputDStream<String, byte[]> messages =
 KafkaUtils.createStream(...);
 JavaPairDStream<String, byte[]> filteredMessages =
 filterValidMessages(messages);

 JavaDStream<String> useCase1 = calculateUseCase1(filteredMessages);
 JavaDStream<String> useCase2 = calculateUseCase2(filteredMessages);
 JavaDStream<String> useCase3 = calculateUseCase3(filteredMessages);
 JavaDStream<String> useCase4 = calculateUseCase4(filteredMessages);
 ...

 I retrieve messages from Kafka, filter that and use the same messages for
 mutiple use-cases. Here useCase1 to 4 are independent of each other and can
 be calculated parallely. However, when i look at the logs, i see that
 calculations are happening sequentially. How can i make them to run
 parallely. Any suggestion would be helpful



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Parallel-Processing-of-messages-from-Kafka-Java-tp24284.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Streaming on Exponential Data

2015-08-14 Thread Hemant Bhanawat
What does exponential data mean? Does it mean that the amount of data being
received from the stream in a batch interval is increasing exponentially as
time progresses?

Does your process have enough memory to handle the data for a batch interval?

You may want to share Spark task UI snapshots and logs.
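
If the receiver is being overwhelmed, capping its ingest rate is one knob to
try; a minimal sketch (the rate value and app name are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-stream")
  .set("spark.streaming.receiver.maxRate", "10000")  // records per second per receiver
val ssc = new StreamingContext(conf, Seconds(5))
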



On Thu, Aug 13, 2015 at 9:25 PM, UMESH CHAUDHARY umesh9...@gmail.com
wrote:

 Hi,
 I was working with non-reliable receiver version of Spark-Kafka streaming
 i.e.
 KafkaUtils,createStream... where for testing purpose I was getting data at
 constant rate from kafka and it was acting as expected.
 But when there was exponential data in Kafka, my program started crashing
 saying
 Cannot Compute split on input data... also I found on console logs that
 it was adding data continuously in memory while receiving from Kafka.

 How Spark Streaming behaves towards exponential data.



Re: What is the Effect of Serialization within Stages?

2015-08-13 Thread Hemant Bhanawat
A chain of map and flatMap operations does not cause any serialization or
deserialization.
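
A quick way to convince yourself is to look at the lineage; a minimal sketch:

val chained = sc.parallelize(1 to 100).map(_ + 1).flatMap(x => Seq(x, -x))
println(chained.toDebugString)   // no ShuffledRDD appears, i.e. the chain pipelines in one stage
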



On Wed, Aug 12, 2015 at 4:02 PM, Mark Heimann mark.heim...@kard.info
wrote:

 Hello everyone,

 I am wondering what the effect of serialization is within a stage.

 My understanding of Spark as an execution engine is that the data flow
 graph is divided into stages and a new stage always starts after an
 operation/transformation that cannot be pipelined (such as groupBy or join)
 because it can only be completed after the whole data set has been taken
 care off. At the end of a stage shuffle files are written and at the
 beginning of the next stage they are read from.

 Within a stage my understanding is that pipelining is used, therefore I
 wonder whether there is any serialization overhead involved when there is
 no shuffling taking place. I am also assuming that my data set fits into
 memory and must not be spilled to disk.

 So if I would chain multiple *map* or *flatMap* operations and they end
 up in the same stage, will there be any serialization overhead for piping
 the result of the first *map* operation as a parameter into the following
 *map* operation?

 Any ideas and feedback appreciated, thanks a lot.

 Best regards,
 Mark



Re: grouping by a partitioned key

2015-08-12 Thread Hemant Bhanawat
Inline..

On Thu, Aug 13, 2015 at 5:06 AM, Eugene Morozov fathers...@list.ru wrote:

 Hemant, William, pls see inlined.

 On 12 Aug 2015, at 18:18, Philip Weaver philip.wea...@gmail.com wrote:

 Yes, I am partitoning using DataFrameWriter.partitionBy, which produces
 the keyed directory structure that you referenced in that link.


 Have you tried to use DataFrame API instead of SQL? I mean smth like
 dataFrame.select(key).agg(count).distinct().agg(sum).
 Could you print explain for this way and for SQL you tried? I’m just
 curious of the difference.


 On Tue, Aug 11, 2015 at 11:54 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 As far as I know, Spark SQL cannot process data on a per-partition-basis.
 DataFrame.foreachPartition is the way.


 What do you mean by “cannot process on per-partition-basis”? DataFrame is
 an RDD on steroids.


I meant that Spark SQL cannot process the data of a single partition the way
you can with foreachPartition.



 I haven't tried it, but, following looks like a not-so-sophisticated way
 of making spark sql partition aware.


 http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery


 On Wed, Aug 12, 2015 at 5:00 AM, Philip Weaver philip.wea...@gmail.com
 wrote:

 Thanks.

 In my particular case, I am calculating a distinct count on a key that
 is unique to each partition, so I want to calculate the distinct count
 within each partition, and then sum those. This approach will avoid moving
 the sets of that key around between nodes, which would be very expensive.

 Currently, to accomplish this we are manually reading in the parquet
 files (not through Spark SQL), using a bitset to calculate the unique count
 within each partition, and accumulating that sum. Doing this through Spark
 SQL would be nice, but the naive SELECT distinct(count(...)) approach
 takes 60 times as long :). The approach I mentioned above might be an
 acceptable hybrid solution.

 - Philip


 On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov fathers...@list.ru
 wrote:

 Philip,

 If all data per key are inside just one partition, then Spark will
 figure that out. Can you guarantee that’s the case?
 What is it you try to achieve? There might be another way for it, when
 you might be 100% sure what’s happening.

 You can print debugString or explain (for DataFrame) to see what’s
 happening under the hood.


 On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com
 wrote:

 If I have an RDD that happens to already be partitioned by a key, how
 efficient can I expect a groupBy operation to be? I would expect that Spark
 shouldn't have to move data around between nodes, and simply will have a
 small amount of work just checking the partitions to discover that it
 doesn't need to move anything around.

 Now, what if we're talking about a parquet database created by using
 DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
 by a key that I'm already partitioned by?

 - Philip


 Eugene Morozov
 fathers...@list.ru








 Eugene Morozov
 fathers...@list.ru







Re: grouping by a partitioned key

2015-08-12 Thread Hemant Bhanawat
As far as I know, Spark SQL cannot process data on a per-partition basis.
DataFrame.foreachPartition is the way.

I haven't tried it, but the following looks like a not-so-sophisticated way of
making Spark SQL partition aware:

http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
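
A rough sketch of the per-partition distinct count discussed in this thread,
done on the DataFrame's underlying RDD (it assumes the key is the first column
and that a given key never spans partitions):

val perPartition = df.rdd.mapPartitions { rows =>
  val seen = scala.collection.mutable.HashSet[Any]()
  rows.foreach(r => seen += r.get(0))        // assumed: key is the first column
  Iterator(seen.size.toLong)                 // distinct count within this partition
}
val totalDistinct = perPartition.reduce(_ + _)   // valid only if keys never span partitions
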


On Wed, Aug 12, 2015 at 5:00 AM, Philip Weaver philip.wea...@gmail.com
wrote:

 Thanks.

 In my particular case, I am calculating a distinct count on a key that is
 unique to each partition, so I want to calculate the distinct count within
 each partition, and then sum those. This approach will avoid moving the
 sets of that key around between nodes, which would be very expensive.

 Currently, to accomplish this we are manually reading in the parquet files
 (not through Spark SQL), using a bitset to calculate the unique count
 within each partition, and accumulating that sum. Doing this through Spark
 SQL would be nice, but the naive SELECT distinct(count(...)) approach
 takes 60 times as long :). The approach I mentioned above might be an
 acceptable hybrid solution.

 - Philip


 On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov fathers...@list.ru
 wrote:

 Philip,

 If all data per key are inside just one partition, then Spark will figure
 that out. Can you guarantee that’s the case?
 What is it you try to achieve? There might be another way for it, when
 you might be 100% sure what’s happening.

 You can print debugString or explain (for DataFrame) to see what’s
 happening under the hood.


 On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com wrote:

 If I have an RDD that happens to already be partitioned by a key, how
 efficient can I expect a groupBy operation to be? I would expect that Spark
 shouldn't have to move data around between nodes, and simply will have a
 small amount of work just checking the partitions to discover that it
 doesn't need to move anything around.

 Now, what if we're talking about a parquet database created by using
 DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
 by a key that I'm already partitioned by?

 - Philip


 Eugene Morozov
 fathers...@list.ru








Re: How to minimize shuffling on Spark dataframe Join?

2015-08-11 Thread Hemant Bhanawat
Is the source of your DataFrame partitioned on the key? From your mail, it
looks like it is not. If that is the case, you will have to shuffle the data
anyway in order to partition it.

The other part of your question is how to co-group data from two DataFrames
based on a key. For RDDs, cogroup in PairRDDFunctions is one way; I am not
sure if something similar is available for DataFrames.

Hemant
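
At the RDD level, co-partitioning both sides with the same partitioner means
the subsequent join does not shuffle either side again. A rough sketch; the
column name follows the question, getAs-by-name assumes a recent 1.x release,
and the partition count is illustrative:

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(200)
val students = student_rdf.rdd
  .map(r => (r.getAs[String]("studentid"), r))
  .partitionBy(p).cache()
val results = student_result_rdf.rdd
  .map(r => (r.getAs[String]("studentid"), r))
  .partitionBy(p).cache()
val joined = students.join(results)   // both sides share the partitioner, so the join itself does not re-shuffle
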





On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar 
abdullah.ibn.an...@gmail.com wrote:



 I have two dataframes like this

   student_rdf = (studentid, name, ...)
   student_result_rdf = (studentid, gpa, ...)

 we need to join this two dataframes. we are now doing like this,

 student_rdf.join(student_result_rdf, student_result_rdf[studentid] == 
 student_rdf[studentid])

 So it is simple. But it creates lots of data shuffling across worker
 nodes, but as joining key is similar and if the dataframe could (understand
 the partitionkey) be partitioned using that key (studentid) then there
 suppose not to be any shuffling at all. As similar data (based on partition
 key) would reside in similar node. is it possible, to hint spark to do this?

 So, I am finding the way to partition data based on a column while I read
 a dataframe from input. And If it is possible that Spark would understand
 that two partitionkey of two dataframes are similar, then how?




 --
 Abdullah



Re: Partitioning in spark streaming

2015-08-11 Thread Hemant Bhanawat
Posting a comment from my previous mail:

When data is received from a stream source, receiver creates blocks of
data.  A new block of data is generated every blockInterval milliseconds. N
blocks of data are created during the batchInterval where N =
batchInterval/blockInterval. A RDD is created on the driver for the blocks
created during the batchInterval. The blocks generated during the
batchInterval are partitions of the RDD.

Now if you want to repartition based on a key, a shuffle is needed.
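
If the goal is key-based partitioning of the stream, a rough sketch (categoryOf
is an assumed extractor and the partition count is illustrative; this does
shuffle once per batch):

import org.apache.spark.HashPartitioner

val byCategory = stream
  .map(record => (categoryOf(record), record))          // key each record by its category
  .transform(_.partitionBy(new HashPartitioner(32)))    // repartition the batch's RDD by that key
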

On Wed, Aug 12, 2015 at 4:36 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 How does partitioning in spark work when it comes to streaming? What's the
 best way to partition a time series data grouped by a certain tag like
 categories of product video, music etc.



Re: Spark Streaming - Design considerations/Knobs

2015-05-21 Thread Hemant Bhanawat
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks
for reading and replying. However, I have a follow-up question:

I am not sure I understand block replication completely. Are the blocks
replicated immediately after they are received by the receiver, or are they
kept only on the receiver node and moved only on a shuffle? Does the
replication have something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote:

 Correcting the ones that are incorrect or incomplete. BUT this is good
 list for things to remember about Spark Streaming.


 On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design
 considerations that need to be taken care of by applications running on
 spark streaming. Is my understanding correct?  Any other important design
 consideration that I should take care of?


- A DStream is associated with a single receiver. For attaining read
parallelism multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure
that there are enough cores for processing after receiver slots are booked
i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, receiver creates blocks
of data.  A new block of data is generated every blockInterval
milliseconds. N blocks of data are created during the batchInterval where 
 N
= batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current
executor to the block managers of other executors. After that, the Network
Input Tracker running on the driver is informed about the block locations
for further processing.
- A RDD is created on the driver for the blocks created during the
batchInterval. The blocks generated during the batchInterval are 
 partitions
of the RDD. Each partition is a task in spark. blockInterval==
batchinterval would mean that a single partition is created and probably 
 it
is processed locally.

 The map tasks on the blocks are processed in the executors (one that
 received the block, and another where the block was replicated) that has
 the blocks irrespective of block interval, unless non-local scheduling
 kicks in (as you observed next).


- Having bigger blockinterval means bigger blocks. A high value of
spark.locality.wait increases the chance of processing a block on the 
 local
node. A balance needs to be found out between these two parameters to
ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can
define the number of partitions by calling dstream.repartition(n). This
reshuffles the data in RDD randomly to create n number of partitions.

 Yes, for greater parallelism. Though comes at the cost of a shuffle.


- An RDD's processing is scheduled by driver's jobscheduler as a job.
At a given point of time only one job is active. So, if one job is
executing the other jobs are queued.


- If you have two dstreams there will be two RDDs formed and there
will be two jobs created which will be scheduled one after the another.


- To avoid this, you can union two dstreams. This will ensure that a
single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
is then considered as a single job. However the partitioning of the RDDs 
 is
not impacted.

 To further clarify, the jobs depend on the number of output operations
 (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
 output operations.

 dstream1.union(dstream2).foreachRDD { rdd = rdd.count() }// one Spark
 job per batch

 dstream1.union(dstream2).foreachRDD { rdd = { rdd.count() ; rdd.count() }
 }// TWO Spark jobs per batch

 dstream1.foreachRDD { rdd = rdd.count } ; dstream2.foreachRDD { rdd =
 rdd.count }  // TWO Spark jobs per batch






-
- If the batch processing time is more than batchinterval then
obviously the receiver's memory will start filling up and will end up in
throwing exceptions (most probably BlockNotFoundException). Currently 
 there
is  no way to pause the receiver.

 You can limit the rate of receiver using SparkConf config
 spark.streaming.receiver.maxRate


-
- For being fully fault tolerant, spark streaming needs to enable
checkpointing. Checkpointing increases the batch processing time.

 Incomplete. There are two types of checkpointing - data and metadata.
 Only data checkpointing, needed by only some operations, increase batch
 processing time. Read -
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
 Furthemore, with checkpoint you can recover computation, but you

Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Hemant Bhanawat
Hi,

I have compiled a list (from online sources) of knobs/design considerations
that need to be taken care of by applications running on spark streaming.
Is my understanding correct?  Any other important design consideration that
I should take care of?


   - A DStream is associated with a single receiver. For attaining read
   parallelism multiple receivers i.e. multiple DStreams need to be created.
   - A receiver is run within an executor. It occupies one core. Ensure
   that there are enough cores for processing after receiver slots are booked
   i.e. spark.cores.max should take the receiver slots into account.
   - The receivers are allocated to executors in a round robin fashion.
   - When data is received from a stream source, receiver creates blocks of
   data.  A new block of data is generated every blockInterval milliseconds. N
   blocks of data are created during the batchInterval where N =
   batchInterval/blockInterval.
   - These blocks are distributed by the BlockManager of the current
   executor to the block managers of other executors. After that, the Network
   Input Tracker running on the driver is informed about the block locations
   for further processing.
   - A RDD is created on the driver for the blocks created during the
   batchInterval. The blocks generated during the batchInterval are partitions
   of the RDD. Each partition is a task in spark. blockInterval==
   batchinterval would mean that a single partition is created and probably it
   is processed locally.
   - Having bigger blockinterval means bigger blocks. A high value of
   spark.locality.wait increases the chance of processing a block on the local
   node. A balance needs to be found out between these two parameters to
   ensure that the bigger blocks are processed locally.
   - Instead of relying on batchInterval and blockInterval, you can define
   the number of partitions by calling dstream.repartition(n). This reshuffles
   the data in RDD randomly to create n number of partitions.
   - An RDD's processing is scheduled by driver's jobscheduler as a job. At
   a given point of time only one job is active. So, if one job is executing
   the other jobs are queued.
   - If you have two dstreams there will be two RDDs formed and there will
   be two jobs created which will be scheduled one after the another.
   - To avoid this, you can union two dstreams. This will ensure that a
   single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
   is then considered as a single job. However the partitioning of the RDDs is
   not impacted.
   - If the batch processing time is more than the batch interval then obviously
   the receiver's memory will start filling up and it will end up throwing
   exceptions (most probably BlockNotFoundException). Currently there is no
   way to pause the receiver.
   - For being fully fault tolerant, spark streaming needs to enable
   checkpointing. Checkpointing increases the batch processing time.
   - The frequency of metadata checkpoint cleaning can be controlled using
   spark.cleaner.ttl. But, data checkpoint cleaning happens automatically when
   the RDDs in the checkpoint are no more required.



Thanks,
Hemant