Re: Connection issue with AWS S3 from PySpark 2.3.1

2018-12-21 Thread Shuporno Choudhury
Hi,
I don't know whether the following configs (that you have tried) are correct:
fs.s3a.awsAccessKeyId
fs.s3a.awsSecretAccessKey

The correct ones probably are:
fs.s3a.access.key
fs.s3a.secret.key
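For example, something along these lines (a rough, untested sketch typed into
the pyspark shell, where spark is already defined; the key values are
placeholders):

sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
# s3a key names, instead of fs.s3a.awsAccessKeyId / fs.s3a.awsSecretAccessKey
hadoop_conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
hadoop_conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
a = spark.read.csv("s3a://test-bucket/breast-cancer-wisconsin.csv", header=True)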

On Fri, 21 Dec 2018 at 13:21, Aakash Basu-2 [via Apache Spark User List] <
ml+s1001560n34217...@n3.nabble.com> wrote:

> Hey Shuporno,
>
> Thanks for the prompt reply, and for noticing the silly mistake. I tried
> this out, but I am still getting another error, which seems to be related
> to connectivity.
>
> >>> hadoop_conf.set("fs.s3a.awsAccessKeyId", "abcd")
>> >>> hadoop_conf.set("fs.s3a.awsSecretAccessKey", "123abc")
>> >>> a = spark.read.csv("s3a:///test-bucket/breast-cancer-wisconsin.csv",
>> header=True)
>> Traceback (most recent call last):
>>   File "<stdin>", line 1, in <module>
>>   File
>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py",
>> line 441, in csv
>> return
>> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>>   File
>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>> line 1257, in __call__
>>   File
>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py",
>> line 63, in deco
>> return f(*a, **kw)
>>   File
>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>> line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o220.csv.
>> : java.lang.NoClassDefFoundError:
>> com/amazonaws/auth/AWSCredentialsProvider
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
>> at
>> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
>> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>> at
>> org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
>> at
>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
>> at
>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
>> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:596)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at py4j.Gateway.invoke(Gateway.java:282)
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException:
>> com.amazonaws.auth.AWSCredentialsProvider
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 28 more
>
>
>
> Thanks,
> Aakash.
>
> On Fri, Dec 21, 2018 at 12:51 PM Shuporno Choudhury <[hidden email]> wrote:
>
>>
>>
>> On Fri, 21 Dec 2018 at 12:47, Shuporno Choudhury <[hidden email]> wrote:
>>
>>> Hi,
>>> Your connection config uses 's3n' but your read command uses 's3a'.
>>> The config for 

Re: Connection issue with AWS S3 from PySpark 2.3.1

2018-12-20 Thread Shuporno Choudhury
On Fri, 21 Dec 2018 at 12:47, Shuporno Choudhury <
shuporno.choudh...@gmail.com> wrote:

> Hi,
> Your connection config uses 's3n' but your read command uses 's3a'.
> The config for s3a are:
> spark.hadoop.fs.s3a.access.key
> spark.hadoop.fs.s3a.secret.key
>
> I feel this should solve the problem.
>
> On Fri, 21 Dec 2018 at 12:09, Aakash Basu-2 [via Apache Spark User List] <
> ml+s1001560n34215...@n3.nabble.com> wrote:
>
>> Hi,
>>
>> I am trying to connect to AWS S3 and read a csv file (running POC) from a
>> bucket.
>>
>> I have s3cmd configured and am able to run ls and other operations from the CLI.
>>
>> *Present Configuration:*
>> Python 3.7
>> Spark 2.3.1
>>
>> *JARs added:*
>> hadoop-aws-2.7.3.jar (in sync with the hadoop version used with spark)
>> aws-java-sdk-1.11.472.jar
>>
>> Trying out the following code:
>>
>> >>> sc=spark.sparkContext
>>>
>>> >>> hadoop_conf=sc._jsc.hadoopConfiguration()
>>>
>>> >>> hadoop_conf.set("fs.s3n.awsAccessKeyId", "abcd")
>>>
>>> >>> hadoop_conf.set("fs.s3n.awsSecretAccessKey", "xyz123")
>>>
>>> >>> a = spark.read.csv("s3a://test-bucket/breast-cancer-wisconsin.csv",
>>> header=True)
>>>
>>> Traceback (most recent call last):
>>>
>>>   File "<stdin>", line 1, in <module>
>>>
>>>   File
>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py",
>>> line 441, in csv
>>>
>>> return
>>> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>>>
>>>   File
>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>>> line 1257, in __call__
>>>
>>>   File
>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py",
>>> line 63, in deco
>>>
>>> return f(*a, **kw)
>>>
>>>   File
>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>>> line 328, in get_return_value
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o33.csv.
>>>
>>> : java.lang.NoClassDefFoundError:
>>> com/amazonaws/auth/AWSCredentialsProvider
>>>
>>> at java.lang.Class.forName0(Native Method)
>>>
>>> at java.lang.Class.forName(Class.java:348)
>>>
>>> at
>>> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
>>>
>>> at
>>> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
>>>
>>> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>>>
>>> at
>>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
>>>
>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
>>>
>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
>>>
>>> at
>>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
>>>
>>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
>>>
>>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
>>>
>>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
>>>
>>> at
>>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
>>>
>>> at
>>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>>>
>>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
>>>
>>> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:596)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>
>>

Re: CSV parser - is there a way to find malformed csv record

2018-10-09 Thread Shuporno Choudhury
Hi,
There is a way to obtain these malformed/rejected records. Rejection
can happen not only because of a column count mismatch but also if the data
does not match the data type mentioned in the schema.
To obtain the rejected records, you can do the following:
1. Add an extra column (eg: CorruptRecCol) of type StringType() to your
schema
2. In the dataframe reader, set the *mode* option to 'PERMISSIVE' while
simultaneously setting *columnNameOfCorruptRecord* to the column
CorruptRecCol
3. The column CorruptRecCol will contain the complete record if it is
malformed/corrupted. On the other hand, it will be null if the record is
valid. So you can use a filter (CorruptRecCol IS NOT NULL) to obtain the
malformed/corrupted records.
You can use any column name to hold the invalid records; I have used
*CorruptRecCol* just as an example.
http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
This is for PySpark; a similar option exists for Java/Scala as well:
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/DataFrameReader.html
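A minimal PySpark sketch of the above (the schema fields and the file path
here are just for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("CorruptRecCol", StringType(), True)   # 1. extra StringType column
])

df = (spark.read
      .option("mode", "PERMISSIVE")                           # 2. permissive mode
      .option("columnNameOfCorruptRecord", "CorruptRecCol")   #    with the corrupt-record column
      .schema(schema)
      .csv("/path/to/data.csv"))

malformed = df.filter(df["CorruptRecCol"].isNotNull())   # 3. the rejected records
valid = df.filter(df["CorruptRecCol"].isNull())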


On Tue, 9 Oct 2018 at 00:27, Nirav Patel [via Apache Spark User List] <
ml+s1001560n33643...@n3.nabble.com> wrote:

> I am getting `RuntimeException: Malformed CSV record` while parsing csv
> record and attaching schema at same time. Most likely there are additional
> commas or json data in some field which are not escaped properly. Is there
> a way CSV parser tells me which record is malformed?
>
>
> This is what I am using:
>
> val df2 = sparkSession.read
>   .option("inferSchema", true)
>   .option("multiLine", true)
>   .schema(headerDF.schema) // this only works without column mismatch
>   .csv(dataPath)
>
> Thanks
>
>
>


-- 
--Thanks,
Shuporno Choudhury


Re: Clearing usercache on EMR [pyspark]

2018-08-03 Thread Shuporno Choudhury
Can anyone please help me with this issue?

On Fri, 3 Aug 2018 at 11:27, Shuporno Choudhury <
shuporno.choudh...@gmail.com> wrote:

> Can anyone please help me with this issue?
>
> On Wed, 1 Aug 2018 at 12:50, Shuporno Choudhury [via Apache Spark User
> List]  wrote:
>
>> Hi everyone,
>> I am running spark jobs on EMR (using pyspark). I noticed that after
>> running jobs, the size of the usercache (basically the filecache folder)
>> keeps on increasing (with directory names as 1,2,3,4,5,...).
>> Directory location: */mnt/yarn/usercache/hadoop/**filecache/*
>> Is there a way to avoid creating these directories or automatically
>> clearing the usercache/filecache after a job/periodically?
>> --
>> --Thanks,
>> Shuporno Choudhury
>>
>>
>
>
> --
> --Thanks,
> Shuporno Choudhury
>


-- 
--Thanks,
Shuporno Choudhury


Clearing usercache on EMR [pyspark]

2018-08-01 Thread Shuporno Choudhury
Hi everyone,
I am running spark jobs on EMR (using pyspark). I noticed that after
running jobs, the size of the usercache (basically the filecache folder)
keeps on increasing (with directory names as 1,2,3,4,5,...).
Directory location: */mnt/yarn/usercache/hadoop/**filecache/*
Is there a way to avoid creating these directories or automatically
clearing the usercache/filecache after a job/periodically?
-- 
--Thanks,
Shuporno Choudhury


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Shuporno Choudhury
Hi,
Responding to your queries:
I am using Spark 2.2.1. I have tried with dynamic resource allocation both
turned on and off and have encountered the same behaviour.

The way data is being read is that filepaths (for each independent data
set) are passed to a method; the method then does the processing for those
particular files and writes the result. So even that doesn't seem to
release memory.
There are multiple independent data sets, for which the method is called
sequentially.
While doing this, memory consumption just keeps stacking up.

You can replicate this behaviour in the spark shell (pyspark:
%SPARK_HOME%/bin/pyspark) by:
1. Creating a method that reads data from filepaths passed to it as
arguments and creates a dataframe on top of that
2. Doing some processing (filter etc.) on that dataframe
3. Writing the results to a target (which can also be passed to the method)
4. Running this method again and again (either by providing different
target paths or deleting the target folder before calling the method again)
to replicate the behaviour of multiple datasets [or you can provide
different data sets altogether for each run of the method]
You will notice that the memory consumption for the JVM started by the
spark shell will continuously increase (observe it from Task Manager).
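A rough sketch of what I mean (the paths and the filter column are just
placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

def process(input_path, target_path):
    df = spark.read.csv(input_path, header=True)          # 1. create a dataframe from the filepaths
    result = df.filter(df["some_col"].isNotNull())        # 2. some processing (filter etc.)
    result.write.mode("overwrite").parquet(target_path)   # 3. write the results to a target

# 4. run the method again and again for independent data sets
for i, path in enumerate(["/data/set1", "/data/set2", "/data/set3"]):
    process(path, "/target/run_{}".format(i))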

Maybe Jörn is right. Probably I need to run a different spark-submit for
each data set (as they are completely independent).

Any other advice would also be really appreciated.

On Tue, 5 Jun 2018 at 10:46, Jörn Franke [via Apache Spark User List] <
ml+s1001560n3246...@n3.nabble.com> wrote:

> Additionally I meant with modularization that jobs that have really
> nothing to do with each other should be in separate python programs
>
> On 5. Jun 2018, at 04:50, Thakrar, Jayesh <[hidden email]> wrote:
>
> Disclaimer - I use Spark with Scala and not Python.
>
>
>
> But I am guessing that Jorn's reference to modularization is to ensure
> that you do the processing inside methods/functions and call those methods
> sequentially.
>
> I believe that as long as an RDD/dataset variable is in scope, its memory
> may not be getting released.
>
> By having functions, they will get out of scope and their memory can be
> released.
>
>
>
> Also, assuming that the variables are not daisy-chained/inter-related as
> that too will not make it easy.
>
>
>
>
>
> *From: *Jay <[hidden email]>
> *Date: *Monday, June 4, 2018 at 9:41 PM
> *To: *Shuporno Choudhury <[hidden email]>
> *Cc: *"Jörn Franke [via Apache Spark User List]" <[hidden email]>, <[hidden email]>
> *Subject: *Re: [PySpark] Releasing memory after a spark job is finished
>
>
>
> Can you tell us what version of Spark you are using and if Dynamic
> Allocation is enabled ?
>
>
>
> Also, how are the files being read ? Is it a single read of all files
> using a file matching regex or are you running different threads in the
> same pyspark job?
>
>
>
>
>
> On Mon 4 Jun, 2018, 1:27 PM Shuporno Choudhury, <[hidden email]> wrote:
>
> Thanks a lot for the insight.
>
> Actually I have the exact same transformations for all the datasets, hence
> only 1 python code.
>
> Now, do you suggest that I run different spark-submit for all the
> different datasets given that I have the exact same transformations?
>
>
>
> On Tue 5 Jun, 2018, 1:48 AM Jörn Franke [via Apache Spark User List], <[hidden email]> wrote:
>
> Yes if they are independent with different transformations then I would
> create a separate python program. Especially for big data processing
> frameworks one should avoid putting everything in one big monolithic
> application.
>
>
>
>
> On 4. Jun 2018, at 22:02, Shuporno Choudhury <[hidden email]> wrote:
>
> Hi,
>
>
>
> Thanks for the input.
>
> I was trying to get the functionality first, hence I was using local mode.
> I will be running on a cluster definitely but later.
>
>
>
> Sorry for my naivety, but can you please elaborate on the modularity
> concept that you mentioned and how it will affect whatever I am already
> doing?
>
> Do you mean running a different spark-submit for each different dataset
> when you say 'an independent python program for each process '?
>
>
>
> On Tue, 5 Jun 2018 at 01:12,

Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Shuporno Choudhury
Thanks a lot for the insight.
Actually, I have the exact same transformations for all the datasets, hence
only one Python script.
Now, do you suggest that I run a different spark-submit for each of the
different datasets, given that I have the exact same transformations?

On Tue 5 Jun, 2018, 1:48 AM Jörn Franke [via Apache Spark User List], <
ml+s1001560n32458...@n3.nabble.com> wrote:

> Yes if they are independent with different transformations then I would
> create a separate python program. Especially for big data processing
> frameworks one should avoid putting everything in one big monolithic
> application.
>
>
> On 4. Jun 2018, at 22:02, Shuporno Choudhury <[hidden email]> wrote:
>
> Hi,
>
> Thanks for the input.
> I was trying to get the functionality first, hence I was using local mode.
> I will be running on a cluster definitely but later.
>
> Sorry for my naivety, but can you please elaborate on the modularity
> concept that you mentioned and how it will affect whatever I am already
> doing?
> Do you mean running a different spark-submit for each different dataset
> when you say 'an independent python program for each process '?
>
> On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] <[hidden email]> wrote:
>
>> Why don’t you modularize your code and write for each process an
>> independent python program that is submitted via Spark?
>>
>> Not sure though if Spark local make sense. If you don’t have a cluster
>> then a normal python program can be much better.
>>
>> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]> wrote:
>>
>> Hi everyone,
>> I am trying to run a pyspark code on some data sets sequentially [basically
>> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3.
>> Write modified data in parquet format to a target location]
>> Now, while running this pyspark code across *multiple independent data
>> sets sequentially*, the memory usage from the previous data set doesn't
>> seem to get released/cleared and hence spark's memory consumption (JVM
>> memory consumption from Task Manager) keeps on increasing till it fails at
>> some data set.
>> So, is there a way to clear/remove dataframes that I know are not going
>> to be used later?
>> Basically, can I clear out some memory programmatically (in the pyspark
>> code) when processing for a particular data set ends?
>> At no point, I am caching any dataframe (so unpersist() is also not a
>> solution).
>>
>> I am running spark using local[*] as master. There is a single
>> SparkSession that is doing all the processing.
>> If it is not possible to clear out memory, what can be a better approach
>> for this problem?
>>
>> Can someone please help me with this and tell me if I am going wrong
>> anywhere?
>>
>> --Thanks,
>> Shuporno Choudhury
>>
>>
>>
>
>
> --
> --Thanks,
> Shuporno Choudhury
>
>
>


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Shuporno Choudhury
Hi,

Thanks for the input.
I was trying to get the functionality first, hence I was using local mode.
I will be running on a cluster definitely but later.

Sorry for my naivety, but can you please elaborate on the modularity
concept that you mentioned and how it will affect whatever I am already
doing?
Do you mean running a different spark-submit for each different dataset
when you say 'an independent python program for each process'?

On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] <
ml+s1001560n32455...@n3.nabble.com> wrote:

> Why don’t you modularize your code and write for each process an
> independent python program that is submitted via Spark?
>
> Not sure though if Spark local make sense. If you don’t have a cluster
> then a normal python program can be much better.
>
> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]> wrote:
>
> Hi everyone,
> I am trying to run a pyspark code on some data sets sequentially [basically
> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3.
> Write modified data in parquet format to a target location]
> Now, while running this pyspark code across *multiple independent data
> sets sequentially*, the memory usage from the previous data set doesn't
> seem to get released/cleared and hence spark's memory consumption (JVM
> memory consumption from Task Manager) keeps on increasing till it fails at
> some data set.
> So, is there a way to clear/remove dataframes that I know are not going to
> be used later?
> Basically, can I clear out some memory programmatically (in the pyspark
> code) when processing for a particular data set ends?
> At no point, I am caching any dataframe (so unpersist() is also not a
> solution).
>
> I am running spark using local[*] as master. There is a single
> SparkSession that is doing all the processing.
> If it is not possible to clear out memory, what can be a better approach
> for this problem?
>
> Can someone please help me with this and tell me if I am going wrong
> anywhere?
>
> --Thanks,
> Shuporno Choudhury
>
>
>


-- 
--Thanks,
Shuporno Choudhury


[PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Shuporno Choudhury
Hi everyone,
I am trying to run a pyspark job on some data sets sequentially [basically
1. Read data into a dataframe 2. Perform some join/filter/aggregation 3.
Write the modified data in parquet format to a target location].
Now, while running this pyspark code across *multiple independent data sets
sequentially*, the memory usage from the previous data set doesn't seem to
get released/cleared, and hence spark's memory consumption (JVM memory
consumption as seen in Task Manager) keeps on increasing until it fails at
some data set.
So, is there a way to clear/remove dataframes that I know are not going to
be used later?
Basically, can I clear out some memory programmatically (in the pyspark
code) when processing for a particular data set ends?
At no point am I caching any dataframe (so unpersist() is also not a
solution).

I am running spark using local[*] as master. There is a single SparkSession
that is doing all the processing.
If it is not possible to clear out memory, what can be a better approach
for this problem?

Can someone please help me with this and tell me if I am going wrong
anywhere?

--Thanks,
Shuporno Choudhury


[pyspark] Read multiple files parallely into a single dataframe

2018-05-04 Thread Shuporno Choudhury
Hi,

I want to read multiple files in parallel into a single dataframe. But the
files have random names and do not conform to any pattern (so I can't use a
wildcard). Also, the files can be in different directories.
If I provide the file names in a list to the dataframe reader, it reads
them sequentially.
Eg:
df=spark.read.format('csv').load(['/path/to/file1.csv.gz','/path/to/file2.csv.gz','/path/to/file3.csv.gz'])
This reads the files sequentially. What can I do to read the files in
parallel?
I noticed that spark reads files in parallel if given a directory location
directly. How can that be extended to multiple random files?
Suppose my system has 4 cores; how can I make spark read 4 files at a
time?

Please suggest.


Getting Corrupt Records while loading data into dataframe from csv file

2018-04-23 Thread Shuporno Choudhury
Hi all,

I have a manually created schema using which I am loading data from
multiple csv files to a dataframe.
Now, if there are certain records that fail the provided schema, is there a
way to get those rejected records and continue with the process of loading
data into the dataframe?
As of now, it seems the only options I have are the 3 modes
(PERMISSIVE, DROPMALFORMED and FAILFAST), none of which seem to fulfill the
objective.


-- 
--Thanks,
Shuporno Choudhury


Multiple columns using 'isin' command in pyspark

2018-03-29 Thread Shuporno Choudhury
Hi Spark Users,

I am trying to achieve the 'IN' functionality of SQL using the isin
function in pyspark
Eg: select count(*) from tableA
  where (col1, col2) in ((1, 100),(2, 200), (3,300))

We can very well have single-column isin statements like:
df.filter(df[0].isin(1,2,3)).count()

But can I use multiple columns in that statement, like:
df.filter((df[0],df[1]).isin((1,100),(2,200),(3,300))).count()

Is this possible to achieve?
Or do I have to create multiple isin statements, merge them using the '&'
condition and then execute the statement to get the final result?
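For reference, the kind of manual workaround I have in mind looks roughly
like this (only a rough sketch; the dataframe and the column names
col1/col2 are placeholders):

from functools import reduce
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 100), (2, 250), (3, 300)], ['col1', 'col2'])  # toy data

pairs = [(1, 100), (2, 200), (3, 300)]
# one (col1 == a) & (col2 == b) clause per pair, OR-ed together
cond = reduce(lambda x, y: x | y,
              [(F.col('col1') == a) & (F.col('col2') == b) for a, b in pairs])
print(df.filter(cond).count())   # 2 -> only (1, 100) and (3, 300) match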

Any help would be really appreciated.

-- 
Thanks,
Shuporno Choudhury