[jira] [Commented] (SPARK-22486) Support synchronous offset commits for Kafka

2019-04-12 Thread Jackson Westeen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16816771#comment-16816771
 ] 

Jackson Westeen commented on SPARK-22486:
-----------------------------------------

I have a use case for this, [~c...@koeninger.org], if you'd still consider 
adding support for commitSync.

 

I'm trying to achieve "effectively once" semantics with Spark Streaming for 
batch writes to S3. The only way to do this is to partitionBy(startOffsets) in 
some way, such that re-writes on failure/retry are idempotent: they overwrite 
the previous batch if a failure occurred before commitAsync succeeded.

 

Here's my example:
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{from_json, from_unixtime, lit}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
import spark.implicits._

stream.foreachRDD { rdd => // rdd: RDD[ConsumerRecord[String, Array[Byte]]]
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // make a dataset, with this batch's offsets included
  spark
    .createDataset(rdd.map(record => new String(record.value)))
    .select(from_json($"value", schema).as("json")) // just for example; schema is the payload's StructType
    .withColumn("dateKey", from_unixtime($"json.timestamp", "yyyyMMdd"))
    .withColumn("startOffsets",
      lit(offsetRanges.sortBy(_.partition).map(_.fromOffset).mkString("_")))
    .write
    .mode(SaveMode.Overwrite)
    .option("partitionOverwriteMode", "dynamic") // dynamic partition overwrite, Spark 2.3+
    .partitionBy("dateKey", "startOffsets")
    .parquet("s3://mybucket/kafka-parquet")

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}
This almost works. The only issue is that I can still end up with 
duplicate/overlapping data if:
 # an initial write to S3 succeeds (batch A)
 # commitAsync takes a long time and eventually fails, *but the job carries on 
and successfully writes another batch in the meantime (batch B)* (a sketch for 
at least detecting this case follows the list)
 # the job then fails for any reason, and we start back at the last committed 
offsets, now with more data in Kafka to process than before... (batch A', which 
includes A, B, ...)
 # we successfully overwrite the initial batch by startOffsets with (batch A') 
and progress as normal. No data is lost, but (batch B) is left over in S3 and 
contains partially duplicate data.

It would be very nice to have an atomic operation for write and commitOffsets, 
or to be able to simulate one with commitSync in Spark Streaming :)
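
Failing that, the workaround the Kafka integration guide suggests is to treat 
the sink as the system of record: the startOffsets partition column already 
stores offsets alongside the data, so on restart the job could seed the stream 
from what S3 says was completed instead of trusting Kafka's committed offsets. 
A rough sketch; offsetsAlreadyWritten is a hypothetical application-side 
helper (not a Spark API), and ssc / kafkaParams are the usual streaming 
context and consumer config:
{code:java}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical: derive per-partition resume offsets from the offsets recorded
// under the S3 prefix. Application-specific; it likely means recording each
// batch's untilOffsets too, not just startOffsets.
def offsetsAlreadyWritten(path: String): Map[TopicPartition, java.lang.Long] = ???

val fromOffsets = offsetsAlreadyWritten("s3://mybucket/kafka-parquet")

val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  LocationStrategies.PreferConsistent,
  // Subscribe overload that seeds initial offsets: S3, not Kafka's committed
  // offsets, becomes the source of truth on restart.
  ConsumerStrategies.Subscribe[String, Array[Byte]](
    Seq("mytopic"), kafkaParams, fromOffsets))
{code}
Since the sink then reflects exactly what was processed, a restart resumes 
after batch B instead of re-reading it, at the cost of scanning the sink's 
partition values at startup.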

 

> Support synchronous offset commits for Kafka
> --------------------------------------------
>
>                 Key: SPARK-22486
>                 URL: https://issues.apache.org/jira/browse/SPARK-22486
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: Jeremy Beard
>            Priority: Major
>
> CanCommitOffsets provides asynchronous offset commits (via 
> Consumer#commitAsync), and it would be useful if it also provided synchronous 
> offset commits (via Consumer#commitSync) for when the desired behavior is to 
> block until it is complete.






[jira] [Commented] (SPARK-22486) Support synchronous offset commits for Kafka

2017-11-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16248891#comment-16248891
 ] 

Cody Koeninger commented on SPARK-22486:


Can you identify a clear use case for this, given that:

- applications have to be tolerant of repeated messages anyway
- the consumer doing the work (on the executor) is not the same as the consumer 
doing the coordination and offset commits (on the driver)

> Support synchronous offset commits for Kafka
> --------------------------------------------
>
>                 Key: SPARK-22486
>                 URL: https://issues.apache.org/jira/browse/SPARK-22486
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: Jeremy Beard
>
> CanCommitOffsets provides asynchronous offset commits (via 
> Consumer#commitAsync), and it would be useful if it also provided synchronous 
> offset commits (via Consumer#commitSync) for when the desired behavior is to 
> block until it is complete.


