Re: saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

2018-04-26 Thread cane
Thanks Steve!
I will study the links you mentioned!






Re: Sorting on a streaming dataframe

2018-04-26 Thread Michael Armbrust
The basic tenet of structured streaming is that a query should return the
same answer in streaming or batch mode. We support sorting in complete mode
because we have all the data and can sort it correctly and return the full
answer.  In update or append mode, sorting would only return a correct
answer if we could promise that records that sort higher are always going to
arrive later (and we can't).  Therefore, it is disallowed.

If you are just looking for a unique, stable id and you are already using
Kafka as the source, you could just combine the partition id and the
offset. The structured streaming connector to Kafka exposes both of these in
the schema of the streaming DataFrame. (Similarly, for Kinesis you can use the
shard id and sequence number.)
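
A rough sketch of that idea (illustrative only; the bootstrap servers and topic
below are placeholders, and spark.implicits._ is assumed to be imported):

import org.apache.spark.sql.functions._

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder
  .option("subscribe", "events")                      // placeholder topic
  .load()

// topic, partition and offset are part of the Kafka source schema; together
// they identify a record uniquely and stably across restarts.
val withIds = stream.select(
  concat_ws("-", $"topic", $"partition", $"offset").as("record_id"),
  $"key", $"value")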

If you need the IDs to be contiguous, then this is a somewhat fundamentally
hard problem.  I think the best we could do is add support
for monotonically_increasing_id() in streaming dataframes.

On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha 
wrote:

> Perhaps your use case fits Apache Kafka better.
>
> More info at:
> https://kafka.apache.org/documentation/streams/
>
> Everything really comes down to the architecture design and algorithm
> spec. However, from my experience with Spark, there are many good reasons
> why this requirement is not supported ;)
>
> Best,
>
> Chayapan (A)
>
>
> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat  wrote:
>
> Thanks Chris. There are many ways in which I can solve this problem but
> they are cumbersome. The easiest way would have been to sort the streaming
> dataframe. The reason I asked this question is that I could not find a
> reason why sorting on a streaming dataframe is disallowed.
>
> Hemant
>
> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <
> chris.bow...@microfocus.com> wrote:
>
>> You can happily sort the underlying RDD of InternalRow(s) inside a sink,
>> assuming you are willing to implement and maintain your own sink(s). That
>> is, just grabbing the parquet sink, etc. isn’t going to work out of the
>> box. Alternatively map/flatMapGroupsWithState is probably sufficient and
>> requires less working knowledge to make effective reuse of internals. Just
>> group by foo and then sort accordingly and assign ids. The id counter can
>> be stateful per group. Sometimes this problem may not need to be solved at
>> all. For example, if you are using kafka, a proper partitioning scheme and
>> message offsets may be “good enough”.
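>>
>> A rough, untested sketch of that idea (all names below are illustrative and
>> spark.implicits._ is assumed to be in scope):
>>
>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
>>
>> case class Event(foo: String, payload: String)
>> case class EventWithId(foo: String, payload: String, id: Long)
>>
>> // Keep a running counter per group; assign ids to records as they arrive.
>> def assignIds(foo: String, events: Iterator[Event],
>>     state: GroupState[Long]): Iterator[EventWithId] = {
>>   var counter = state.getOption.getOrElse(0L)
>>   val out = events.toSeq.sortBy(_.payload).map { e =>  // sort within the batch as needed
>>     counter += 1
>>     EventWithId(e.foo, e.payload, counter)
>>   }
>>   state.update(counter)
>>   out.iterator
>> }
>>
>> // events: a streaming Dataset[Event]
>> val withIds = events
>>   .groupByKey(_.foo)
>>   .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.NoTimeout())(assignIds)
>>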
>> --
>> *From:* Hemant Bhanawat 
>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>> *To:* Reynold Xin
>> *Cc:* dev
>> *Subject:* Re: Sorting on a streaming dataframe
>>
>> Well, we want to assign snapshot ids (incrementing counters) to the
>> incoming records. For that, we are zipping the streaming rdds with that
>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>> records in the streaming dataframe get counters in random order, but the
>> counter should always be incrementing.
>>
>> This is working fine until we have a failure. When we have a failure, we
>> re-assign the records to snapshot ids, and this time the same snapshot id can
>> get assigned to a different record. This is a problem because the primary
>> key in our storage engine is . So we want to sort the
>> dataframe so that the records always get the same snapshot id.
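>>
>> (For illustration only, roughly the shape of the approach described above, in
>> DStream terms; the running base counter is an assumption and would have to be
>> recovered from durable storage on restart:)
>>
>> var base = 0L
>> stream.foreachRDD { rdd =>   // stream: a DStream of records
>>   val withIds = rdd.zipWithIndex().map { case (rec, i) => (base + i, rec) }
>>   // ... write the (snapshotId, record) pairs to the store here ...
>>   base += rdd.count()
>>   // note: after a failure and recompute, zipWithIndex can pair records with
>>   // different indices, which is exactly the problem described above.
>> }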
>>
>>
>>
>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin 
>> wrote:
>>
>> Can you describe your use case more?
>>
>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat 
>> wrote:
>>
>> Hi Guys,
>>
>> Why is sorting on streaming dataframes not supported (unless it is in
>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>
>> Hemant
>>
>>
>>
>
>


Re: saveAsNewAPIHadoopDataset must not enable speculation for parquet file?

2018-04-26 Thread Steve Loughran

sorry, I hadn't noticed this followup. Been busy with other issues

On 3 Apr 2018, at 11:19, cane <zhoukang199...@gmail.com> wrote:

Now, if we use saveAsNewAPIHadoopDataset with speculation enabled, it may cause
data loss.
I checked the comment on this API:

We should make sure our tasks are idempotent when speculation is enabled, i.e. do
not use output committer that writes data directly. There is an example in
https://issues.apache.org/jira/browse/SPARK-10063 to show the bad result of
using direct output committer with speculation enabled.

But is this the rule we must follow?
For example, for parquet it will get ParquetOutputCommitter.
In this case, must speculation be disabled for parquet?

ParquetOutputCommitter is a subclass of Hadoop's FileOutputCommitter, so you 
get the choice of its two algorithms, as set by 
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
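
For illustration (a minimal sketch, not from the original mail; the app name is
a placeholder), picking algorithm 2 when building a session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("committer-demo")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()

The same property can also be passed with --conf on spark-submit.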


algorithm 1:
- tasks write to a _temporary/$jobId/_temporary/$taskId directory,
- task commit: rename that to _temporary/$jobId/$taskId, which for a real FS is
  an O(1) atomic operation; speculation and retry are straightforward.
- job commit: copy the contents of all the task ID directories to the
  destination, create the _SUCCESS file.
  Job commit is non-atomic; if a job fails during commit you need to delete the
  dest dir and try again.

algorithm 2:
- tasks write to a _temporary/$jobId/_temporary/$taskId directory,
- task commit: merge to the dest directory, potentially while other tasks are
  doing a merge at the same time.
- job commit does nothing but create the _SUCCESS file, and can be repeated.

you can speculate with either, but if a task using algorithm 2 fails during 
task commit then there's a problem, as the store is in an unknown state. 
Neither MapReduce nor Spark worries about this. Usually it's fast, so the window
of failure is pretty small; when you are working with object stores, that doesn't
hold. Really they should react to that failure by aborting the job, but as
object stores tend to have their own issues, this is more of a detail than the
underlying flaw.

As I said, you can read

https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf

and a precursor attempt to document what goes on in the depths of
FileOutputCommitter (which has an error in one of the code samples; I forget
which. The paper fixes that)

http://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/committer_architecture.html

+ an IBM paper on their Swift committer for Spark: "Stocator: A High
Performance Object Store Connector for Spark", https://arxiv.org/pdf/1709.01812


I have some issues with that paper, but it's worthwhile looking at to see their
focus on rollback over temp directories:
http://steveloughran.blogspot.co.uk/2017/09/stocator-high-performance-object-store.html



Is there someone who knows the history?

If you check out hadoop, you can get the history after the svn -> git 
migration, though the earlier history is lost in folklore, primarily stories of 
"what went wrong" at Yahoo!.

https://github.com/apache/hadoop/commits/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java

For Spark, look at
https://issues.apache.org/jira/browse/SPARK-4879

and the git logs of

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Once you start looking at the commit protocols, you end up in a fascinating 
world where things like proof of correctness start to matter. Sadly, everyone 
is constrained not just by our lack of everyday use of the language and tools,
but by the lack of a foundation of specs of the underlying storage systems.
There is one for a model of a consistent S3 store,
https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf , but
I couldn't work out how to define an eventually consistent one in TLA+.
Contributions welcome.

-Steve

(*) the Hadoop Filesystem spec is actually Z disguised as Python, but it 
doesn't integrate with any toolchain you can use for correctness proofs. But it 
is read, understood and maintained by developers, which I consider a success. 
It's just, we could do more.