Re: is there a way to persist the lineages generated by spark?
This is not quite what you are asking, but I often save intermediate results down to parquet files so I can diagnose problems and rebuild data from a known good state without having to re-run every processing step.

On Fri, Apr 7, 2017 at 1:08 AM, kant kodali wrote:
> Yes, lineage that is actually replayable is what is needed for the
> validation process, so we can address questions like how a system arrived
> at a state S at a time T. I guess a good analogy is event sourcing.
>
> On Thu, Apr 6, 2017 at 10:30 PM, Jörn Franke wrote:
>> I do think this is the right way; you will have to do testing with test
>> data, verifying that the expected output of the calculation is the actual
>> output. Even if the logical plan is correct, your calculation might not
>> be. E.g. there can be bugs in Spark or in the UI, or (what happens very
>> often) the client describes a calculation but the description itself is
>> wrong.
>>
>>> On 4. Apr 2017, at 05:19, kant kodali wrote:
>>>
>>> Hi All,
>>>
>>> I am wondering if there is a way to persist the lineages generated by
>>> Spark underneath? Some of our clients want us to prove that the result
>>> of the computation we are showing on a dashboard is correct, and if we
>>> can show the lineage of transformations that were executed to get to
>>> the result, that can be the Q.E.D. moment. But I am not even sure if
>>> this is possible with Spark?
>>>
>>> Thanks,
>>> kant
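The checkpoint-to-a-known-good-state pattern described above can be sketched outside Spark. This is a minimal stand-in, not Spark code: `run_pipeline`, the stage names, and the JSON-on-disk checkpoint format are all invented for illustration (a real job would write parquet via `df.write.parquet(...)` and reload it).

```python
import json
import os
import tempfile

def run_pipeline(records, stages, ckpt_dir):
    """Run named stages in order, checkpointing each stage's output.

    If a checkpoint already exists for a stage, reload it instead of
    recomputing -- the same idea as saving intermediate Spark results
    to parquet and rebuilding from a known good state.
    """
    data = records
    for name, fn in stages:
        path = os.path.join(ckpt_dir, name + ".json")
        if os.path.exists(path):            # known good state: reuse it
            with open(path) as f:
                data = json.load(f)
        else:
            data = fn(data)                 # recompute this stage only
            with open(path, "w") as f:
                json.dump(data, f)
    return data

stages = [
    ("cleaned",    lambda xs: [x for x in xs if x is not None]),
    ("aggregated", lambda xs: [sum(xs)]),
]

with tempfile.TemporaryDirectory() as d:
    first = run_pipeline([1, None, 2, 3], stages, d)
    # A second run replays from the checkpoints instead of recomputing,
    # which is also handy for diagnosing a single broken stage.
    second = run_pipeline([1, None, 2, 3], stages, d)
    print(first, second)  # → [6] [6]
```

The same shape works for debugging: delete the checkpoint of the suspect stage and re-run, and only that stage and its successors are recomputed.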
Re: [Discuss][Spark staging dir] way to disable spark writing to _temporary
Yash,

We (Netflix) built a committer that uses the S3 multipart upload API to avoid the copy problem and still handle task failures. You can build and use the code posted here: https://github.com/rdblue/s3committer

You're probably interested in the S3PartitionedOutputCommitter.

rb

On Thu, Apr 6, 2017 at 10:08 PM, Yash Sharma wrote:
> Hi All,
> This is another issue that I was facing with Spark/S3 operability, and I
> wanted to ask the broader community whether anyone else has faced it.
>
> I have a rather simple aggregation query with a basic transformation. The
> output, however, has a lot of output partitions (20K partitions). The
> Spark job runs very fast and reaches the end without any failures. Up to
> that point the job writes to the staging dir and runs alright.
>
> As soon as Spark starts renaming these files it faces two issues:
> 1. S3 single-path renames are insanely slow: the job spends a huge amount
> of time renaming these files.
> 2. Sometimes renames fail: Spark probably has checks after writing the
> file (not sure), and sometimes a few renames fail randomly because of
> S3's eventual consistency, causing the job to fail intermittently. [logs
> added at end]
>
> I was wondering what could be some workarounds for this problem, or
> whether it is possible to override this behavior and write files directly
> to the expected paths (skipping the staging dir _temporary).
>
> Cheers,
> Yash
>
> {logs}
> java.io.IOException: Failed to rename FileStatus{path=s3://instances/_temporary/0/task_201704060437_0005_m_52/utc_date=2012-06-19/product=obsolete; isDirectory=true; modification_time=0; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false} to s3://instances/utc_date=2012-06-19/product=obsolete
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.renameOrMerge(FileOutputCommitter.java:441)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:432)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
> ...
> ... InsertIntoHadoopFsRelationCommand.scala:115)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
> ...
> at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
> 17/04/06 04:41:54 ERROR DynamicPartitionWriterContainer: Job job_201704060436_ aborted.
> 17/04/06 04:41:54 ERROR ActiveInstances$: Exception in running ActiveInstances.
> org.apache.spark.SparkException: Job aborted.
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
> {logs}

--
Ryan Blue
Software Engineer
Netflix
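The difference between the rename-based commit and a multipart-upload commit can be shown with a toy in-memory object store. This is not the actual s3committer code; `MockStore` and every name below are invented purely to illustrate why skipping the rename step helps.

```python
# Toy model of why renaming out of _temporary hurts on S3, and how a
# multipart-style committer avoids it. MockStore is invented for
# illustration; see rdblue/s3committer for a real implementation.

class MockStore:
    def __init__(self):
        self.objects = {}   # final, visible keys
        self.pending = {}   # staged multipart uploads, not yet visible

    # --- classic FileOutputCommitter style: write, then rename ---
    def put(self, key, data):
        self.objects[key] = data

    def rename(self, src, dst):
        # On S3 a "rename" is really a copy plus a delete per object:
        # slow, and the read of `src` can miss under eventual consistency.
        self.objects[dst] = self.objects.pop(src)

    # --- multipart style: stage parts, make visible only on commit ---
    def start_upload(self, key):
        self.pending[key] = []

    def upload_part(self, key, part):
        self.pending[key].append(part)

    def complete_upload(self, key):
        # A single cheap finalize call; no copy, nothing to rename.
        self.objects[key] = b"".join(self.pending.pop(key))

store = MockStore()

# Rename-based commit: the data effectively exists twice (staging + final).
store.put("_temporary/0/task_0/part-0", b"rows")
store.rename("_temporary/0/task_0/part-0", "table/part-0")

# Multipart-based commit: data is written straight at the final key, but
# stays invisible until commit time, so a failed task leaves no garbage
# in the output path.
store.start_upload("table/part-1")
store.upload_part("table/part-1", b"more ")
store.upload_part("table/part-1", b"rows")
assert "table/part-1" not in store.objects    # invisible until commit
store.complete_upload("table/part-1")
print(sorted(store.objects))  # → ['table/part-0', 'table/part-1']
```

The key property is that task failure handling moves from "clean up half-renamed files" to "abort the pending upload", which is both faster and immune to the listing inconsistency seen in the logs above.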
Re: is there a way to persist the lineages generated by spark?
Yes, lineage that is actually replayable is what is needed for the validation process, so we can address questions like how a system arrived at a state S at a time T. I guess a good analogy is event sourcing.

On Thu, Apr 6, 2017 at 10:30 PM, Jörn Franke wrote:
> I do think this is the right way; you will have to do testing with test
> data, verifying that the expected output of the calculation is the actual
> output. Even if the logical plan is correct, your calculation might not
> be. E.g. there can be bugs in Spark or in the UI, or (what happens very
> often) the client describes a calculation but the description itself is
> wrong.
>
>> On 4. Apr 2017, at 05:19, kant kodali wrote:
>>
>> Hi All,
>>
>> I am wondering if there is a way to persist the lineages generated by
>> Spark underneath? Some of our clients want us to prove that the result
>> of the computation we are showing on a dashboard is correct, and if we
>> can show the lineage of transformations that were executed to get to
>> the result, that can be the Q.E.D. moment. But I am not even sure if
>> this is possible with Spark?
>>
>> Thanks,
>> kant
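The event-sourcing analogy can be made concrete with a small sketch: record every transformation as an event, then replay the log against the source to reproduce (and thereby validate) the result. This is a conceptual toy, not a Spark API; `TrackedDataset` and `replay` are invented names, and Spark's own lineage lives in the RDD/logical plan rather than being persisted across jobs out of the box.

```python
# Event-sourcing-style "replayable lineage": every transformation applied
# to the dataset is recorded, so the final state can be rederived from
# the source and audited. All names here are hypothetical.

class TrackedDataset:
    def __init__(self, data, log=None):
        self.data = list(data)
        self.log = log or []          # the lineage: (name, fn) events

    def transform(self, name, fn):
        # Apply fn and append the event; the old dataset stays untouched.
        return TrackedDataset(fn(self.data), self.log + [(name, fn)])

def replay(source, log):
    """Re-run the recorded lineage from the original source data."""
    data = list(source)
    for _name, fn in log:
        data = fn(data)
    return data

source = [3, 1, 4, 1, 5]
result = (TrackedDataset(source)
          .transform("dedupe", lambda xs: sorted(set(xs)))
          .transform("double", lambda xs: [2 * x for x in xs]))

# The validation step: replaying the persisted lineage against the
# source must reproduce the dashboard number exactly.
assert replay(source, result.log) == result.data
print([name for name, _ in result.log], result.data)
# → ['dedupe', 'double'] [2, 6, 8, 10]
```

In a real system the log would hold serializable descriptions of each step (SQL text, plan JSON) rather than live functions, so the lineage can be stored and replayed later, at which point this becomes exactly the "how did we reach state S at time T" audit trail described above.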