Apache Spark Machine Learning Unleashed Book Review author: Jillur Quddus

2020-05-30 Thread patrice molinchaeux

@ Jillur Qudus aka Scammer

I know you are hiding on this mailing list, or at least your friends are.

@Sean Owen Book/Theatre critic is a profession


When I first saw the following code on the introductory page https://spark.apache.org/examples.html

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(xrange(0, NUM_SAMPLES)) \
             .filter(inside).count()
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

I was a bit overwhelmed and thought, WTF, I had better get a book that will explain this thoroughly. By investing in a book I would get a thorough walkthrough of real cases and of any dependency tools, with Spark as a framework technology.

 

The book I purchased was yours: Machine Learning with Apache Spark Quick Start Guide.

www.packt.com (Packt Publishing, Birmingham – Mumbai)

Author: Jillur Quddus 

 

You describe yourself as:

“Lead technical architect, polyglot software engineer and data scientist with over ten years' experience architecting and engineering distributed, scalable, high-performance and secure solutions used to combat organised crime, cybercrime and fraud ... working with central government intelligence, law enforcement and banking. Worked across the world, including in Japan, Singapore, Malaysia, Hong Kong and New Zealand. Founder of UK-based company Keisan, specialising in open source distributed technologies and machine learning ...”

 

On page 53 you describe the step of installing Anaconda (a Python package manager and environment):

> bash Anaconda3-…

 

On page 54:

 

> conda install -c conda-forge pyspark 

 

On page 63 you write:

“You will need to restart any Jupyter Notebook instances, and the underlying Terminal sessions from which they were spawned, in order for the SPARK_HOME environment variable to be successfully recognised and registered by findspark.”

 

I know that the Spark framework is self-contained, with its own configuration files, so there is no need to set environment variables. If I wanted to point findspark at a specific SPARK_HOME installation, the code would read like so:

 

import findspark

findspark.init('/path/to/spark-2.4.5-bin-hadoop2.7')

 

 

On page 63 you also write:

“... We are now ready to write our first Spark program in Python! ... it may be easier to split the following code ...”

 

This is your code:

 

# (1) Import the required Python dependencies
import findspark
findspark.init()

 

The next block, as printed, will not run without the line-continuation backslashes, because this is Python.

# (2) Instantiate the spark context 

conf = SparkConf()

.setMaster(“spark://192.168.56.10:7077”)

.setAppName(“Calculate Pi”)

sc = SparkContext(conf=conf)

 

 

from pyspark import SparkConf, SparkContext  # imports needed for the classes below

conf = SparkConf() \
    .setMaster("spark://192.168.56.10:7077") \
    .setAppName("Calculate Pi")
sc = SparkContext(conf=conf)

 

 

This version will run, because it is copied and pasted from working Python example code.

# (3) Calculate the value of Pi, i.e. 3.14...
def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

num_sample = 100

 

I have set it like so, to get a more accurate value of Pi:

num_samples = 10**8

Even this guy, https://www.sicara.ai/blog/2017-05-02-get-started-pyspark-jupyter-notebook-3-minutes, has set it to

num_samples = 1

"(I bet you understand what it does!)"

although good code would be num_samples = 10**8 (10 to the power of 8).
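
For completeness, here is a minimal, self-contained version of the whole Pi estimate that runs under Python 3 with PySpark installed via conda as on page 54 (the local[*] master and the sample count are my own choices, not the book's; on a standalone cluster substitute the spark:// master URL):

import random

from pyspark import SparkConf, SparkContext

# Build the context; local[*] runs everything on the local machine.
conf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("Calculate Pi")
sc = SparkContext(conf=conf)

def inside(_):
    # Draw a random point in the unit square and test whether it falls
    # inside the quarter circle of radius 1.
    x, y = random.random(), random.random()
    return x * x + y * y < 1

num_samples = 10**6  # push towards 10**8 for a more accurate estimate
count = sc.parallelize(range(num_samples)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / num_samples))

sc.stop()

With 10**6 samples this typically prints a value within a few hundredths of 3.14159, and the estimate tightens as num_samples grows.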


You also write in the book: “To my wife and best friend Jane, for making life worth living. And to the memory of my parents, for their sacrifices and for giving me the freedom to explore my imagination.”

I understand these books sell in India for around US$0.25, because they are printed there.

You obviously did not respect my freedom to explore and study, nor that of the others who also spent their hard-earned money and were SCAMMED like me.

So I wish you a long life. In that long life I hope you have an accident where you are

parallelised from neck down and live the remainder of your years as a vegetables like Steven Hawkings. You wife has to spoon feed then finally end up and leaving you.

 





[bug] Scala reflection "assertion failed: class Byte" in Dataset.toJSON

2020-05-30 Thread Brandon Vincent
Hi all,

I have a job that executes a query and collects the results as JSON using
Dataset.toJSON. For the most part it is stable, but sometimes it fails
randomly with a scala assertion error. Here is the stack trace:

  org.apache.spark.sql.Dataset.toJSON
 Dataset.scala: 3222
org.apache.spark.sql.Encoders$.STRING
Encoders.scala:   96
  org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply
 ExpressionEncoder.scala:   72
   org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor
 ScalaReflection.scala:  161
   org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor
 ScalaReflection.scala:  173
  org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects
 ScalaReflection.scala:   49
  org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$
 ScalaReflection.scala:  925
   org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects
 ScalaReflection.scala:  926
  scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo
 TypeConstraints.scala:   68
org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$deserializerFor$1
 ScalaReflection.scala:  260
   org.apache.spark.sql.catalyst.ScalaReflection$.localTypeOf
 ScalaReflection.scala:   49
   org.apache.spark.sql.catalyst.ScalaReflection.localTypeOf$
 ScalaReflection.scala:  939
org.apache.spark.sql.catalyst.ScalaReflection.localTypeOf
 ScalaReflection.scala:  941
   scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe
TypeTags.scala:  237
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute
TypeTags.scala:  237
  org.apache.spark.sql.catalyst.ScalaReflection$$typecreator7$1.apply
 ScalaReflection.scala:  260
  scala.reflect.internal.Symbols$TypeSymbol.toTypeConstructor
 Symbols.scala: 3081
scala.reflect.internal.Symbols$SymbolContextApiImpl.toTypeConstructor
 Symbols.scala:  194
scala.reflect.internal.Symbols$TypeSymbol.typeConstructor
 Symbols.scala: 3154
  scala.reflect.internal.Symbols$TypeSymbol.setTyconCache
 Symbols.scala: 3163
   scala.reflect.internal.SymbolTable.throwAssertionError
 SymbolTable.scala:  183
java.lang.AssertionError: assertion failed: class Byte

It can also come up with "class Boolean" with the same stack trace.
Any clues on this? I wasn't able to find information about this specific
assertion error.

The Spark version is 2.4.4, compiled with Scala 2.12.
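
For reference, a minimal sketch of one possible workaround: building the JSON column with the to_json and struct SQL functions instead of calling Dataset.toJSON. It is written in PySpark for brevity (the failing job may well be in Scala, where the same functions exist), the DataFrame below is a stand-in, and whether this actually avoids the reflection path in this case is an assumption on my part:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct

spark = SparkSession.builder.appName("tojson-workaround").getOrCreate()

# Stand-in for the real query result.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])

# Serialise all columns of each row into a single JSON string column,
# instead of calling df.toJSON().
json_df = df.select(to_json(struct(*df.columns)).alias("value"))
print(json_df.collect())
spark.stop()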

Thank you.


Re: Dataframe to nested json document

2020-05-30 Thread neeraj bhadani
Hi,
Apologies for the missing link in the previous mail.

   You can follow the link below to save your DataFrame as a JSON file.

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.json
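
For example, a minimal PySpark sketch (the data, column names and output path below are made up; the same API exists in Scala via DataFrameWriter):

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct

spark = SparkSession.builder.appName("df-to-nested-json").getOrCreate()

df = spark.createDataFrame(
    [(1, "Alice", "London"), (2, "Bob", "Paris")],
    ["id", "name", "city"])

# Group the address-like fields under a struct column so each output
# JSON document is nested rather than flat.
nested = df.select("id", "name", struct("city").alias("address"))

# Writes one JSON document per line into the output directory.
nested.write.mode("overwrite").json("/tmp/people_json")
spark.stop()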


Regards,
Neeraj


On Sat, May 30, 2020 at 5:01 PM neeraj bhadani 
wrote:

> Hi,
>You can follow this to save your DataFrame as JSON file.
>
> Regards,
> Neeraj
>
> On Sat, May 30, 2020 at 12:44 PM zakaria benzidalmal 
> wrote:
>
>> Hi
>>
>> Just save it as json
>>
>> Le sam. 30 mai 2020 à 13:15, Chidananda Unchi 
>> a écrit :
>>
>>> Hi All,
>>>

 I want to convert dataframe to JSOn Dcoumnet using spark scala.

 Can some one share me sample codes or any suggestions

 Regards,
 Chidananda




Unsubscribe

2020-05-30 Thread Sunil Prabhakara



Re: Dataframe to nested json document

2020-05-30 Thread neeraj bhadani
Hi,
   You can follow this to save your DataFrame as JSON file.

Regards,
Neeraj

On Sat, May 30, 2020 at 12:44 PM zakaria benzidalmal 
wrote:

> Hi
>
> Just save it as json
>
> Le sam. 30 mai 2020 à 13:15, Chidananda Unchi  a
> écrit :
>
>> Hi All,
>>
>>>
>>> I want to convert dataframe to JSOn Dcoumnet using spark scala.
>>>
>>> Can some one share me sample codes or any suggestions
>>>
>>> Regards,
>>> Chidananda
>>>
>>>


Re: Spark dataframe hdfs vs s3

2020-05-30 Thread Anwar AliKhan
Optimisation of Spark applications

Apache Spark is an in-memory data processing tool widely used in companies to deal with Big Data issues. Running a Spark application in production requires user-defined resources. This article presents several Spark concepts to optimize the use of the engine, both in the writing of the code and in the selection of execution parameters. These concepts will be illustrated through a use case with a focus on best practices for allocating resources to a Spark application in a Hadoop YARN environment.

Spark Cluster: terminologies and modes

Deploying a Spark application in a YARN cluster requires an understanding
of the “master-slave” model as well as the operation of several components:
the Cluster Manager, the Spark Driver, the Spark Executors and the Edge
Node concept.

The “master-slave” model defines two types of entities: the master controls
and centralizes the communications of the slaves. It is a model that is
often applied in the implementation of clusters and/or for parallel
processing. It is also the model used by Spark applications.

The *Cluster Manager* maintains the physical machines on which the Driver
and its Executors are going to run and allocates the requested resources to
the users. Spark supports 4 Cluster Managers: Apache YARN, Mesos,
Standalone and, recently, Kubernetes. We will focus on YARN.

The *Spark Driver* is the entity that manages the execution of the Spark
application (the master), each application is associated with a Driver. Its
role is to interpret the application’s code to transform it into a sequence
of tasks and to maintain all the states and tasks of the Executors.

The *Spark Executors* are the entities responsible for performing the tasks
assigned to them by the Driver (the slaves). They will read these tasks,
execute them and return their states (Success/Fail) and results. The
Executors are linked to only one application at a time.

The *Edge Node* is a physical/virtual machine where users will connect to
instantiate their Spark applications. It serves as an interface between the
cluster and the outside world. It is a comfort zone where components are
pre-installed and most importantly, pre-configured.
Execution modes

There are different ways to deploy a Spark application:

   - The *Cluster* mode: This is the most common, the user sends a JAR file
   or a Python script to the Cluster Manager. The latter will instantiate a
   Driver and Executors on the different nodes of the cluster. The CM is
   responsible for all processes related to the Spark application. We will use
   it to handle our example: it facilitates the allocation of resources and
   releases them as soon as the application is finished.
   - The *Client* mode: Almost identical to *cluster* mode with the
   difference that the driver is instantiated on the machine where the job is
   submitted, i.e. outside the cluster. It is often used for program
   development because the logs are directly displayed in the current
   terminal, and the instance of the driver is linked to the user’s session.
   This mode is not recommended in production because the Edge Node can
   quickly reach saturation in terms of resources and the Edge Node is a SPOF
   (Single Point Of Failure).
   - The *Local* mode: the Driver and Executors run on the machine on which
   the user is logged in. It is only recommended for the purpose of testing an
   application in a local environment or for executing unit tests.

The number of Executors and their respective resources are provided
directly in the spark-submit command, or via the configuration properties
injected at the creation of the SparkSession object. Once the Executors are
created, they will communicate with the Driver, which will distribute the
processing tasks.
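
As an illustration, here is a minimal sketch of injecting resource properties at SparkSession creation time (the values shown are placeholders, not recommendations; the equivalent spark-submit flags are noted in the comments):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("resource-allocation-example")
    # Executor topology: like --num-executors 4 --executor-cores 2
    # --executor-memory 4g on spark-submit.
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    # Driver memory, like --driver-memory 2g on spark-submit.
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)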

Resources

A Spark application works as follows: data is stored in memory, and the
CPUs are responsible for performing the tasks of an application. The
application is therefore constrained by the resources used, including
memory and CPUs, which are defined for the Driver and Executors.

Spark applications can generally be divided into two types:

   - *Memory-intensive*: Applications involving massive joins or HashMap
   processing. These operations are expensive in terms of memory.
   - *CPU-intensive*: All applications involving sorting operations or
   searching for particular data. These types of jobs become intensive
   depending on the frequency of these operations.

Some applications are both memory intensive and CPU intensive: some models
of Machine Learning, for example, require 

Re: [pyspark 2.3+] Dedupe records

2020-05-30 Thread Anwar AliKhan
What is meant by "DataFrames are RDDs under the cover"?

What is meant by deduplication?


Please send your bio, work history and past commercial projects.

The Wali Ahad has agreed to release 300 million USD for a new machine learning
research project to centralize government facilities and find better ways to offer
citizen services with artificial intelligence technologies.

I am tasked with finding talented artificial intelligence experts.


Shukran



On Sat, 30 May 2020, 05:26 Sonal Goyal,  wrote:

> Hi Rishi,
>
> 1. Dataframes are RDDs under the cover. If you have unstructured data or
> if you know something about the data through which you can optimize the
> computation. you can go with RDDs. Else the Dataframes which are optimized
> by Spark SQL should be fine.
> 2. For incremental deduplication, I guess you can hash your data based on
> some particular values and then only compare the new records against the
> ones which have the same hash. That should reduce the order of comparisons
> drastically provided you can come up with a good indexing/hashing scheme as
> per your dataset.
>
> Thanks,
> Sonal
> Nube Technologies 
>
> 
>
>
>
>
> On Sat, May 30, 2020 at 8:17 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have around 100B records where I get new , update & delete records.
>> Update/delete records are not that frequent. I would like to get some
>> advice on below:
>>
>> 1) should I use rdd + reducibly or DataFrame window operation for data of
>> this size? Which one would outperform the other? Which is more reliable and
>> low maintenance?
>> 2) Also how would you suggest we do incremental deduplication? Currently
>> we do full processing once a week and no dedupe during week days to avoid
>> heavy processing. However I would like to explore incremental dedupe option
>> and weight pros/cons.
>>
>> Any input is highly appreciated!
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>


Re: Dataframe to nested json document

2020-05-30 Thread zakaria benzidalmal
Hi

Just save it as json

Le sam. 30 mai 2020 à 13:15, Chidananda Unchi  a
écrit :

> Hi All,
>
>>
>> I want to convert dataframe to JSOn Dcoumnet using spark scala.
>>
>> Can some one share me sample codes or any suggestions
>>
>> Regards,
>> Chidananda
>>
>>


Dataframe to nested json document

2020-05-30 Thread Chidananda Unchi
Hi All,

>
> I want to convert a DataFrame to a JSON document using Spark with Scala.
>
> Can someone share sample code or any suggestions?
>
> Regards,
> Chidananda
>
>


Re: [pyspark 2.3+] Dedupe records

2020-05-30 Thread Molotch
The performant way would be to partition your dataset into reasonably small
chunks and use a bloom filter to see if the entity might be in your set
before you make a lookup.

Check the bloom filter; if the entity might be in the set, rely on partition
pruning to read and backfill the relevant partition. If the entity isn't in
the set, just save it as new data.

Sooner or later you will probably want to compact the appended partitions
to reduce the number of small files.

Delta Lake has update and compaction semantics, unless you want to do it
manually.
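
A rough PySpark sketch of the manual flow (paths, partition and key columns are made up, and the bloom-filter gate is approximated here by simply collecting the partition values present in the new batch):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder.appName("incremental-dedupe").getOrCreate()

base_path = "/data/events"                        # existing partitioned dataset
new_batch = spark.read.parquet("/data/incoming")  # new and updated records

# Which partitions does the new batch touch?
affected = [r.event_date for r in
            new_batch.select("event_date").distinct().collect()]

# Read back only those partitions (partition pruning on event_date).
existing = spark.read.parquet(base_path).where(col("event_date").isin(affected))

# Keep the latest version of each record key across old + new rows.
w = Window.partitionBy("record_id").orderBy(col("updated_at").desc())
deduped = (existing.unionByName(new_batch)
           .withColumn("rn", row_number().over(w))
           .where(col("rn") == 1)
           .drop("rn"))

# Write the rebuilt partitions to a staging location; swapping them into
# place (or using dynamic partition overwrite) is left out of this sketch.
(deduped.write.mode("overwrite")
 .partitionBy("event_date")
 .parquet("/data/events_rebuilt"))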

Since 2.4.0, Spark is also able to prune buckets. But as far as I know
there's no way to backfill a single bucket. If there were, the combination of
partition and bucket pruning could dramatically limit the amount of data you
need to read/write from/to disk.

RDD vs DataFrame: I'm not sure exactly how and when Tungsten is able to be
used with RDDs, if at all. Because of that I always try to use DataFrames and
the built-in functions as long as possible, just to get the sweet off-heap
allocation and the "expressions to byte code" thingy along with the Catalyst
optimizations. That will probably do more for your performance than anything
else. The memory overhead of JVM objects and GC runs might be brutal on your
performance and memory usage, depending on your dataset and use case.


br,

molotch



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Spark dataframe hdfs vs s3

2020-05-30 Thread Dark Crusader
Thanks all for the replies.
I am switching to hdfs since it seems like an easier solution.
To answer some of your questions: my HDFS space is part of the nodes I use
for computation in Spark. From what I understand, this helps because of the
data-locality advantage, which means there is less network IO and less data
redistribution across the nodes.
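
A small sketch of the two options discussed in this thread, caching the S3 read and staging a copy on HDFS (bucket, paths and the hadoop-aws/credentials setup are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3a-vs-hdfs").getOrCreate()

# Read via the s3a connector (requires the hadoop-aws package and credentials).
df = spark.read.parquet("s3a://my-bucket/input/")

# Persist after the first read so iterative ML stages reuse the cached data
# instead of going back to S3 on every pass.
df.persist()
df.count()  # materialise the cache

# Alternatively, land a copy on HDFS once and run the algorithm from there.
df.write.mode("overwrite").parquet("hdfs:///staging/input_copy/")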

Thanks for your help.
Aditya

On Sat, 30 May, 2020, 10:48 am Jörn Franke,  wrote:

> Maybe some aws network optimized instances with higher bandwidth will
> improve the situation.
>
> Am 27.05.2020 um 19:51 schrieb Dark Crusader  >:
>
> 
> Hi Jörn,
>
> Thanks for the reply. I will try to create a easier example to reproduce
> the issue.
>
> I will also try your suggestion to look into the UI. Can you guide on what
> I should be looking for?
>
> I was already using the s3a protocol to compare the times.
>
> My hunch is that multiple reads from S3 are required because of improper
> caching of intermediate data. And maybe hdfs is doing a better job at this.
> Does this make sense?
>
> I would also like to add that we built an extra layer on S3 which might be
> adding to even slower times.
>
> Thanks for your help.
>
> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>
>> Have you looked in Spark UI why this is the case ?
>> S3 Reading can take more time - it depends also what s3 url you are using
>> : s3a vs s3n vs S3.
>>
>> It could help after some calculation to persist in-memory or on HDFS. You
>> can also initially load from S3 and store on HDFS and work from there .
>>
>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
>> where the data is. Depending on what s3 „protocol“ you are using you might
>> be also more punished with performance.
>>
>> Try s3a as a protocol (replace all s3n with s3a).
>>
>> You can also use s3 url but this requires a special bucket configuration,
>> a dedicated empty bucket and it lacks some ineroperability with other AWS
>> services.
>>
>> Nevertheless, it could be also something else with the code. Can you post
>> an example reproducing the issue?
>>
>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>> relinquisheddra...@gmail.com>:
>> >
>> > 
>> > Hi all,
>> >
>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>> and running an algorithm from the spark ml library.
>> >
>> > If I create the same spark dataframe by reading data from S3, the same
>> algorithm takes considerably more time.
>> >
>> > I don't understand why this is happening. Is this a chance occurence or
>> are the spark dataframes created different?
>> >
>> > I don't understand how the data store would effect the algorithm
>> performance.
>> >
>> > Any help would be appreciated. Thanks a lot.
>>
>


Re: Using Spark Accumulators with Structured Streaming

2020-05-30 Thread Srinivas V
It’s in the constructor.

On Sat, May 30, 2020 at 4:15 AM Something Something <
mailinglist...@gmail.com> wrote:

> I mean... I don't see any reference to 'accumulator' in your Class
> *definition*. How can you access it in the class if it's not in your
> definition of class:
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<*String,
> InputEventModel, ModelStateInfo, ModelUpdate*> {.  *--> I was expecting
> to see 'accumulator' here in the definition.*
>
> @Override
> public ModelUpdate call(String productId, Iterator
> eventsIterator, GroupState state) {
> }
> }
>
> On Fri, May 29, 2020 at 1:08 PM Srinivas V  wrote:
>
>>
>> Yes, accumulators are updated in the call method of StateUpdateTask. Like
>> when state times out or when the data is pushed to next Kafka topic etc.
>>
>> On Fri, May 29, 2020 at 11:55 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Thanks! I will take a look at the link. Just one question, you seem to
>>> be passing 'accumulators' in the constructor but where do you use it in the
>>> StateUpdateTask class? I am still missing that connection. Sorry, if my
>>> question is dumb. I must be missing something. Thanks for your help so far.
>>> It's been useful.
>>>
>>>
>>> On Fri, May 29, 2020 at 6:51 AM Srinivas V  wrote:
>>>
 Yes it is application specific class. This is how java Spark Functions
 work.
 You can refer to this code in the documentation:
 https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

 public class StateUpdateTask implements
 MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

 @Override
 public ModelUpdate call(String productId, Iterator<InputEventModel>
 eventsIterator, GroupState<ModelStateInfo> state) {
 }
 }

 On Thu, May 28, 2020 at 10:59 PM Something Something <
 mailinglist...@gmail.com> wrote:

> I am assuming StateUpdateTask is your application specific class. Does
> it have 'updateState' method or something? I googled but couldn't find any
> documentation about doing it this way. Can you please direct me to some
> documentation. Thanks.
>
> On Thu, May 28, 2020 at 4:43 AM Srinivas V 
> wrote:
>
>> yes, I am using stateful structured streaming. Yes similar to what
>> you do. This is in Java
>> I do it this way:
>> Dataset productUpdates = watermarkedDS
>> .groupByKey(
>> (MapFunction) event
>> -> event.getId(), Encoders.STRING())
>> .mapGroupsWithState(
>> new
>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> appConfig, accumulators),
>> Encoders.bean(ModelStateInfo.class),
>> Encoders.bean(ModelUpdate.class),
>> GroupStateTimeout.ProcessingTimeTimeout());
>>
>> StateUpdateTask contains the update method.
>>
>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Yes, that's exactly how I am creating them.
>>>
>>> Question... Are you using 'Stateful Structured Streaming' in which
>>> you've something like this?
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>
>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>> streaming applications it works as expected.
>>>
>>>
>>>
>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V 
>>> wrote:
>>>
 Yes, I am talking about Application specific Accumulators. Actually
 I am getting the values printed in my driver log as well as sent to
 Grafana. Not sure where and when I saw 0 before. My deploy mode is 
 “client”
 on a yarn cluster(not local Mac) where I submit from master node. It 
 should
 work the same for cluster mode as well.
 Create accumulators like this:
 AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


 On Tue, May 26, 2020 at 8:42 PM Something Something <
 mailinglist...@gmail.com> wrote:

> Hmm... how would they go to Graphana if they are not getting
> computed in your code? I am talking about the Application Specific
> Accumulators. The other standard counters such as
> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>
> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
> wrote:
>
>> Hello,
>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>> LongAccumulator as well. Yes, it prints on my local but not on