[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15579206#comment-15579206
 ] 

Apache Spark commented on SPARK-17812:
--

User 'koeninger' has created a pull request for this issue:
https://github.com/apache/spark/pull/15504

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
> Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions
> currently agreed on plan:
> Mutually exclusive subscription options (only assign is new to this ticket)
> {noformat}
> .option("subscribe","topicFoo,topicBar")
> .option("subscribePattern","topic.*")
> .option("assign","""{"topicFoo": [0, 1],"topicBar": [0, 1]}""")
> {noformat}
> where assign can only be specified that way, no inline offsets
> Single starting position option with three mutually exclusive types of value
> {noformat}
> .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, "1": -2}, "topicBar":{"0": -1}}""")
> {noformat}
> startingOffsets with json fails if any topicpartition in the assignments 
> doesn't have an offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-15 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15578894#comment-15578894
 ] 

Cody Koeninger commented on SPARK-17812:


As you just said yourself, assign doesn't mean you necessarily know the exact 
starting offsets you want.







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-15 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15578699#comment-15578699
 ] 

Ofir Manor commented on SPARK-17812:


I think Michael suggests that if you use {{startingOffsets}} without using 
{{assign}}, that could work (use the topic-partition list from the 
startingOffsets), and would simplify the user coding (not needing to specify 
two similar lists, simpler resume etc).
You could keep an explicit {{assign}} for the "more rare?" cases, if someone 
wants to specify a list of topic-partitions but also earliest / latest / 
timestamp.







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-14 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15577022#comment-15577022
 ] 

Cody Koeninger commented on SPARK-17812:


Assign is useful, otherwise you have no way of consuming only particular 
partitions of a topic.

Yeah, I just ended up using jackson tree model directly, as you said the 
catalyst stuff isn't really applicable.

Branch with initial implementation is at 
https://github.com/koeninger/spark-1/tree/SPARK-17812 , will send a PR once I 
have some tests... trying to figure out if there's a reasonable way of unit 
testing offset out of range, but may just give up on that if it seems flaky.







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-14 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15576840#comment-15576840
 ] 

Michael Armbrust commented on SPARK-17812:
--

That sounds pretty good to me, with one question:  Is {{assign}} useful here?  
It seems you know the list of topicpartitions as they are all passed to 
{{startingOffsets}}.  If we get rid of {{assign}}, and keep the offset log 
format consistent with {{startingOffsets}}, then you could resume a query where 
another left off, simply by copying the last batch.  However, if we keep 
{{assign}}, you'll have to type that out manually and I'm not sure what you are 
gaining.
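To illustrate the redundancy being pointed out here: the topic-partition list can be read straight off a {{startingOffsets}} value. The following is a minimal sketch in plain Python, not Spark code; the function name `assignment_from_offsets` is hypothetical and the JSON shape is the one proposed in this ticket.

```python
import json


def assignment_from_offsets(starting_offsets_json):
    """Derive {"topic": [partition, ...]} from a startingOffsets JSON string."""
    # Expected shape: {"topic": {"partition": offset, ...}, ...}
    offsets = json.loads(starting_offsets_json)
    return {
        # JSON object keys are strings, so convert partition ids back to int.
        topic: sorted(int(p) for p in per_partition)
        for topic, per_partition in offsets.items()
    }
```

For example, `{"topicFoo": {"0": 1234, "1": -2}}` yields partitions 0 and 1 of topicFoo. What this cannot express is "these partitions, starting at earliest or latest" without listing an offset for each one.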

I would use jackson for the JSON stuff, but I would probably not use 
catalyst/encoders since those require code generation that's not going to buy us 
much.

Thanks for working on this!







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573843#comment-15573843
 ] 

Cody Koeninger commented on SPARK-17812:


So I think this is what we're agreed on:

Mutually exclusive subscription options (only assign is new to this ticket)
{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicFoo": [0, 1],"topicBar": [0, 1]}""")
{noformat}

where assign can only be specified that way, no inline offsets
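A minimal sketch of the "mutually exclusive subscription options" rule, written in plain Python rather than taken from Spark. Only the option names `subscribe`, `subscribePattern` and `assign` come from this ticket; the function name `pick_subscription` and the error wording are hypothetical.

```python
import json

# Option names taken from the ticket; everything else is illustrative.
SUBSCRIPTION_OPTIONS = ("subscribe", "subscribePattern", "assign")


def pick_subscription(options):
    """Return (name, parsed_value) for the single subscription option given.

    Raises ValueError when none or more than one of the options is present.
    """
    present = [name for name in SUBSCRIPTION_OPTIONS if name in options]
    if len(present) != 1:
        raise ValueError(
            "exactly one of %s must be specified, got: %s"
            % (", ".join(SUBSCRIPTION_OPTIONS), present or "none")
        )
    name = present[0]
    raw = options[name]
    if name == "subscribe":
        # Comma-separated list of topic names.
        return name, [t.strip() for t in raw.split(",") if t.strip()]
    if name == "assign":
        # JSON object mapping topic -> list of partition numbers.
        return name, json.loads(raw)
    # subscribePattern: keep the regex string as-is.
    return name, raw
```

Passing both `subscribe` and `assign`, or neither, raises; passing exactly one returns it in parsed form.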

Single starting position option with three mutually exclusive types of value
{noformat}
.option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, "1": -2}, "topicBar":{"0": -1}}""")
{noformat}

startingOffsets with json fails if any topicpartition in the assignments 
doesn't have an offset.
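The "fails if any topicpartition in the assignments doesn't have an offset" rule could be checked along these lines. This is a sketch under stated assumptions, in plain Python: the function names are hypothetical, the error text is made up, and the two JSON shapes are the ones shown in the options above.

```python
import json


def missing_offsets(assign_json, starting_offsets_json):
    """List "topic-partition" strings that are assigned but have no offset."""
    assigned = json.loads(assign_json)            # {"topic": [partition, ...]}
    offsets = json.loads(starting_offsets_json)   # {"topic": {"partition": offset}}
    missing = []
    for topic, partitions in sorted(assigned.items()):
        topic_offsets = offsets.get(topic, {})
        for partition in partitions:
            # JSON object keys are strings, so partition 0 is stored as "0".
            if str(partition) not in topic_offsets:
                missing.append("%s-%d" % (topic, partition))
    return missing


def check_starting_offsets(assign_json, starting_offsets_json):
    """Raise ValueError if any assigned partition lacks a starting offset."""
    missing = missing_offsets(assign_json, starting_offsets_json)
    if missing:
        raise ValueError(
            "startingOffsets has no offset for: " + ", ".join(missing)
        )
```

Topic names are compared exactly, including case, because Kafka topic names are case-sensitive.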

Sound right?

I'll go ahead and start on it.  I'm assuming I should try to reuse some of the 
existing catalyst Jackson stuff and keep in mind a format that's potentially 
usable by the checkpoints as well?

I don't think earliest / latest is too unclear as long as there's a way to get 
to the other knobs that auto.offset.reset (should have) provided. Punting the 
tunability of new partitions to another ticket sounds good.  








[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573786#comment-15573786
 ] 

Michael Armbrust commented on SPARK-17812:
--

Please do work on it.  It might be good to update the description with a 
summary of this discussion so we can all be sure we are on the same page.

I actually do think it's fair to have one configuration for what to do in the 
case of data loss.  This happens when you fall behind or when you come back and 
new partitions are there that have already aged out.  Let's add this in another 
ticket.

I know you are super deep in Kafka and others should chime in if I'm way 
off-base, but I think that with {{startingOffsets=earliest}} and 
{{startingOffsets=latest}} it is pretty clear what is happening.  I would not 
change {{earliest}} and {{latest}} just to be different from kafka.  We could 
make it query start if this is still confusing, but let's do that soon if that 
is the case.







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573766#comment-15573766
 ] 

Cody Koeninger commented on SPARK-17812:


OK, failing on start is clear (it's really annoying in the case of 
subscribePattern), but at least it's clear.  I think that's enough to get 
started on this ticket, is anyone currently working on it or can I do it?  Ryan 
seemed worried that it wouldn't get done in time for the next release.

It sounds like your current plan is to ignore whatever comes out of KAFKA-3370, 
which is fine as long as whatever you do is both clear and equally tunable.  
Clarity of semantics can't be the only criterion of an API, "You can only start 
at latest offset, period" is clear, but a crap api.

{quote}
the only case where we lack sufficient tunability is "Where do I go when the 
current offsets are invalid due to retention?".
{quote}

No, you lack sufficient tunability as to where newly discovered partitions 
start.  Keep in mind that those partitions may have been discovered after a 
significant job downtime.  If the point of an API is to provide clear semantics 
to the user, it is not at all clear to me as a user how I can start those 
partitions at latest, which I know is possible in the underlying data model.

The reason I'm belaboring this point now is that you have chosen names 
(earliest, latest) for the API currently under discussion that are confusingly 
similar to the existing auto offset reset functionality, and you have provided 
knobs for some, but not all, of the things auto offset reset currently affects. 
 This is going to confuse people, it already confuses me.









[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573677#comment-15573677
 ] 

Michael Armbrust commented on SPARK-17812:
--

bq. with your proposed interface, what, as a user, do you expect to happen when 
you specify startingOffsets for some but not all partitions?

I would probably opt to fail to start the query with advice on how to fix it 
(i.e. "specify {{-1}} for these partitions if you don't care").  We could also 
have a default, but I tend to err on the side of explicit behavior.

bq. Yes, auto.offset.reset is a mess. Have you read 
https://issues.apache.org/jira/browse/KAFKA-3370 What are you going to do when 
that ticket is resolved? It should allow users to answer the questions you 
raised in very specific ways, that your interface does not.

There is clearly a lot of confusing baggage with this configuration option, 
specifically because it is conflating too many unrelated concerns. Furthermore, 
IMHO {{auto.offset.reset}} is a pretty confusing name that does not imply 
anything about where in the stream this query should start. "reset" implies you 
were set somewhere to begin with.

In contrast, {{startingOffsets}} handles one case clearly: it picks the offsets 
that are used as a starting point for the append only table abstraction that 
Spark is providing.

As far as I understand the discussion on the ticket you referenced, the only 
case where we lack sufficient tunability is "Where do I go when the current 
offsets are invalid due to retention?".

In this case, where data has been lost and {{failOnDataLoss=false}}, we 
currently try to minimize the amount of data we lose by starting at the 
earliest offsets available.  We should certainly consider making this behavior 
configurable as well, but that seems like a different concern than what is 
being discussed in this JIRA.
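The behavior described in the paragraph above (fail, or fall back to the earliest offsets still available) can be sketched per partition as follows. This is plain Python for illustration only, not Spark's implementation; the function name `resume_offset` and its parameters are hypothetical, with `fail_on_data_loss` standing in for {{failOnDataLoss}}.

```python
def resume_offset(wanted, earliest_available, fail_on_data_loss):
    """Pick the offset to resume from for one partition.

    wanted: the offset the query expected to read next.
    earliest_available: the oldest offset the broker still retains.
    """
    if wanted >= earliest_available:
        return wanted  # nothing the query needs has aged out
    if fail_on_data_loss:
        raise RuntimeError(
            "offsets %d..%d are no longer available"
            % (wanted, earliest_available - 1)
        )
    # Best effort: skip only what is already gone.
    return earliest_available
```

Starting at the earliest retained offset loses the least data; skipping all the way to the latest offset would be a separate policy and is not modeled here.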

Personally, it seems like if you are falling so far behind that you have to 
skip all the way ahead, something is going very wrong.  However, if users 
request this feature, we should certainly add it. I would not, however, 
shoe-horn it into anything having to do with query start behavior. It seems 
like they have reached a similar conclusion, as they are considering adding a 
new configuration, {{auto.reset.offset.existing}}.

bq. Is the purpose of your interface to do what you think users should be able 
to do, or what they need to be able to do?

The purpose of an interface is to provide clear semantics to the user.







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573563#comment-15573563
 ] 

Cody Koeninger commented on SPARK-17812:


So a short term question
- with your proposed interface, what, as a user, do you expect to happen when 
you specify startingOffsets for some but not all partitions?

A couple of medium term questions:
- Yes, auto.offset.reset is a mess.  Have you read 
https://issues.apache.org/jira/browse/KAFKA-3370
- What are you going to do when that ticket is resolved?  It should allow users 
to answer the questions you raised in very specific ways, that your interface 
does not.

And a really leading long term question:
- Is the purpose of your interface to do what you think users should be able to 
do, or what they need to be able to do?







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573526#comment-15573526
 ] 

Michael Armbrust commented on SPARK-17812:
--

As far as I understand it, {{auto.offset.reset}} is conflating a few things 
that make it hard for me to reason about exactly-once semantics in my query.  
It is answering all of the following:
 - Where do I start when I'm creating this {{group.id}} for the first time?
 - What do I do when a new partition is added to a topic I'm watching?
 - What do I do when the current offset is invalid because of retention?

The model of structured streaming is an append only table, where we are 
computing the same answer incrementally as if you were running a batch query 
over all of the data in the table.  The whole goal is to make it easy to reason 
about correctness and push the hard work of incremental processing and late 
data management into the optimizer / query planner.  As a result, I think we 
are trying to answer a different set of questions than a distributed set of 
consumers that share a {{group.id}}:
 - Should this append only table contain all of the historical data available, 
or do I begin at this moment and start appending?  This is what 
{{startingOffsets}} answers.  I think we should handle {{"earliest"}} (all 
data), {{"latest"}} (only data that arrives after now), and a very specific 
point in time across partitions (probably when some other query stopped 
running).
 - When I get into a situation where data has been deleted by the retention 
mechanism without me seeing it, what should I do?  Fail the query?  Or issue a 
warning and compute best effort on the data available.   This is what 
{{failOnDataLoss}} answers.

In particular, I think the kafka method of configuration makes it confusing to 
do something like, "starting now, compute some aggregation exactly once".  The 
documentation even points out some of the pitfalls:
bq. ... If this is set to largest, the consumer may lose some messages when the 
number of partitions, for the topics it subscribes to, changes on the broker. 
To prevent data loss during partition addition, set auto.offset.reset to 
smallest.

Really what I want here is, "begin the query at largest", but "start new 
partitions at smallest (and in fact, tell me if I'm so late joining a new 
partition that I have already lost some data)".
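The policy wished for above ("begin the query at largest", "start new partitions at smallest", and report late joins) might look like the sketch below. It is plain Python with hypothetical names (`start_offsets`, `ranges`), and it assumes a new Kafka partition begins at offset 0, so an earliest retained offset above 0 means some records were deleted before the query saw them.

```python
def start_offsets(initial, discovered_later, ranges):
    """Choose a start offset per partition.

    initial: partitions known when the query starts -> start at latest.
    discovered_later: partitions that appear afterwards -> start at earliest.
    ranges: {partition: (earliest_offset, latest_offset)} as reported by the
        broker (a hypothetical input for this sketch).
    Returns (offsets, late), where late lists the new partitions whose
    earliest retained offset is already past 0.
    """
    offsets = {}
    late = []
    for p in initial:
        offsets[p] = ranges[p][1]  # latest: only data arriving from now on
    for p in discovered_later:
        earliest = ranges[p][0]
        offsets[p] = earliest      # earliest: do not skip any retained data
        if earliest > 0:
            late.append(p)         # records aged out before we joined
    return offsets, late
```

A single {{auto.offset.reset}} value cannot express this split, since it applies the same answer to both groups of partitions.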









[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573479#comment-15573479
 ] 

Cody Koeninger commented on SPARK-17812:


While some decision is better than none, can you help me understand why you 
don't believe me that auto.offset.reset is orthogonal to specifying specific 
starting positions?  Or do you just not believe it's important?

The reason you guys used a different name from auto.offset.reset is that the 
Kafka project's semantics for it are inadequate. But they will fix it, and when 
they do, the fact that you have conflated two unrelated things into one 
configuration in your api is going to cause problems.









[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573457#comment-15573457
 ] 

Ofir Manor commented on SPARK-17812:


I'm with you - I warned you it is bikeshedding...
I don't have a strong opinion, just a preference, and what you suggested is way 
better than the committed solution, so I'll get out of the loop.
Whatever [~marmbrus] and you are OK with - either way it would be a big step 
forward.







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573459#comment-15573459
 ] 

Michael Armbrust commented on SPARK-17812:
--

+1 to the suggested way of subscribing, and for using "assign" as a familiar 
name.

I would probably leave it with a single option like this:
{{.option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 
1234, "1": 4567\}\}""")}}







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573432#comment-15573432
 ] 

Cody Koeninger commented on SPARK-17812:


If you're seriously worried that people are going to get confused,

{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1": 4567}}""")
.option("defaultOffsets", "earliest" | "latest")
{noformat}

let those two at least not be mutually exclusive, and punt on the question of 
precedence until there's an actual startingTime or startingX ticket.
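One plausible way the two options could combine when both are given is sketched below: the default applies to every partition and a specific offset overrides it. This is plain Python, not Spark code. The function name `resolve` is hypothetical, `defaultOffsets` is only the option name proposed in this comment, and the sentinels assume that -2 means earliest and -1 means latest in the JSON form, as in Spark's Kafka integration guide.

```python
import json

EARLIEST, LATEST = -2, -1  # sentinels assumed for the JSON form
DEFAULTS = {"earliest": EARLIEST, "latest": LATEST}


def resolve(partitions, default_offsets, starting_offsets_json=None):
    """Return {(topic, partition): offset} for the requested partitions.

    partitions: iterable of (topic, partition) pairs.
    default_offsets: "earliest" or "latest", applied to every partition.
    starting_offsets_json: optional per-partition offsets in the ticket's
        JSON shape; these replace the default for the partitions they name.
    """
    resolved = {tp: DEFAULTS[default_offsets] for tp in partitions}
    if starting_offsets_json:
        specific = json.loads(starting_offsets_json)
        for topic, per_partition in specific.items():
            for partition, offset in per_partition.items():
                # A specific offset wins over the general default.
                resolved[(topic, int(partition))] = offset
    return resolved
```

With three partitions of topicFoo, a default of "latest" and specific offsets for partitions 0 and 1, only partition 2 ends up at the latest sentinel.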









[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573395#comment-15573395
 ] 

Cody Koeninger commented on SPARK-17812:


1. we don't have lists, we have strings.  regexes and valid topic names have 
overlaps (dot is the obvious one).

2. Mapping directly to kafka method names means we don't have to come up with 
some other (weird and possibly overlapping) name when they add more ways to 
subscribe, we just use theirs.

3. I think this is a mess with kafka semantics for the reasons both you and I 
have already expressed.  At any rate, I think Michael already clearly punted 
the "starting X" case to a different topic.

4. I  think it's more than sufficiently clear as suggested, no one is going to 
expect that a specific offset they provided is going to be overruled by a 
general single default.   The implementation is also crystal clear - seek to 
the position identified by startingTime, then seek to any specific offsets for 
specific partitions
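The overlap mentioned in point 1 can be shown in a few lines. This is an illustrative sketch only: Python's `re` module stands in for the Java regex engine Kafka uses, and the topic names are made up.

```python
import re

topic_name = "topic.foo"  # a valid Kafka topic name that contains a dot
candidates = ["topic.foo", "topicXfoo", "topic-foo"]

# Read as a literal name, only the exact string matches.
literal_matches = [t for t in candidates if t == topic_name]

# Read as a regex, the dot matches any single character.
pattern_matches = [t for t in candidates if re.fullmatch(topic_name, t)]

print(literal_matches)
print(pattern_matches)
```

The same string selects one topic as a name and three as a pattern, which is why a single combined option would have to guess the user's intent.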

Yes, this is all bikeshedding, but it's bikeshedding that directly affects what 
people are actually able to do with the api.  Needlessly restricting it for 
reasons that have nothing to do with safety is just going to piss users off for 
no reason. Just because you don't have a use case that needs it, doesn't mean 
you should arbitrarily prevent users from doing it.

Please, just choose something and let me build it so that people can actually 
use the thing by the next release







[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573341#comment-15573341
 ] 

Ofir Manor commented on SPARK-17812:


Thanks Cody! Great to have a concrete example.
I have some comments, but it's mostly bikeshedding.
1. subscribe vs. subscribePattern --> personally, I would combine them both 
into "subscribe" - no need to burden the user with the different Kafka API 
nuances. It can get a list of discrete topics or a pattern.
2. It would be much clearer if "assign" was called subscribeSomething, so the 
user would choose one "subscribe..." and one (or more) "starting...".
Not sure I have a good name though - subscribeCustom?
You can even use the regular subscribe for that (and be smarter with the 
pattern matching) - I think it would just work, and if someone tries to be 
funny (combining asterisks and partitions) we could just error.
3. I like startingTime... pretty neat.
We could hypothetically add {{.option("startingMessages", long)}} to support 
Michael's "just start with 1000 recent messages"...
4. As I said before, I'd rather have all starting* options be mutually 
exclusive. Yes, it blocks some edge cases, on purpose, but it makes the API 
and code way clearer (think about startingMessages interacting with 
startingOffsets etc).
I think that it would be easier to regret that and allow multiple starting* 
options in the future (opening all sorts of esoteric combinations) than to 
clean it up in the future if users find it confusing and not needed.
Anyway, as long as it is functional I'm good with it, even if it is less 
aesthetic.
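
One obstacle to folding subscribe and subscribePattern into a single string option is that a dot is legal in a Kafka topic name and is also a regex metacharacter. The sketch below uses hypothetical topic names and only the Python standard library to show the same string selecting different topics under the two readings.

```python
import re

# Hypothetical topics that exist on the cluster.
existing_topics = ["topic.v1", "topicXv1", "topic-v1"]
value = "topic.v1"  # could be a literal topic name or a regex

# Reading 1: a comma-separated list of literal topic names.
as_literal = [t for t in existing_topics if t in value.split(",")]

# Reading 2: a regular expression matched against whole topic names.
as_pattern = [t for t in existing_topics if re.fullmatch(value, t)]

print(as_literal)  # ['topic.v1']
print(as_pattern)  # ['topic.v1', 'topicXv1', 'topic-v1']
```

A combined option would have to guess which reading the user meant, or introduce an escaping rule.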




[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573166#comment-15573166
 ] 

Cody Koeninger commented on SPARK-17812:


Here's my concrete suggestion:

3 mutually exclusive ways of subscribing:

{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}

where assign can only be specified that way, no inline offsets

2 non-mutually exclusive ways of specifying starting position, explicit 
startingOffsets obviously take priority:

{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1": 4567}}""")
.option("startingTime", "earliest" | "latest" | long)
{noformat}
where long is a timestamp, work to be done on that later.
Note that even Kafka 0.8 has an API for time (a really crappy one, based on 
log file modification time), so pursuing timestamps for startingTime later 
doesn't necessarily exclude it.
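
As a rough illustration of the "3 mutually exclusive ways of subscribing" rule, the following stdlib-only Python sketch checks that exactly one option is present and parses it. The function name `parse_subscription` and its return shape are hypothetical; the option names and value formats are the ones proposed above.

```python
import json
from typing import Dict, Tuple

SUBSCRIPTION_KEYS = ("subscribe", "subscribePattern", "assign")


def parse_subscription(options: Dict[str, str]) -> Tuple[str, object]:
    """Return (strategy, parsed value); exactly one strategy must be set."""
    chosen = [k for k in SUBSCRIPTION_KEYS if k in options]
    if len(chosen) != 1:
        raise ValueError(
            f"specify exactly one of {SUBSCRIPTION_KEYS}, got {chosen}"
        )
    key = chosen[0]
    raw = options[key]
    if key == "subscribe":
        # Comma-separated list of literal topic names.
        return key, [t.strip() for t in raw.split(",") if t.strip()]
    if key == "subscribePattern":
        # Regex, kept as a string here.
        return key, raw
    # "assign": JSON object mapping topic -> list of partition numbers.
    assignment = json.loads(raw)
    pairs = [(topic, p) for topic, parts in assignment.items() for p in parts]
    return key, pairs


strategy, value = parse_subscription(
    {"assign": '{"topicfoo": [0, 1], "topicbar": [0, 1]}'}
)
print(strategy, value)
```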






[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573109#comment-15573109
 ] 

Ofir Manor commented on SPARK-17812:


Regarding (1) - of course it is *all* data in the source, as of query start. 
Just the same as a file system directory or a database table - I'm not sure a 
disclaimer that the directory or table could have had different data in the 
past adds anything but confusion...
Anyway, startingOffset is confusing because it seems you want a different 
parameter for "assign" --> to explicitly specify starting offsets.
For your use case, I would add:
5. Give me nnn messages (not the last ones). We still do one of the above 
options (trying to go back nnn messages, somehow split between the 
topic-partitions involved), but don't provide a more explicit guarantee like 
"last nnn". 
Generally, the distribution of messages to partitions doesn't have to be 
round-robin or uniform; it is strongly based on the key (for example, it could 
be state, could be URL etc).
Anyway, I haven't seen a concrete suggestion on how to specify offsets or 
timestamp, so I think that would be the next step in this ticket (I suggested 
you could condense all to one option to avoid dependencies between options, but 
I don't have an elegant "stringly" suggestion).




[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573089#comment-15573089
 ] 

Cody Koeninger commented on SPARK-17812:


One other slightly ugly thing...

{noformat}
// starting topicpartitions, no explicit offset
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}""")

// do you allow specifying with explicit offsets in the same config option? 
// or force it all into startingOffsetForRealzYo?
.option("assign", """{"topicfoo": {"0": 1234, "1": 4567}, "topicbar": {"0": 1234, "1": 4567}}""")
{noformat}
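
If both shapes were accepted under the same option, the parser would have to tell them apart by inspecting the JSON values. A hedged sketch of what that check might look like follows; `classify_assign` is a hypothetical helper, not something from the proposal.

```python
import json


def classify_assign(raw: str) -> str:
    """Report whether an assign value carries partitions only or offsets too."""
    parsed = json.loads(raw)
    shapes = set()
    for topic, value in parsed.items():
        if isinstance(value, list):
            shapes.add("partitions")  # {"topic": [0, 1]}
        elif isinstance(value, dict):
            shapes.add("offsets")     # {"topic": {"0": 1234}}
        else:
            raise ValueError(f"unexpected value for topic {topic!r}: {value!r}")
    if len(shapes) != 1:
        # Assumption: mixing both shapes in one value is rejected.
        raise ValueError("mixed or empty assign value")
    return shapes.pop()


print(classify_assign('{"topicfoo": [0, 1], "topicbar": [0, 1]}'))
print(classify_assign('{"topicfoo": {"0": 1234, "1": 4567}}'))
```

Keeping offsets out of assign, as suggested elsewhere in the thread, avoids this branching altogether.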




[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572922#comment-15572922
 ] 

Cody Koeninger commented on SPARK-17812:


Sorry, I didn't see this comment until just now.

X offsets back per partition is not a reasonable proxy for time when you're 
dealing with a stream that has multiple topics in it.  Agree we should break 
that out, focus on defining starting offsets in this ticket.

The concern with startingOffsets naming is that, because auto.offset.reset is 
orthogonal to specifying some offsets, you have a situation like this:

.format("kafka")
.option("subscribePattern", "topic.*")
.option("startingOffset", "latest")
.option("startingOffsetForRealzYo", """{"topicfoo": {"0": 1234, "1": 4567}, "topicbar": {"0": 1234, "1": 4567}}""")

where startingOffsetForRealzYo has a more reasonable name that conveys it is 
specifying starting offsets, yet is not confusingly similar to startingOffset

Trying to hack it all into one json as an alternative, with a "default" topic, 
means you're going to have to pick a key that isn't a valid topic, or add yet 
another layer of indirection.  It also makes it harder to make the format 
consistent with SPARK-17829 (which seems like a good thing to keep consistent, 
I agree)

Obviously I think you should just change the name, but it's your show.








[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-11 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15566392#comment-15566392
 ] 

Michael Armbrust commented on SPARK-17812:
--

For the seeking back {{X}} offsets use case, I was interactively querying a 
stream and I wanted *some* data, but not *all available data*.  I did not have 
specific offsets in mind, and under the assumption that items are getting 
hashed across partitions, X offsets back is a very reasonable proxy for time.  
I agree actual time would be better.  However, since there is disagreement on 
this case, I'd propose we break that out into its own ticket and focus on 
assign here.

I'm not sure I understand the concern with the {{startingOffsets}} option 
naming (which we can still change, though it would be nice to do so before a 
release happens).  It affects which offsets will be included in the query and 
it only takes effect when the query is first started.  [~ofirm], currently we 
support (1) (though I wouldn't say *all* data as we are limited by retention / 
compaction) and (2).  As you said, we can also support (3), though this must be 
done after the fact by adding a predicate to the stream on the timestamp 
column.  For performance it would be nice to push that down into Kafka, but 
I'd split that optimization into another ticket.

Regarding (4), I like the proposed JSON solution.  It would be nice if this was 
unified with whatever format we decide to use in [SPARK-17829] so that you 
could easily pick up where another query left off.  I'd also suggest we use 
{{-1}} and {{-2}} as special offsets for subscribing to a topicpartition at the 
earliest or latest available offsets at query start time.
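
A sketch of how such sentinel offsets could be resolved, assuming -1 means latest and -2 means earliest (the same values Kafka's own offset lookup uses). The function `resolve_offsets` and the `earliest` / `latest` lookup tables are hypothetical stand-ins for what a real consumer would report.

```python
import json
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]

LATEST = -1    # assumed sentinel for "latest available offset"
EARLIEST = -2  # assumed sentinel for "earliest available offset"


def resolve_offsets(
    raw_json: str,
    earliest: Dict[TopicPartition, int],
    latest: Dict[TopicPartition, int],
) -> Dict[TopicPartition, int]:
    """Turn a per-partition JSON spec into concrete starting offsets."""
    resolved: Dict[TopicPartition, int] = {}
    for topic, partitions in json.loads(raw_json).items():
        for partition_str, offset in partitions.items():
            tp = (topic, int(partition_str))  # JSON keys are always strings
            if offset == LATEST:
                resolved[tp] = latest[tp]
            elif offset == EARLIEST:
                resolved[tp] = earliest[tp]
            elif offset >= 0:
                resolved[tp] = offset
            else:
                raise ValueError(f"unsupported negative offset {offset} for {tp}")
    return resolved


# Hypothetical offsets available when the query starts.
earliest = {("topicFoo", 0): 100, ("topicFoo", 1): 50, ("topicBar", 0): 0}
latest = {("topicFoo", 0): 9000, ("topicFoo", 1): 8500, ("topicBar", 0): 120}

spec = '{"topicFoo": {"0": 1234, "1": -2}, "topicBar": {"0": -1}}'
print(resolve_offsets(spec, earliest, latest))
```

Because -1 and -2 are already taken, any other negative number is rejected here rather than being read as "X offsets back".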

> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets






[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15563285#comment-15563285
 ] 

Cody Koeninger commented on SPARK-17812:


No, it's not covered by strict assign.  If you don't have this, you're 
basically saying you can never have well-defined starting offsets for a job 
that starts as SubscribePattern.

The existing DStream already does this, because it doesn't conflate 
auto.offset.reset with user-specified offsets.




[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-10 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15563273#comment-15563273
 ] 

Ofir Manor commented on SPARK-17812:


If you think it is useful, you should definitely propose something that covers 
it.
Personally, I can't imagine a case where someone will need that - but you have 
much more exposure to different users. 
I can only say that it is already covered by the strict "assign" (using only 
partitions that were explicitly used) - the user will open a new consumer 
group, seek to the end, seek specific partitions to their preferred offsets and 
explicitly pass all the topic-partition-offsets to the "assign" option.

BTW - just off the top of my head - maybe you'll want to add a helper 
function or two that:
- gets a specific consumer group, and returns whatever object / magic string 
that is needed to configure a starting offset from Structured Streaming.
- Same as above, but reads from a Structured Streaming checkpoint and returns a 
new consumer group (created, seeked, committed). That relates to the "DRP" 
ticket, and the result could also just be passed to the previous helper 
function (from checkpoint to a new streaming job).
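
The first helper could be as small as a function that turns a map of committed offsets into the JSON string form discussed in this ticket. The sketch below assumes the offsets have already been fetched from somewhere; `offsets_to_json` and the sample data are hypothetical.

```python
import json
from typing import Dict, Tuple


def offsets_to_json(offsets: Dict[Tuple[str, int], int]) -> str:
    """Build {"topic": {"partition": offset}} JSON from a flat offset map."""
    nested: Dict[str, Dict[str, int]] = {}
    for (topic, partition), offset in sorted(offsets.items()):
        # JSON object keys must be strings, so the partition id is stringified.
        nested.setdefault(topic, {})[str(partition)] = offset
    return json.dumps(nested)


# Hypothetical committed offsets of an existing consumer group.
committed = {("topicfoo", 0): 1234, ("topicfoo", 1): 4567, ("topicbar", 0): 42}
print(offsets_to_json(committed))
```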






[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560925#comment-15560925
 ] 

Cody Koeninger commented on SPARK-17812:


I want to start a pattern subscription at known good offsets for a
particular topic, and latest available for other topics that match.







[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560865#comment-15560865
 ] 

Ofir Manor commented on SPARK-17812:


Why? If you choose oldest, latest or by timestamp, you can combine it with a 
topic pattern.
If you explicitly provide a list of specific topic-partition-offsets, Spark 
should respect that.
Choosing topics and choosing a starting position method are complementary 
choices.




[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560839#comment-15560839
 ] 

Cody Koeninger commented on SPARK-17812:


That totally kills the usability of SubscribePattern.




[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560820#comment-15560820
 ] 

Ofir Manor commented on SPARK-17812:


I'll have to think about it - not sure if it ever makes sense to provide custom 
offsets, skip some partitions, and expect a user-configurable default offset 
for the missing ones.
I would actually suggest that if a user provides custom offsets, they would be 
used as is. For example, if I provide offsets only for topic 1, partitions 
1, 4 and 10, then the consumer group would subscribe to only these three 
partitions. In other words, custom assignment should be respected as is.




[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560719#comment-15560719
 ] 

Cody Koeninger commented on SPARK-17812:


Generally agree with the direction of what you're saying, but the question of 
where to start for partitions that don't have offsets specified is orthogonal 
to the question of where/how to specify offsets for particular partitions.  
It's not just 4 options.




[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560696#comment-15560696
 ] 

Ofir Manor commented on SPARK-17812:


Cody, you are absolutely right that the option naming is silly and leads to a 
dead end. Maybe it could be fixed now, as this code hasn't been released yet.
In general, I see just four useful options for a starting position:
1. Give me all messages - read all messages in the topics.
2. Ignore all current messages - read only new messages from now on.
3. Give me all messages starting from timestamp t - that could be a filter on 
(1), or in Kafka 0.10.1+, pushed down to Kafka (in that version, the Kafka 
topic can also have either broker-generated timestamps, or user-provided event 
timestamps).
4. Give me all messages from a custom offset - for "advanced" cases.

I can suggest something specific (though it will be a matter of taste, stringy 
or not), but generally, I think there should be a single way to specify where 
to start, and it should cover these four alternatives. Having a bunch of 
mutually exclusive options seems confusing, and giving them the wrong names is 
even more so.

Regarding "last x offsets" - I don't really get it. It seems to assume that 
Kafka has a single offset space, which is quite alien to Kafka (a topic is a 
collection of independent, ordered partitions).
For example, take a simple topic with four partitions. What is 1000 offsets 
back?
1. Last 1000 messages per partition? (4000 in total)
2. Last 250 messages per partition? (definitely NOT the last 1000 messages)
3. Read last 1000 messages per partition, then merge and keep the last 1000 
messages by timestamp? (provides somewhat meaningful semantics, but is still a 
bit nonsense)
For me, none of them makes sense, because the user actually says - I want 
random stuff and I don't care what it is... It is as if, for a database source, 
we would start with a random 1000 rows, followed by careful work to capture 
every change without missing any.
I believe "last hour" would make a lot more sense, and if someone really wants 
some variation of this "last 1000 messages", he could just create a custom 
offset.
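
The three readings of "1000 offsets back" can be compared on a toy log. Everything in this sketch is made up for illustration: four partitions of 2000 messages each, with partition p's messages spaced (p + 1) ticks apart so that the partitions cover different time ranges.

```python
from typing import Dict, List, Tuple

Message = Tuple[int, int, int]  # (timestamp, partition, offset)

X = 1000
NUM_PARTITIONS = 4
PER_PARTITION = 2000

# Hypothetical log: messages in partition p are (p + 1) ticks apart.
log: Dict[int, List[Message]] = {
    p: [(off * (p + 1), p, off) for off in range(PER_PARTITION)]
    for p in range(NUM_PARTITIONS)
}

# 1. Last X messages per partition.
per_partition = [m for msgs in log.values() for m in msgs[-X:]]

# 2. Last X / N messages per partition.
even_split = [m for msgs in log.values() for m in msgs[-(X // NUM_PARTITIONS):]]

# 3. Last X per partition, merged, then the X newest by timestamp.
merged_newest = sorted(per_partition)[-X:]

print(len(per_partition), len(even_split), len(merged_newest))  # 4000 1000 1000
print(set(even_split) == set(merged_newest))  # False
```

Readings 2 and 3 both return 1000 messages but not the same ones, which is the ambiguity described above.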




[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15558506#comment-15558506
 ] 

Cody Koeninger commented on SPARK-17812:


So I'm willing to do this work, mostly because I've already done it, but there 
are some user interface issues here that need to get figured out.

You already chose the name "startingOffset" for specifying the equivalent of 
auto.offset.reset.  Now we're looking at actually adding starting offsets.  
Furthermore, it should be possible to specify starting offsets for some 
partitions, while relying on the equivalent of auto.offset.reset for other 
unspecified ones (the existing DStream does this).

What are you expecting configuration of this to look like?  I can see a couple 
of options:

1. Try to cram everything into startingOffset with some horrible string-based 
DSL
2. Have a separate option for specifying starting offsets for real, with a name 
that makes it clear what it is, yet doesn't use "startingoffset".  As for the 
value, I guess in json form of some kind?   { "topicfoo" : { "0": 1234, "1": 
4567 }}

Somewhat related is that Assign needs a way of specifying topicpartitions.

As far as the idea to seek back X offsets, I think it'd be better to look at 
offset time indexing.
If you are going to do the X offsets back idea, the offsets -1L and -2L already 
have special meaning, so it's going to be kind of confusing to allow negative 
numbers in an interface that is specifying offsets.

