Re: No space left on device

2018-08-21 Thread Gourav Sengupta
Hi,

The best part is that Spark is telling you which configuration to tweak.
If you are using EMR, check that "spark.local.dir" points to the right
location on each node of the cluster. If a disk is mounted on all the nodes
under a common path (you can do that easily in EMR), you can point this
configuration at that disk and thereby overcome the issue.

On another note, also try to see why so much data is being written to
disk. Is there too much shuffle? Can you increase the shuffle memory, as
the error message suggests, using "spark.shuffle.memoryFraction"?
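
For example, a minimal sketch of how these could be set when building the
session (the path and the value are only placeholders, not recommendations):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("example")
             # Placeholder path: any mount with enough free space on every node.
             .config("spark.local.dir", "/mnt/big-disk/spark-tmp")
             # Legacy knob from the error message; on Spark 2.x it only applies
             # when spark.memory.useLegacyMode=true (otherwise spark.memory.fraction
             # is the setting to tune).
             .config("spark.shuffle.memoryFraction", "0.4")
             .getOrCreate())

The same settings can also go into spark-defaults.conf or be passed with
--conf on spark-submit, which is usually safer for spark.local.dir since it
has to be in place before the executors start.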

By any chance, have you switched from caching to persisting the data frames?
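
To illustrate the difference (a sketch only; spark.range stands in for the
real data):

    from pyspark.sql import SparkSession
    from pyspark import StorageLevel

    spark = SparkSession.builder.appName("cache-vs-persist").getOrCreate()
    df = spark.range(1000)  # stand-in for the DataFrame being reused

    # For DataFrames, cache() is persist(MEMORY_AND_DISK): blocks spill into
    # spark.local.dir only when they do not fit in memory.
    df.cache()

    # A disk-backed level such as DISK_ONLY instead writes every cached block
    # into spark.local.dir, which is much heavier on local storage:
    # df.persist(StorageLevel.DISK_ONLY)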


Regards,
Gourav Sengupta



On Tue, Aug 21, 2018 at 12:04 PM Vitaliy Pisarev <vitaliy.pisa...@biocatch.com> wrote:

> The other time I encountered this, I solved it by throwing more
> resources at it (a stronger cluster).
> I was not able to understand the root cause though. I'll be happy to hear
> deeper insight as well.
>
> On Mon, Aug 20, 2018 at 7:08 PM, Steve Lewis wrote:
>
>>
>> We are trying to run a job that previously ran on Spark 1.3 on a
>> different cluster. The job was converted to Spark 2.3, and this is a new
>> cluster.
>>
>> The job dies after completing about a half dozen stages with
>>
>> java.io.IOException: No space left on device
>>
>>
>> It appears that the nodes are using local storage as tmp.
>>
>>
>> I could use help diagnosing the issue and how to fix it.
>>
>>
>> Here are the spark conf properties
>>
>> Spark Conf Properties
>> spark.driver.extraJavaOptions=-Djava.io.tmpdir=/scratch/home/int/eva/zorzan/sparktmp/
>> spark.master=spark://10.141.0.34:7077
>> spark.mesos.executor.memoryOverhead=3128
>> spark.shuffle.consolidateFiles=true
>> spark.shuffle.spill=false
>> spark.app.name=Anonymous
>> spark.shuffle.manager=sort
>> spark.storage.memoryFraction=0.3
>> spark.jars=file:/home/int/eva/zorzan/bin/SparkHydraV2-master/HydraSparkBuilt.jar
>> spark.ui.killEnabled=true
>> spark.shuffle.spill.compress=true
>> spark.shuffle.sort.bypassMergeThreshold=100
>> com.lordjoe.distributed.marker_property=spark_property_set
>> spark.executor.memory=12g
>> spark.mesos.coarse=true
>> spark.shuffle.memoryFraction=0.4
>> spark.serializer=org.apache.spark.serializer.KryoSerializer
>> spark.kryo.registrator=com.lordjoe.distributed.hydra.HydraKryoSerializer
>> spark.default.parallelism=360
>> spark.io.compression.codec=lz4
>> spark.reducer.maxMbInFlight=128
>> spark.hadoop.validateOutputSpecs=false
>> spark.submit.deployMode=client
>> spark.local.dir=/scratch/home/int/eva/zorzan/sparktmp
>> spark.shuffle.file.buffer.kb=1024
>>
>>
>>
>> --
>> Steven M. Lewis PhD
>> 4221 105th Ave NE
>> 
>> Kirkland, WA 98033
>> 
>> 206-384-1340 (cell)
>> Skype lordjoe_com
>>
>>
>


CBO not predicting cardinality on partition columns for Parquet tables

2018-08-21 Thread rajat mishra
Hi All,

I have an external table in Spark whose underlying data files are in
Parquet format. The table is partitioned. When I compute statistics for a
query with a partition column in the where clause, the statistics returned
contain only sizeInBytes and not the row count.

  val ddl = """create external table test_p (Address String, Age String,
    CustomerID string, CustomerName string, CustomerSuffix string,
    Location string, Mobile String, Occupation String, Salary String)
    PARTITIONED BY (Country string) Stored as PARQUET LOCATION '/dev/test3'"""
  spark.sql(ddl)
  spark.sql("msck repair table test_p")

  spark.sql("Analyze table test_p compute statistics for columns " +
    "Address,Age,CustomerID,CustomerName,CustomerSuffix,Location,Mobile,Occupation,Salary,Country").show()
  spark.sql("Analyze table test_p partition(Country) compute statistics").show()

  println(spark.sql("select * from test_p where country='Korea'")
    .queryExecution.toStringWithStats)


The output I get is:

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('country = Korea)
   +- 'UnresolvedRelation `test_p`

== Analyzed Logical Plan ==
Address: string, Age: string, CustomerID: string, CustomerName: string, CustomerSuffix: string, Location: string, Mobile: string, Occupation: string, Salary: string, Country: string
Project [Address#0, Age#1, CustomerID#2, CustomerName#3, CustomerSuffix#4, Location#5, Mobile#6, Occupation#7, Salary#8, Country#9]
+- Filter (country#9 = Korea)
   +- SubqueryAlias test_p
      +- Relation[Address#0,Age#1,CustomerID#2,CustomerName#3,CustomerSuffix#4,Location#5,Mobile#6,Occupation#7,Salary#8,Country#9] parquet

== Optimized Logical Plan ==
Project [Address#0, Age#1, CustomerID#2, CustomerName#3, CustomerSuffix#4, Location#5, Mobile#6, Occupation#7, Salary#8, Country#9], Statistics(sizeInBytes=2.2 KB, hints=none)
+- Filter (isnotnull(country#9) && (country#9 = Korea)), Statistics(sizeInBytes=2.2 KB, hints=none)
   +- Relation[Address#0,Age#1,CustomerID#2,CustomerName#3,CustomerSuffix#4,Location#5,Mobile#6,Occupation#7,Salary#8,Country#9] parquet, Statistics(sizeInBytes=2.2 KB, hints=none)

== Physical Plan ==
*FileScan parquet default.test_p[Address#0,Age#1,CustomerID#2,CustomerName#3,CustomerSuffix#4,Location#5,Mobile#6,Occupation#7,Salary#8,Country#9] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[file:/C:/dev/tests2/Country=Korea], PartitionCount: 1, PartitionFilters: [isnotnull(Country#9), (Country#9 = Korea)], PushedFilters: [], ReadSchema: struct
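
One thing worth double-checking, as an assumption on my part rather than
anything confirmed here: in Spark 2.x the cost-based optimizer is disabled
by default, so the row counts gathered by ANALYZE only surface in the plan
statistics once it is switched on. A minimal sketch:

    # Sketch only, assuming the test_p table above and a Spark 2.x session.
    spark.conf.set("spark.sql.cbo.enabled", "true")  # CBO is off by default

    # Table-level statistics carry a row count the optimizer can use.
    spark.sql("ANALYZE TABLE test_p COMPUTE STATISTICS")

With that in place, re-running the toStringWithStats check above should show
rowCount next to sizeInBytes, assuming the statistics were written to the
metastore.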

Insert a pyspark dataframe in postgresql

2018-08-21 Thread dimitris plakas
Hello everyone, here is a case that I am facing.

I have a PySpark application whose last step is to create a PySpark
dataframe with two columns (column1, column2). This dataframe has only one
row, and I want that row to be inserted into a Postgres DB table. On each
run this row may be different or the same.

After 10 runs I want to have 10 rows in my Postgres table, so I want to
insert that dataframe into the Postgres table on every run. What I have
done up to now is use the code below, but it doesn't insert a new row after
every run of my PySpark application; it just overwrites the old row.

Here is my code:

test_write_df.write.mode('append').options(
    url='jdbc:postgresql://localhost:5432/Test_Db',
    dbtable='test_write_df',
    driver='org.postgresql.Driver',
    user='postgres',
    password='my_password')

Can you please advise me on whether this is feasible and/or how to achieve
it?

Thank you in advance
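
For comparison, a minimal sketch of an append write that does add a row on
each run, assuming the same connection details as above; the key differences
from the snippet above are the explicit format('jdbc') and the final save()
call, which the snippet never makes:

    # Sketch only: connection details copied from the question.
    (test_write_df.write
        .format("jdbc")
        .mode("append")
        .option("url", "jdbc:postgresql://localhost:5432/Test_Db")
        .option("dbtable", "test_write_df")
        .option("driver", "org.postgresql.Driver")
        .option("user", "postgres")
        .option("password", "my_password")
        .save())

With mode('append') each run should add a new row; 'overwrite' is the mode
that replaces the existing table contents.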


Re: Structured Streaming on Kubernetes

2018-08-21 Thread puneetloya
Thanks for putting together a comprehensive observation about Spark on
Kubernetes. A Mesos Spark deployment has a property called
spark.mesos.extra.cores, which is documented as:

"Set the extra number of cores for an executor to advertise. This does not
result in more cores allocated. It instead means that an executor will
'pretend' it has more cores, so that the driver will send it more tasks.
Use this to increase parallelism. This setting is only used for Mesos
coarse-grained mode."

Can this be used to increase parallelism? Are there other better ways to
increase parallelism in Kubernetes?
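
I am not aware of a direct Kubernetes equivalent of spark.mesos.extra.cores.
As a rough sketch (the settings are standard Spark confs, the values only
placeholders), parallelism on Kubernetes is usually driven by the executor
count, the cores per executor, and the partition-related settings:

    from pyspark.sql import SparkSession

    # Placeholder values; the point is which knobs control parallelism.
    spark = (SparkSession.builder
             .config("spark.executor.instances", "10")      # number of executor pods
             .config("spark.executor.cores", "4")           # task slots per executor
             .config("spark.default.parallelism", "160")    # default RDD partitions
             .config("spark.sql.shuffle.partitions", "160") # shuffle partitions for SQL
             .getOrCreate())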








Re: No space left on device

2018-08-21 Thread Vitaliy Pisarev
The other time I encountered this, I solved it by throwing more
resources at it (a stronger cluster).
I was not able to understand the root cause though. I'll be happy to hear
deeper insight as well.

On Mon, Aug 20, 2018 at 7:08 PM, Steve Lewis wrote:

>
> We are trying to run a job that previously ran on Spark 1.3 on a
> different cluster. The job was converted to Spark 2.3, and this is a new
> cluster.
>
> The job dies after completing about a half dozen stages with
>
> java.io.IOException: No space left on device
>
>
> It appears that the nodes are using local storage as tmp.
>
>
> I could use help diagnosing the issue and how to fix it.
>
>
> Here are the spark conf properties
>
> Spark Conf Properties
> spark.driver.extraJavaOptions=-Djava.io.tmpdir=/scratch/home/int/eva/zorzan/sparktmp/
> spark.master=spark://10.141.0.34:7077
> spark.mesos.executor.memoryOverhead=3128
> spark.shuffle.consolidateFiles=true
> spark.shuffle.spill=false
> spark.app.name=Anonymous
> spark.shuffle.manager=sort
> spark.storage.memoryFraction=0.3
> spark.jars=file:/home/int/eva/zorzan/bin/SparkHydraV2-master/HydraSparkBuilt.jar
> spark.ui.killEnabled=true
> spark.shuffle.spill.compress=true
> spark.shuffle.sort.bypassMergeThreshold=100
> com.lordjoe.distributed.marker_property=spark_property_set
> spark.executor.memory=12g
> spark.mesos.coarse=true
> spark.shuffle.memoryFraction=0.4
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> spark.kryo.registrator=com.lordjoe.distributed.hydra.HydraKryoSerializer
> spark.default.parallelism=360
> spark.io.compression.codec=lz4
> spark.reducer.maxMbInFlight=128
> spark.hadoop.validateOutputSpecs=false
> spark.submit.deployMode=client
> spark.local.dir=/scratch/home/int/eva/zorzan/sparktmp
> spark.shuffle.file.buffer.kb=1024
>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> 
> Kirkland, WA 98033
> 
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>