[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2019-05-20 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-17937:
-
Labels: bulk-closed  (was: )

> Clarify Kafka offset semantics for Structured Streaming
> ---
>
> Key: SPARK-17937
> URL: https://issues.apache.org/jira/browse/SPARK-17937
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Critical
>  Labels: bulk-closed
>
> Possible events for which offsets are needed:
> # New partition is discovered
> # Offset out of range (i.e., data has been lost).  It's possible to separate 
> this into offset too small and offset too large, but I'm not sure it matters 
> for us.
> Possible sources of offsets:
> # *Earliest* position in log
> # *Latest* position in log
> # *Fail* and kill the query
> # *Checkpoint* position
> # *User specified* per TopicPartition
> # *Kafka commit log*.  Currently unsupported.  This means users who want to 
> migrate from existing Kafka jobs need to jump through hoops.  Even if we 
> never want to support it, as soon as we take on SPARK-17815 we need to make 
> sure Kafka commit log state is clearly documented and handled.
> # *Timestamp*.  Currently unsupported.  This could be supported with the old, 
> inaccurate Kafka time API, or with the upcoming time index (see the second 
> sketch after this description).
> # *X offsets* before or after the latest / earliest position.  Currently 
> unsupported.  I think the semantics of this are much less clear than a 
> timestamp, given that each partition has its own offset range rather than a 
> single shared one.
> Currently allowed pre-query configuration, all "ORs" are exclusive:
> # startingOffsets: *earliest* OR *latest* OR *User specified* JSON per 
> TopicPartition  (SPARK-17812)
> # failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
> *Earliest* above).  In general, I see no reason this couldn't also offer 
> Latest as an option.  (A configuration sketch follows this description.)
> Possible lifecycle times in which an offset-related event may happen:
> # At initial query start
> #* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
> startingOffsets is *User specified* per TopicPartition, and the new partition 
> isn't in the map, *Fail*.  Note that this is effectively indistinguishable 
> from a new partition during the query, because partitions may have changed 
> between pre-query configuration and query start, yet we treat it differently, 
> and users in this case are out of luck.
> #* Offset out of range on driver: We don't technically have behavior for this 
> case yet.  We could use the value of failOnDataLoss, but people may want to 
> know at startup that something was wrong, even if they're OK with Earliest 
> for a during-query out-of-range.
> #* Offset out of range on executor: seems like it should be *Fail* or 
> *Earliest*, based on failOnDataLoss.  But it looks like this setting is 
> currently ignored, and the executor will just fail...
> # During query
> #* New partition:  *Earliest*, only.  This seems to be by fiat; I see no 
> reason it can't be configurable.
> #* Offset out of range on driver:  this _probably_ doesn't happen, because 
> we're doing explicit seeks to the latest position
> #* Offset out of range on executor:  ?
> # At query restart 
> #* New partition: *Checkpoint*, falling back to *Earliest*.  Again, there is 
> no reason the fallback couldn't be configurable to Latest.
> #* Offset out of range on driver:   this _probably_ doesn't happen, because 
> we're doing explicit seeks to the specified position
> #* Offset out of range on executor:  ?
> I've probably missed something; chime in.
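
For concreteness, here is a minimal sketch of the pre-query configuration
described above, assuming the Spark 2.x Kafka source (spark-sql-kafka-0-10);
the broker address, topic name, and per-partition offsets are made-up
illustrative values.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-offset-semantics").getOrCreate()

// startingOffsets may be "earliest", "latest", or user-specified JSON per
// TopicPartition; in the JSON, -2 means earliest and -1 means latest for
// that partition.
val perPartitionOffsets = """{"topic1":{"0":23,"1":-2,"2":-1}}"""

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", perPartitionOffsets) // or "earliest" / "latest"
  .option("failOnDataLoss", "true")               // true => Fail, false => fall back to Earliest
  .load()
{code}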
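
A second sketch, illustrative only and not Spark code: one way the *Timestamp*
source could be resolved to per-partition starting offsets, assuming a Kafka
0.10.1+ consumer that exposes the time index via offsetsForTimes.  The broker
address and topic name are placeholders.

{code:scala}
import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "host1:9092")
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

// Look up the earliest offset at or after a point in time (here, one hour ago).
val targetTimestamp = System.currentTimeMillis() - 3600 * 1000L

val query: java.util.Map[TopicPartition, java.lang.Long] =
  consumer.partitionsFor("topic1").asScala
    .map(p => new TopicPartition("topic1", p.partition()) ->
      java.lang.Long.valueOf(targetTimestamp))
    .toMap.asJava

val startingOffsets = consumer.offsetsForTimes(query).asScala.collect {
  // A null value means the partition has no message at or after the timestamp.
  case (tp, oat) if oat != null => tp -> oat.offset()
}

consumer.close()
{code}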



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-11-09 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17937:
-
Target Version/s:   (was: 2.1.0)




[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-11-03 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17937:
-
Priority: Critical  (was: Major)




[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-11-03 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17937:
-
Issue Type: Improvement  (was: Sub-task)
Parent: (was: SPARK-15406)




[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-11-03 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17937:
-
Target Version/s: 2.1.0




[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-11-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17937:
-
Component/s: Structured Streaming




[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively undistinguishable from 
new parititon during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable fall back to Latest
#* Offset out of range on driver:  *Fail* or *Earliest*, based on failOnDataLoss
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
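
For the restart case, a minimal sketch (continuing the df from the earlier 
sketch; the output path and checkpoint location are placeholders).  The point 
is that on restart offsets come from the *Checkpoint*, and startingOffsets is 
only consulted for partitions the checkpoint has never seen:

{code:scala}
// Offsets are recovered from checkpointLocation on restart; new partitions
// discovered at that point currently fall back to Earliest.
val query = df.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("parquet")
  .option("path", "/tmp/kafka-offsets-example/out")
  .option("checkpointLocation", "/tmp/kafka-offsets-example/checkpoint")
  .start()
{code}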


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR User specified json per topicpartition 
 (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above).  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* per topicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
a new partition during query, because partitions may have changed between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# *New partition* is discovered
# *Offset out of range* (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR User specified json per topicpartition 
 (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above).  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* per topicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
a new partition during query, because partitions may have changed between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat; I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be a configurable fall back to *Latest*
#* Offset out of range on driver:  *Fail* or *Earliest*, based on failOnDataLoss
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# *New partition* is discovered
# *Offset out of range* (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above).  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* per topicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
a new partition during query, because partitions may have changed between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# *New partition* is discovered
# *Offset out of range* (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above).  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* per topicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
a new partition during query, because partitions may have changed between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat; I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be a configurable fall back to *Latest*
#* Offset out of range on driver:  *Fail* or *Earliest*, based on failOnDataLoss
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above).  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is per topicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from a new partition 
during query, because partitions may have changed between pre-query 
configuration and query start, but we treat it differently, and users in this 
case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above).  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is per topicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from a new partition 
during query, because partitions may have changed between pre-query 
configuration and query start, but we treat it differently, and users in this 
case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: Fail or Earliest, based on failOnDataLoss.
# During query
#* New partition:  Earliest, only.  This seems to be by fiat; I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this ??probably?? doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  Fail or Earliest, based on failOnDataLoss
# At query restart 
#* New partition: Checkpoint, fall back to Earliest.  Again, no reason this 
couldn't be a configurable fall back to Latest
#* Offset out of range on driver:  Fail or Earliest, based on failOnDataLoss
#* Offset out of range on executor:  Fail or Earliest, based on failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above).  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is per topicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from 2a below, because 
partitions may have changed between pre-query configuration and query start, 
but we treat it differently, and users in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: Fail or 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above).  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is per topicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from 2a below, because 
partitions may have changed between pre-query configuration and query start, 
but we treat it differently, and users in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: Fail or Earliest, based on failOnDataLoss.
# During query
#* New partition:  Earliest, only.  This seems to be by fiat; I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this ??probably?? doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  Fail or Earliest, based on failOnDataLoss
# At query restart 
#* New partition: Checkpoint, fall back to Earliest.  Again, no reason this 
couldn't be a configurable fall back to Latest
#* Offset out of range on driver:  Fail or Earliest, based on failOnDataLoss
#* Offset out of range on executor:  Fail or Earliest, based on failOnDataLoss


I've probably missed something, chime in.


> Clarify Kafka offset semantics for Structured Streaming
> ---
>
> Key: SPARK-17937
> URL: https://issues.apache.org/jira/browse/SPARK-17937
> Project: Spark
>  Issue Type: Sub-task
>Reporter: Cody Koeninger
>
> Possible events for which offsets are needed:
> # New partition is discovered
> # Offset out of range (aka, data has been lost)
> Possible sources of offsets:
> # Earliest position in log
> # Latest position in log
> # Fail and kill the query
> # Checkpoint position
> # User specified per topicpartition
> # Kafka commit log.  Currently unsupported.  This means users who want to 
> migrate from existing kafka jobs need to jump through hoops.  Even if we 
> never want to support it, as soon as we take on SPARK-17815 we need to make 
> sure Kafka commit log state is clearly documented and handled.
> # Timestamp.  Currently unsupported.  This could be supported with old, 
> inaccurate Kafka time api, or upcoming time index
> # X offsets before or after latest / earliest position.  Currently 
> unsupported.  I think the semantics of this are super unclear by comparison 
> with timestamp, given that Kafka doesn't have a single range of offsets.
> Currently allowed pre-query configuration, all "ORs" are exclusive:
> # startingOffsets: earliest OR latest OR json per topicpartition  
> (SPARK-17812)
> # failOnDataLoss: true (which implies Fail above) OR false (which implies 
> Earliest above).  In general, I see no reason this couldn't specify Latest as 
> an option.
> Possible lifecycle times in which an offset-related event may happen:
> # At initial query start
> #* New partition: if startingOffsets is earliest or latest, use that.  If 
> startingOffsets is per topicpartition, and the new partition isn't in the map, 
> Fail.  Note that this is effectively indistinguishable from 2a below, because 
> partitions may have changed in between pre-query configuration and query 
> start, but we treat it