Re: Spark streaming receivers

2020-08-09 Thread Dark Crusader
Hi Russell,
This is super helpful. Thank you so much.

Can you elaborate on the differences between Structured Streaming and
DStreams? How would the number of receivers required, etc., change?

On Sat, 8 Aug, 2020, 10:28 pm Russell Spitzer, 
wrote:

> Note, none of this applies to Direct streaming approaches, only receiver
> based Dstreams.
>
> You can think of a receiver as a long-running task that never finishes.
> Each receiver is submitted to an executor slot somewhere; it then runs
> indefinitely and internally has a method which passes records over to a
> block management system. There is an interval you set which decides when
> each block is "done"; records arriving after that time go into the next
> block (see the parameter spark.streaming.blockInterval
> <https://spark.apache.org/docs/latest/configuration.html#spark-streaming>).
> Once a block is done it can be processed in the next Spark batch. The gap
> between a block starting and a block being finished is why you can lose
> data in receiver-based streaming without Write Ahead Logging. Usually your
> block interval divides evenly into your batch interval, so you'll get X
> blocks per batch. Each block becomes one partition of the job being done in
> a Streaming batch. Multiple receivers can be unified into a single DStream,
> which just means the blocks produced by all of those receivers are handled
> in the same Streaming batch.
>
> So if you have 5 different receivers, you need at minimum 6 executor
> cores: 1 core for each receiver, and 1 core to actually do your processing
> work. In a real-world case you probably want significantly more cores on
> the processing side than just 1. Without repartitioning you will never have
> more partitions per batch than (batch interval / block interval) * the
> number of receivers.
>
> A quick example
>
> I run 5 receivers with a block interval of 100 ms and a Spark batch
> interval of 1 second, and I use union to group them all together. I will
> most likely end up with one Spark job for each batch every second, running
> with 50 partitions (1000 ms / 100 ms per block per receiver * 5 receivers).
> If I have a total of 10 cores in the system, 5 of them are running
> receivers, and the remaining 5 must process the 50 partitions of data
> generated by the last second of work.
>
> And again, just to reiterate, if you are doing a direct streaming approach
> or structured streaming, none of this applies.
>
> On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm having some trouble figuring out how receivers tie into the Spark
>> driver-executor structure.
>> Do all executors have a receiver that is blocked as soon as it
>> receives some stream data?
>> Or can multiple streams of data be taken as input into a single executor?
>>
>> I have stream data coming in every second from 5 different sources. I
>> want to aggregate data from each of them. Does this mean I need 5
>> executors, or does it have to do with threads on the executor?
>>
>> I might be mixing up a few concepts here. Any help would be appreciated.
>> Thank you.
>>
>
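
For reference, here is a rough sketch of the receiver setup Russell describes
above, written against PySpark's DStream API: five receiver-based streams
unioned into one DStream, a 100 ms block interval, and a 1 second batch
interval. The socket source and the host/port values are illustrative
assumptions, not something taken from this thread.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("receiver-sketch")
        # Records are cut into blocks every 100 ms; each block becomes one partition.
        .set("spark.streaming.blockInterval", "100ms"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, batchDuration=1)  # 1 second batches

# Five receiver-based streams; each receiver permanently occupies one executor
# core, so the cluster needs at least 6 cores (5 receivers + 1 for processing).
streams = [ssc.socketTextStream("stream-host", 9000 + i) for i in range(5)]

# Union them so the blocks from all receivers are handled in the same batch.
unified = ssc.union(*streams)

# With 1 s batches and 100 ms blocks: (1000 ms / 100 ms) * 5 receivers = 50
# partitions per batch, unless you repartition.
unified.count().pprint()

ssc.start()
ssc.awaitTermination()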


Spark streaming receivers

2020-08-08 Thread Dark Crusader
Hi,

I'm having some trouble figuring out how receivers tie into the Spark
driver-executor structure.
Do all executors have a receiver that is blocked as soon as it
receives some stream data?
Or can multiple streams of data be taken as input into a single executor?

I have stream data coming in every second from 5 different sources. I want
to aggregate data from each of them. Does this mean I need 5 executors, or
does it have to do with threads on the executor?

I might be mixing up a few concepts here. Any help would be appreciated.
Thank you.


Mock spark reads and writes

2020-07-14 Thread Dark Crusader
Sorry I wasn't very clear in my last email.

I have a function like this:

def main(read_file):
    df = spark.read.csv(read_file)
    # ** Some other code **
    df.write.csv(path)

I need to write a unit test for this function.
Would Python's unittest.mock help me here?

When I googled this, I mostly saw advice that we shouldn't mock these reads
and writes, but that doesn't solve the problem of how I unit test helper
functions or a main method that has to read and write files.

An example of the proper way to do this in Python would be really helpful.

Thanks a lot.
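
For what it's worth, here is a minimal sketch of one way this is often
structured so that unittest.mock can be used: keep the transform logic in its
own function and patch the module-level spark object at the test boundary. The
module name etl_job and the function names are hypothetical, not from this
thread.

# etl_job.py  (hypothetical module under test)
from pyspark.sql import SparkSession, DataFrame

# Created at import time here only to mirror the snippet above; in practice
# you may prefer to create or inject the session lazily.
spark = SparkSession.builder.getOrCreate()

def transform(df: DataFrame) -> DataFrame:
    # Pure DataFrame-in / DataFrame-out logic; easy to test against a local session.
    return df.dropna()

def main(read_file, write_path):
    df = spark.read.csv(read_file)
    transform(df).write.csv(write_path)


# test_etl_job.py  (hypothetical test file)
from unittest import mock
import etl_job

def test_main_wires_read_transform_and_write():
    with mock.patch.object(etl_job, "spark") as fake_spark, \
         mock.patch.object(etl_job, "transform") as fake_transform:
        etl_job.main("in.csv", "out_dir")

        fake_spark.read.csv.assert_called_once_with("in.csv")
        fake_transform.assert_called_once_with(fake_spark.read.csv.return_value)
        fake_transform.return_value.write.csv.assert_called_once_with("out_dir")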


Mocking pyspark read writes

2020-07-07 Thread Dark Crusader
Hi everyone,

I have a function which reads and writes a Parquet file on HDFS. When I'm
writing a unit test for this function, I want to mock this read & write.

How do you achieve this?
Any help would be appreciated. Thank you.
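
One approach that often comes up instead of mocking, sketched here under the
assumption that pytest and a local pyspark installation are available: run the
function against a local SparkSession and a temporary directory instead of
HDFS, so the real Parquet read and write are exercised end to end. The
function read_and_write stands in for the function under test and is
hypothetical.

# test_parquet_io.py  (a sketch, not taken from this thread)
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    session = (SparkSession.builder
               .master("local[2]")
               .appName("parquet-io-tests")
               .getOrCreate())
    yield session
    session.stop()

def read_and_write(spark, in_path, out_path):
    # Stand-in for the real function: read Parquet, do something, write Parquet.
    df = spark.read.parquet(in_path)
    df.select("id").write.mode("overwrite").parquet(out_path)

def test_read_and_write_roundtrip(spark, tmp_path):
    in_path = str(tmp_path / "in.parquet")
    out_path = str(tmp_path / "out.parquet")
    spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"]).write.parquet(in_path)

    read_and_write(spark, in_path, out_path)

    result = spark.read.parquet(out_path)
    assert result.count() == 2
    assert result.columns == ["id"]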


Re: Add python library with native code

2020-06-05 Thread Dark Crusader
Hi Stone,

Have you looked into this article?
https://medium.com/@SSKahani/pyspark-applications-dependencies-99415e0df987

I haven't tried it with .so files; however, I did use the approach he
recommends to install my other dependencies.
I hope it helps.

On Fri, Jun 5, 2020 at 1:12 PM Stone Zhong  wrote:

> Hi,
>
> So my pyspark app depends on some Python libraries. That is not a problem:
> I pack all the dependencies into a file libs.zip, then call
> *sc.addPyFile("libs.zip")*, and it worked pretty well for a while.
>
> Then I encountered a problem: if any of my libraries has a binary file
> dependency (like .so files), this approach does not work, mainly because
> when you set PYTHONPATH to a zip file, Python does not look up a needed
> binary library (e.g. a .so file) inside the zip file; this is a Python
> *limitation*. So I have a workaround:
>
> 1) Do not call sc.addPyFile; instead, extract libs.zip into the current
> directory
> 2) When my Python code starts, manually call *sys.path.insert(0,
> f"{os.getcwd()}/libs")* to extend sys.path
>
> This workaround works well for me. Then I hit another problem: what if my
> code in an executor needs a Python library that has binary code? Below is
> an example:
>
> def do_something(p):
>     ...
>
> rdd = sc.parallelize([
>     {"x": 1, "y": 2},
>     {"x": 2, "y": 3},
>     {"x": 3, "y": 4},
> ])
> a = rdd.map(do_something)
>
> What if the function "do_something" needs a Python library that has binary
> code? My current solution is to extract libs.zip onto an NFS share (or an
> SMB share) and manually do *sys.path.insert(0, f"share_mount_dir/libs")* in
> my "do_something" function, but adding such code to each function looks
> ugly. Is there any better/more elegant solution?
>
> Thanks,
> Stone
>
>
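
A sketch of one way to tidy up the pattern Stone describes, under the
assumption that the archive is shipped to every executor with something like
spark-submit --archives libs.zip#libs (behaviour differs between cluster
managers, so treat the extraction location as an assumption). The path
juggling lives in one small idempotent helper, so each task function only
needs a single call; the library name native_dep is hypothetical.

# app.py  (illustrative sketch)
import os
import sys

from pyspark import SparkContext

def _ensure_libs_on_path():
    # Idempotent bootstrap: safe to call from any task; inserts the path only
    # once per Python worker. Assumes libs.zip was unpacked into ./libs on
    # each executor.
    libs_dir = os.path.join(os.getcwd(), "libs")
    if libs_dir not in sys.path:
        sys.path.insert(0, libs_dir)

def do_something(p):
    _ensure_libs_on_path()   # one line per function instead of repeated path code
    import native_dep        # hypothetical package that ships .so files
    return native_dep.score(p["x"], p["y"])

sc = SparkContext(appName="native-deps-sketch")
rdd = sc.parallelize([
    {"x": 1, "y": 2},
    {"x": 2, "y": 3},
    {"x": 3, "y": 4},
])
a = rdd.map(do_something)
print(a.collect())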


Re: Spark dataframe hdfs vs s3

2020-05-30 Thread Dark Crusader
Thanks, all, for the replies.
I am switching to HDFS since it seems like the easier solution.
To answer some of your questions: my HDFS space is on the same nodes I use
for computation in Spark. From what I understand, this helps because of the
data locality advantage, which means there is less network IO and data
redistribution across the nodes.

Thanks for your help.
Aditya

On Sat, 30 May, 2020, 10:48 am Jörn Franke,  wrote:

> Maybe some AWS network-optimized instances with higher bandwidth would
> improve the situation.
>
> On 27.05.2020 at 19:51, Dark Crusader  > wrote:
>
> 
> Hi Jörn,
>
> Thanks for the reply. I will try to create an easier example to reproduce
> the issue.
>
> I will also try your suggestion to look into the UI. Can you guide me on
> what I should be looking for?
>
> I was already using the s3a protocol to compare the times.
>
> My hunch is that multiple reads from S3 are required because of improper
> caching of intermediate data, and maybe HDFS is doing a better job at this.
> Does this make sense?
>
> I would also like to add that we built an extra layer on top of S3, which
> might be making the times even slower.
>
> Thanks for your help.
>
> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>
>> Have you looked in the Spark UI to see why this is the case?
>> Reading from S3 can take more time - it also depends on what S3 URL you
>> are using: s3a vs s3n vs s3.
>>
>> It could help, after some calculation, to persist in memory or on HDFS.
>> You can also initially load from S3, store on HDFS, and work from there.
>>
>> HDFS offers data locality for the tasks, i.e. the tasks start on the nodes
>> where the data is. Depending on what S3 "protocol" you are using, you
>> might also be penalized more on performance.
>>
>> Try s3a as the protocol (replace all s3n with s3a).
>>
>> You can also use the s3 URL, but this requires a special bucket
>> configuration (a dedicated empty bucket), and it lacks some
>> interoperability with other AWS services.
>>
>> Nevertheless, it could also be something else in the code. Can you post
>> an example reproducing the issue?
>>
>> > On 27.05.2020 at 18:18, Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>> >
>> > 
>> > Hi all,
>> >
>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>> and running an algorithm from the spark ml library.
>> >
>> > If I create the same spark dataframe by reading data from S3, the same
>> algorithm takes considerably more time.
>> >
>> > I don't understand why this is happening. Is this a chance occurrence
>> or are the spark dataframes created differently?
>> >
>> > I don't understand how the data store would affect the algorithm
>> performance.
>> >
>> > Any help would be appreciated. Thanks a lot.
>>
>
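
To make Jörn's suggestion above concrete, here is a small sketch of the
pattern: read the Parquet data once from S3 over s3a, then either persist it
or land it on HDFS before running the iterative ML work. The bucket, the
paths, and the choice of KMeans are illustrative assumptions, and the data is
assumed to already contain a "features" vector column.

from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName("hdfs-vs-s3-sketch").getOrCreate()

# Read the Parquet data once from S3 using the s3a connector (bucket/path made up).
df = spark.read.parquet("s3a://my-bucket/training-data/")

# Option 1: persist it so the iterative algorithm does not re-read S3 on every pass.
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()  # materialize the cache before training

# Option 2: land it on HDFS once and work from there, keeping data locality.
# df.write.mode("overwrite").parquet("hdfs:///data/training-data/")
# df = spark.read.parquet("hdfs:///data/training-data/")

# Illustrative spark.ml algorithm; assumes df already has a "features" vector column.
model = KMeans(k=10, featuresCol="features").fit(df)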


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi Randy,

Yes, I'm using parquet on both S3 and hdfs.

On Thu, 28 May, 2020, 2:38 am randy clinton,  wrote:

> Is the file Parquet on S3 or is it some other file format?
>
> In general I would assume that HDFS read/writes are more performant for
> spark jobs.
>
> For instance, consider how well partitioned your HDFS file is vs the S3
> file.
>
> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi Jörn,
>>
>> Thanks for the reply. I will try to create an easier example to reproduce
>> the issue.
>>
>> I will also try your suggestion to look into the UI. Can you guide me on
>> what I should be looking for?
>>
>> I was already using the s3a protocol to compare the times.
>>
>> My hunch is that multiple reads from S3 are required because of improper
>> caching of intermediate data, and maybe HDFS is doing a better job at this.
>> Does this make sense?
>>
>> I would also like to add that we built an extra layer on top of S3, which
>> might be making the times even slower.
>>
>> Thanks for your help.
>>
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>>
>>> Have you looked in the Spark UI to see why this is the case?
>>> Reading from S3 can take more time - it also depends on what S3 URL you
>>> are using: s3a vs s3n vs s3.
>>>
>>> It could help, after some calculation, to persist in memory or on HDFS.
>>> You can also initially load from S3, store on HDFS, and work from there.
>>>
>>> HDFS offers data locality for the tasks, i.e. the tasks start on the
>>> nodes where the data is. Depending on what S3 "protocol" you are using,
>>> you might also be penalized more on performance.
>>>
>>> Try s3a as the protocol (replace all s3n with s3a).
>>>
>>> You can also use the s3 URL, but this requires a special bucket
>>> configuration (a dedicated empty bucket), and it lacks some
>>> interoperability with other AWS services.
>>>
>>> Nevertheless, it could also be something else in the code. Can you post
>>> an example reproducing the issue?
>>>
>>> > On 27.05.2020 at 18:18, Dark Crusader <
>>> relinquisheddra...@gmail.com> wrote:
>>> >
>>> > 
>>> > Hi all,
>>> >
>>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>>> and running an algorithm from the spark ml library.
>>> >
>>> > If I create the same spark dataframe by reading data from S3, the same
>>> algorithm takes considerably more time.
>>> >
>>> > I don't understand why this is happening. Is this a chance occurrence
>>> or are the spark dataframes created differently?
>>> >
>>> > I don't understand how the data store would affect the algorithm
>>> performance.
>>> >
>>> > Any help would be appreciated. Thanks a lot.
>>>
>>
>
> --
> I appreciate your time,
>
> ~Randy
>


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi Jörn,

Thanks for the reply. I will try to create an easier example to reproduce
the issue.

I will also try your suggestion to look into the UI. Can you guide me on
what I should be looking for?

I was already using the s3a protocol to compare the times.

My hunch is that multiple reads from S3 are required because of improper
caching of intermediate data, and maybe HDFS is doing a better job at this.
Does this make sense?

I would also like to add that we built an extra layer on top of S3, which
might be making the times even slower.

Thanks for your help.

On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:

> Have you looked in the Spark UI to see why this is the case?
> Reading from S3 can take more time - it also depends on what S3 URL you
> are using: s3a vs s3n vs s3.
>
> It could help, after some calculation, to persist in memory or on HDFS.
> You can also initially load from S3, store on HDFS, and work from there.
>
> HDFS offers data locality for the tasks, i.e. the tasks start on the nodes
> where the data is. Depending on what S3 "protocol" you are using, you
> might also be penalized more on performance.
>
> Try s3a as the protocol (replace all s3n with s3a).
>
> You can also use the s3 URL, but this requires a special bucket
> configuration (a dedicated empty bucket), and it lacks some
> interoperability with other AWS services.
>
> Nevertheless, it could also be something else in the code. Can you post
> an example reproducing the issue?
>
> > On 27.05.2020 at 18:18, Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
> >
> > 
> > Hi all,
> >
> > I am reading data from hdfs in the form of parquet files (around 3 GB)
> and running an algorithm from the spark ml library.
> >
> > If I create the same spark dataframe by reading data from S3, the same
> algorithm takes considerably more time.
> >
> > I don't understand why this is happening. Is this a chance occurrence
> or are the spark dataframes created differently?
> >
> > I don't understand how the data store would affect the algorithm
> performance.
> >
> > Any help would be appreciated. Thanks a lot.
>


Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi all,

I am reading data from hdfs in the form of parquet files (around 3 GB) and
running an algorithm from the spark ml library.

If I create the same spark dataframe by reading data from S3, the same
algorithm takes considerably more time.

I don't understand why this is happening. Is this a chance occurrence, or
are the spark dataframes created differently?

I don't understand how the data store would affect the algorithm
performance.

Any help would be appreciated. Thanks a lot.