Standalone Spark, How to find (driver's) final status for an application

2019-09-25 Thread Nilkanth Patel
I am setting up Spark 2.2.0 in standalone mode (
https://spark.apache.org/docs/latest/spark-standalone.html) and submitting
Spark jobs programmatically using:

 SparkLauncher sparkAppLauncher = new SparkLauncher(userNameMap)
         .setMaster(sparkMaster)
         .setAppName(appName)
         /* ... further setters ... */;
 SparkAppHandle sparkAppHandle = sparkAppLauncher.startApplication();

I also have a Java client program that polls job status for the jobs
submitted programmatically, for which I am using the following REST endpoint:

 curl http://192.168.1.139:8080/json/

which provides a JSON response like the following:

{
  "url" : "spark://192.168.1.139:7077",
  "workers" : [ {
      "id" : "x", "host" : "x", "port" : x, "webuiaddress" : "x",
      "cores" : x, "coresused" : x, "coresfree" : x,
      "memory" : xx, "memoryused" : xx, "memoryfree" : xx,
      "state" : "x", "lastheartbeat" : x
  }, { ... } ],
  "cores" : x,
  "coresused" : x,
  "memory" : x,
  "memoryused" : x,
  "activeapps" : [ ],
  "completedapps" : [ {
      "starttime" : x, "id" : "app-xx-", "name" : "abc", "user" : "xx",
      "memoryperslave" : x, "submitdate" : "x",
      "state" : "FINISHED OR RUNNING", "duration" : x
  }, { ... } ],
  "activedrivers" : [ ],
  "status" : "x"
}


In the above response, I have observed that the state for completedapps is
always FINISHED even if the application fails, while on the UI
(http://master:8080) the associated driver shows a FAILED state, as in the
screenshots below.

[images: Master UI screenshots showing the application listed under Completed
Applications with state FINISHED, while the associated driver is listed with
state FAILED]

Referring to the above example: currently my Java client gets the status for
application app-20190925115750-0003 as FINISHED, even though the application
actually failed (encountered an exception) and the associated driver shows a
FAILED state. I intend to report the final status in this case as FAILED.
It seems that if I could correlate an application ID (app-20190925115750-0003)
with its driver ID (driver-20190925115748-0003), I could report a FAILED
(final) status, but I could not find any such correlation (app ID --> driver
ID).

Looking forward to your suggestions for resolving this, or any other possible
approaches to achieve it. I have also come across some hidden REST APIs such
as http://xx.xx.xx.xx:6066/v1/submissions/status/driver-20190925115748-0003,
which seem to return only limited information in the response.
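
For reference, a minimal sketch (Scala shown, against the
org.apache.spark.launcher API) of getting the final state directly from the
SparkAppHandle returned by startApplication(), via a listener, instead of
polling the master's /json endpoint. The master URL mirrors the one above; the
app resource and main class are placeholders, and whether the handle's FAILED
state matches the driver's state in your deployment mode is worth verifying:

```
import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LaunchAndWait {
  def main(args: Array[String]): Unit = {
    val done = new CountDownLatch(1)

    val handle = new SparkLauncher()
      .setMaster("spark://192.168.1.139:7077")
      .setAppResource("/path/to/your-app.jar") // placeholder
      .setMainClass("com.example.YourApp")     // placeholder
      .setAppName("appName")
      .startApplication(new SparkAppHandle.Listener {
        // Fired on every state transition; FINISHED, FAILED, KILLED and
        // LOST are the final states.
        override def stateChanged(h: SparkAppHandle): Unit = {
          if (h.getState.isFinal) {
            println(s"appId=${h.getAppId} finalState=${h.getState}")
            done.countDown()
          }
        }
        override def infoChanged(h: SparkAppHandle): Unit = ()
      })

    done.await()
  }
}
```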


Thanks in advance.
Nilkanth.


Re: intermittent Kryo serialization failures in Spark

2019-09-25 Thread Jerry Vinokurov
Hi Julien,

Thanks for the suggestion. If we don't do a broadcast, that would
presumably affect the performance of the job, as the model that is failing
to be broadcast is something that needs to be shared across the cluster.
But it may be worth it if the trade-off is not having things run properly.
Vadim's suggestions did not make a difference for me (I'm still hitting this
error several times a day), but I'll try disabling the broadcast and see if
that does anything.

thanks,
Jerry

On Fri, Sep 20, 2019 at 10:00 AM Julien Laurenceau <
julien.laurenc...@pepitedata.com> wrote:

> Hi,
> Did you try without the broadcast ?
> Regards
> JL
>
> On Thu, Sep 19, 2019 at 06:41, Vadim Semenov
> wrote:
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>>     kryo.register(Class.forName("[[B")) // byte[][]
>>     kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
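
For completeness, a sketch of applying those same settings programmatically on
a SparkConf; spark.serializer is added here since Kryo must be the active
serializer, the app name is a placeholder, and the registrator class should
point at wherever your registrator actually lives:

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: the submit-time Kryo settings above, set on a SparkConf instead.
val conf = new SparkConf()
  .setAppName("kryo-example") // placeholder
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.datadog.spark.MyKryoRegistrator")
  .set("spark.kryo.referenceTracking", "false")
  .set("spark.kryo.registrationRequired", "false")
  .set("spark.kryo.unsafe", "false")
  .set("spark.kryoserializer.buffer.max", "256m")

val spark = SparkSession.builder().config(conf).getOrCreate()
```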
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
>> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>>
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
    at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
    at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
    at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    ...

Intermittently getting "Can not create the managed table error" while creating table from spark 2.4

2019-09-25 Thread abhijeet bedagkar
Hi,

We are intermittently facing the below error in Spark 2.4 when saving a
managed table from Spark.

Error -
pyspark.sql.utils.AnalysisException: u"Can not create the managed
table('`hive_issue`.`table`'). The associated
location('s3://{bucket_name}/EMRFS_WARE_TEST167_new/warehouse/hive_issue.db/table')
already exists.;"

Steps to reproduce:
1. Create a dataframe in Spark from mid-sized data (a 30MB CSV file)
2. Save the dataframe as a table
3. Terminate the session while the above operation is in progress

Note:
Session termination is just a way to reproduce this issue. In reality we hit
it intermittently when running the same Spark jobs multiple times. We use
EMRFS and HDFS from an EMR cluster and see the same issue on both systems.
The only way we can fix it is by deleting the target folder where the table
keeps its files, which is not an option for us: we need to keep historical
information in the table, hence we use APPEND mode while writing to it.


Sample code:
from pyspark.sql import SparkSession
sc = SparkSession.builder.enableHiveSupport().getOrCreate()
df = sc.read.csv("s3://{sample-bucket}1/DATA/consumecomplians.csv")
print "STARTED WRITING TO TABLE"
# Terminate the session using Ctrl+C after this statement, once the df.write action has started
df.write.mode("append").saveAsTable("hive_issue.table")
print "COMPLETED WRITING TO TABLE"

We went through the documentation of Spark 2.4 [1] and found that Spark no
longer allows creating managed tables on non-empty folders.

1. Is there any reason behind this change in Spark's behaviour?
2. To us it looks like a breaking change: despite specifying the "overwrite"
option, Spark is unable to wipe out the existing data and create the table.
3. Is there any solution for this issue?

[1]
https://spark.apache.org/docs/latest/sql-migration-guide-upgrade.html
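
For what it's worth, the 2.4 migration guide also describes a legacy flag,
spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation, which is
reported to restore the pre-2.4 behaviour (it was removed again in Spark 3.0),
so verify it against the guide for your exact version. A minimal sketch of
setting it at session build time (Scala shown; the same config key can be
passed from PySpark or via --conf):

```
import org.apache.spark.sql.SparkSession

// Sketch: opt back into the pre-2.4 behaviour of creating a managed table
// over a non-empty location. Stop-gap only: the flag is gone in Spark 3.0.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")
  .getOrCreate()

val df = spark.read.csv("s3://{sample-bucket}1/DATA/consumecomplians.csv")
df.write.mode("append").saveAsTable("hive_issue.table")
```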

Thanks,
Abhijeet