Re: [Pyspark 2.4] not able to partition the data frame by dates

2019-07-31 Thread Rishi Shah
Thanks for your prompt reply, Gourav. I am using Spark 2.4.0 (Cloudera
distribution). The job consistently threw this error, so I narrowed down
the dataset by adding a date filter (date range: 2018-01-01 to 2018-06-30).
However, it's still throwing the same error!

*command*: spark2-submit --master yarn --deploy-mode client
--executor-memory 15G --executor-cores 5 samplerestage.py
*cluster*: 4 nodes, 32 cores and 256GB RAM each

This is the only job running, with 20 executors...

I would really like to know the best practice around creating a partitioned
table using PySpark - every time I need to partition a huge dataset, I run
into such issues. Appreciate your help!
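
For reference, here is a minimal sketch of what I have been trying - the
maxRecordsPerFile cap and the dynamic partition-overwrite setting (Spark
2.3+) are only things I've seen suggested, not verified fixes:

# Only overwrite the partitions being written, and cap rows per output file.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)

df = spark.read.parquet(INPUT_PATH)

(df.repartition("date_field")
   .write
   .partitionBy("date_field")
   .mode("overwrite")
   .parquet(PATH))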


On Wed, Jul 31, 2019 at 10:58 PM Gourav Sengupta 
wrote:

> Hi Rishi,
>
> there is no such version as 2.4 :), can you please specify the exact Spark
> version you are using? How are you starting the Spark session? And what is
> the environment?
>
> I know this issue occurs intermittently over large writes in S3 and has to
> do with S3 eventual consistency issues. Just restarting the job sometimes
> helps.
>
>
> Regards,
> Gourav Sengupta
>
> On Thu, Aug 1, 2019 at 3:55 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have a dataframe of size 2.7T (Parquet) which I need to partition by
>> date; however, the Spark program below doesn't help - it keeps failing due
>> to a *file already exists exception*.
>>
>> df = spark.read.parquet(INPUT_PATH)
>>
>> df.repartition('date_field').write.partitionBy('date_field').mode('overwrite').parquet(PATH)
>>
>> I did notice that a couple of tasks failed; probably that's why it tried
>> spinning up new ones, which write to the same .staging directory?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: [Pyspark 2.4] not able to partition the data frame by dates

2019-07-31 Thread Gourav Sengupta
Hi Rishi,

there is no such version as 2.4 :), can you please specify the exact Spark
version you are using? How are you starting the Spark session? And what is
the environment?

I know this issue occurs intermittently over large writes in S3 and has to
do with S3 eventual consistency issues. Just restarting the job sometimes
helps.


Regards,
Gourav Sengupta

On Thu, Aug 1, 2019 at 3:55 AM Rishi Shah  wrote:

> Hi All,
>
> I have a dataframe of size 2.7T (Parquet) which I need to partition by
> date; however, the Spark program below doesn't help - it keeps failing due
> to a *file already exists exception*.
>
> df = spark.read.parquet(INPUT_PATH)
>
> df.repartition('date_field').write.partitionBy('date_field').mode('overwrite').parquet(PATH)
>
> I did notice that a couple of tasks failed; probably that's why it tried
> spinning up new ones, which write to the same .staging directory?
>
> --
> Regards,
>
> Rishi Shah
>


[Pyspark 2.4] not able to partition the data frame by dates

2019-07-31 Thread Rishi Shah
Hi All,

I have a dataframe of size 2.7T (Parquet) which I need to partition by
date; however, the Spark program below doesn't help - it keeps failing due
to a *file already exists exception*.

df = spark.read.parquet(INPUT_PATH)
df.repartition('date_field').write.partitionBy('date_field').mode('overwrite').parquet(PATH)

I did notice that a couple of tasks failed; probably that's why it tried
spinning up new ones, which write to the same .staging directory?

-- 
Regards,

Rishi Shah


Announcing .NET for Apache Spark 0.4.0

2019-07-31 Thread Terry Kim
We are thrilled to announce that .NET for Apache Spark 0.4.0 has just been
released!



Some of the highlights of this release include:

   - Apache Arrow-backed UDFs (Vector UDF, Grouped Map UDF)
   - Robust UDF-related assembly loading
   - Local UDF debugging



The release notes include the full list of features and improvements in
this release.



We would like to thank all those who contributed to this release.



Thanks,

Terry


Re: Spark Image resizing

2019-07-31 Thread Patrick McCarthy
It won't be very efficient, but you could write a Python UDF using
PythonMagick - https://wiki.python.org/moin/ImageMagick

If you have PyArrow > 0.10, then you might be able to get a boost by saving
the images in a column as BinaryType and writing a Pandas UDF - see the
sketch below.
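
A minimal sketch of that second approach, assuming Pillow is available on the
executors (instead of PythonMagick) and that the raw encoded image bytes
(e.g. from sc.binaryFiles, not the decoded pixel struct produced by the image
data source) sit in a hypothetical BinaryType column named "content":

import io

import pandas as pd
from PIL import Image
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import BinaryType

@pandas_udf(BinaryType(), PandasUDFType.SCALAR)
def resize_image(content: pd.Series) -> pd.Series:
    # Decode each image from its bytes, resize it, and re-encode it as PNG.
    def _resize(data):
        img = Image.open(io.BytesIO(data)).resize((224, 224))
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        return buf.getvalue()
    return content.apply(_resize)

resized_df = df.withColumn("resized", resize_image("content"))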

On Wed, Jul 31, 2019 at 6:22 AM Nick Dawes  wrote:

> Any other way of resizing the image before creating the DataFrame in
> Spark? I know OpenCV does it, but I don't have OpenCV on my cluster. I have
> the Anaconda Python packages installed on my cluster.
>
> Any ideas will be appreciated.  Thank you!
>
> On Tue, Jul 30, 2019, 4:17 PM Nick Dawes  wrote:
>
>> Hi
>>
>> I'm new to spark image data source.
>>
>> After creating a dataframe using Spark's image data source, I would like
>> to resize the images in PySpark.
>>
>> df = spark.read.format("image").load(imageDir)
>>
>> Can you please help me with this?
>>
>> Nick
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Core allocation is scattered

2019-07-31 Thread Muthu Jayakumar
> I am running a spark job with 20 cores, but I did not understand why my
> application gets 1-2 cores on a couple of machines - why not just run on
> two nodes, like node1=16 cores and node2=4 cores? Instead, cores are
> allocated like node1=2, node2=1 ... node14=1.

I believe that's the intended behavior for Spark. Please refer to
https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
and the section on 'spark.deploy.spreadOut'. If I understand correctly, you
may want "spark.deploy.spreadOut false".
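
A sketch of where that setting would go (not verified on your setup):
spark.deploy.spreadOut is read by the standalone master, so it has to be set
where the master starts - e.g. in conf/spark-env.sh on the master node,
followed by a master restart:

# conf/spark-env.sh on the standalone master node
SPARK_MASTER_OPTS="-Dspark.deploy.spreadOut=false"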

Hope it helps!

Happy Spark(ing).

On Thu, Jul 25, 2019 at 7:22 PM Srikanth Sriram <
sriramsrikanth1...@gmail.com> wrote:

> Hello,
>
> Below is my understanding.
>
> The default configuration parameters below will be used by the Spark job
> if they are not configured to the required values at the time of
> submitting the job.
>
> # - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
> # - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
> # - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
>
> SPARK_EXECUTOR_INSTANCES -> indicates the number of workers (executors) to
> be started; it means that, for a job, this is the maximum number of
> executors it can ask for/take from the cluster resource manager.
>
> SPARK_EXECUTOR_CORES -> indicates the number of cores in each executor; it
> means the Spark TaskScheduler will ask for this many cores to be
> allocated/blocked on each of the executor machines.
>
> SPARK_EXECUTOR_MEMORY -> indicates the maximum amount of RAM/MEMORY it
> requires in each executor.
>
> All these details are requested by the TaskScheduler from the cluster
> manager (it may be Spark standalone, YARN, Mesos, or Kubernetes, which is
> supported starting from Spark 2.3) before the job execution actually
> starts.
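>
> As a concrete sketch of where these settings go on the command line (the
> resource numbers and my_app.py below are hypothetical placeholders):
>
> spark-submit \
>   --master yarn \
>   --num-executors 4 \
>   --executor-cores 5 \
>   --executor-memory 8G \
>   my_app.py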
>
> Also, please note that the initial number of executor instances depends on
> "--num-executors", but when there is more data to be processed and
> "spark.dynamicAllocation.enabled" is set to true, Spark will dynamically
> add more executors based on "spark.dynamicAllocation.initialExecutors".
>
> Note: "spark.dynamicAllocation.initialExecutors" should always be
> configured greater than "--num-executors".
> spark.dynamicAllocation.initialExecutors (default:
> spark.dynamicAllocation.minExecutors) - Initial number of executors to run
> if dynamic allocation is enabled. If `--num-executors` (or
> `spark.executor.instances`) is set and larger than this value, it will be
> used as the initial number of executors.
>
> spark.executor.memory (default: 1g) - Amount of memory to use per executor
> process, in the same format as JVM memory strings with a size unit suffix
> ("k", "m", "g" or "t") (e.g. 512m, 2g).
>
> spark.executor.cores (default: 1 in YARN mode; all the available cores on
> the worker in standalone and Mesos coarse-grained modes) - The number of
> cores to use on each executor. For more detail on standalone and Mesos
> coarse-grained modes, see this description.
>
> On Thu, Jul 25, 2019 at 5:54 PM Amit Sharma  wrote:
>
>> I have a cluster with 26 nodes, with 16 cores on each. I am running a
>> Spark job with 20 cores, but I did not understand why my application gets
>> 1-2 cores on a couple of machines - why not just run on two nodes, like
>> node1=16 cores and node2=4 cores? Instead, cores are allocated like
>> node1=2, node2=1 ... node14=1. Is there any conf property I need to
>> change? I know that with dynamic allocation we can use the option below,
>> but without dynamic allocation is there anything similar?
>> --conf "spark.dynamicAllocation.maxExecutors=2"
>>
>>
>> Thanks
>> Amit
>>
>
>
> --
> Regards,
> Srikanth Sriram
>


Re: Kafka Integration libraries put in the fat jar

2019-07-31 Thread Spico Florin
Hi!
Thanks to Jacek Laskowski, I found the answer here:
https://stackoverflow.com/questions/51792203/how-to-get-spark-kafka-org-apache-sparkspark-sql-kafka-0-10-2-112-1-0-dependen

Just add the Maven Shade plugin to the pom.xml:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.apache.spark.examples.sql.streaming.JavaStructuredKafkaWordCount</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
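
With this in place, a plain `mvn package` should produce a fat jar that can
be submitted without the --packages option - a sketch, reusing the command
from the message below:

spark-submit --master spark://spark-master:7077 --class myClass \
  --deploy-mode client my-fat-jar-with-dependencies.jar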

On Tue, Jul 30, 2019 at 4:38 PM Spico Florin  wrote:

> Hello!
>
> I would like to use Spark structured streaming integrated with Kafka the
> way it is described here:
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
>
> but I got the following issue:
>
> Caused by: org.apache.spark.sql.AnalysisException: Failed to find data
> source: kafka. Please deploy the application as per the deployment section
> of "Structured Streaming + Kafka Integration Guide".;
>
> even though I've added the kafka-sql dependency to the generated fat jar:
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
>     <version>2.4.3</version>
>     <scope>compile</scope>
> </dependency>
>
> When I submit with the command
>
> spark-submit  --master spark://spark-master:7077  --class myClass
> --deploy-mode client *--packages
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
> my-fat-jar-with-dependencies.jar*
>
> the problem is gone.
>
> Since the --packages option requires downloading the libraries from an
> environment that has internet access, which I don't have, can you please
> advise what I can do to add the Kafka dependencies to the fat jar, or
> suggest another solution?
>
> Thank you.
>
> Regards,
>
> Florin
>
>
>


Re: Spark Image resizing

2019-07-31 Thread Nick Dawes
Any other way of resizing the image before creating the DataFrame in Spark?
I know OpenCV does it, but I don't have OpenCV on my cluster. I have the
Anaconda Python packages installed on my cluster.

Any ideas will be appreciated.  Thank you!

On Tue, Jul 30, 2019, 4:17 PM Nick Dawes  wrote:

> Hi
>
> I'm new to spark image data source.
>
> After creating a dataframe using Spark's image data source, I would like
> to resize the images in PySpark.
>
> df = spark.read.format("image").load(imageDir)
>
> Can you please help me with this?
>
> Nick
>