[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15579206#comment-15579206 ]

Apache Spark commented on SPARK-17812:

User 'koeninger' has created a pull request for this issue:
https://github.com/apache/spark/pull/15504

> More granular control of starting offsets (assign)
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
> Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the
> earliest or latest offsets available at the moment the query is started.
> Sometimes this is a lot of data. It would be nice to be able to do the
> following:
> - seek to user specified offsets for manually specified topicpartitions
>
> Currently agreed on plan:
> Mutually exclusive subscription options (only assign is new to this ticket)
> {noformat}
> .option("subscribe","topicFoo,topicBar")
> .option("subscribePattern","topic.*")
> .option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
> {noformat}
> where assign can only be specified that way, no inline offsets
>
> Single starting position option with three mutually exclusive types of value
> {noformat}
> .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, "1": -2}, "topicBar":{"0": -1}}""")
> {noformat}
> startingOffsets with json fails if any topicpartition in the assignments
> doesn't have an offset.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
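The plan above makes the three subscription options mutually exclusive. Below is a minimal sketch of that check. It assumes the user's options arrive as a plain `Map[String, String]`. `SubscriptionCheck` and `pickSubscription` are hypothetical names and are not the Spark implementation.

```scala
// Hypothetical sketch: exactly one subscription option must be given.
object SubscriptionCheck {
  // The three mutually exclusive subscription strategies from the agreed plan.
  val strategies: Seq[String] = Seq("subscribe", "subscribePattern", "assign")

  // Returns the single chosen (strategy, value) pair, or an error message.
  def pickSubscription(options: Map[String, String]): Either[String, (String, String)] = {
    val present = strategies.filter(options.contains)
    present match {
      case Seq(only) => Right(only -> options(only))
      case Seq()     => Left(s"One of ${strategies.mkString(", ")} must be specified")
      case many      => Left(s"Only one of ${many.mkString(", ")} may be specified")
    }
  }
}
```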
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15578894#comment-15578894 ]

Cody Koeninger commented on SPARK-17812:

As you just said yourself, assign doesn't mean you necessarily know the exact starting offsets you want.
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15578699#comment-15578699 ]

Ofir Manor commented on SPARK-17812:

I think Michael suggests that using {{startingOffsets}} without {{assign}} could work: the topic-partition list would be taken from {{startingOffsets}}. That would simplify user code (no need to specify two similar lists, simpler resume, etc.).
You could keep an explicit {{assign}} for the (rarer?) cases where someone wants to specify a list of topic-partitions but also earliest / latest / timestamp.
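Ofir's suggestion amounts to reading the assignment off the keys of `startingOffsets`. Below is a minimal sketch. It assumes the JSON has already been parsed into a nested `Map`. `DeriveAssignment` and `Offsets` are hypothetical names.

```scala
// Hypothetical sketch: derive the partition assignment from startingOffsets.
object DeriveAssignment {
  // Parsed form of e.g. {"topicFoo": {"0": 1234, "1": -2}, "topicBar": {"0": -1}}
  type Offsets = Map[String, Map[Int, Long]]

  // Every (topic, partition) mentioned in startingOffsets joins the assignment.
  def assignmentFrom(offsets: Offsets): Set[(String, Int)] =
    offsets.toSeq.flatMap { case (topic, parts) => parts.keys.map(p => (topic, p)) }.toSet
}
```

With this design, a resumed query needs only one list. The cost is that an explicit `assign` is still required when the start position is earliest/latest rather than per-partition offsets.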
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15577022#comment-15577022 ]

Cody Koeninger commented on SPARK-17812:

Assign is useful; otherwise you have no way of consuming only particular partitions of a topic.

Yeah, I just ended up using the Jackson tree model directly; as you said, the catalyst stuff isn't really applicable.

A branch with the initial implementation is at https://github.com/koeninger/spark-1/tree/SPARK-17812 . I will send a PR once I have some tests. I'm trying to figure out if there's a reasonable way of unit testing offset out of range, but may just give up on that if it seems flaky.
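This sketch illustrates Cody's point that only assign can limit a query to particular partitions. It assumes topic metadata is available as a topic-to-partition-count map. All names are illustrative, not Spark's.

```scala
// Hypothetical sketch contrasting subscribe and assign partition selection.
object PartitionSelection {
  // Assumed metadata: topic name -> number of partitions on the broker.
  val metadata: Map[String, Int] = Map("topicFoo" -> 3, "topicBar" -> 2)

  // subscribe: every partition of each listed topic is consumed.
  def subscribe(topics: Seq[String]): Set[(String, Int)] =
    topics.flatMap(t => (0 until metadata.getOrElse(t, 0)).map(p => (t, p))).toSet

  // assign: only the explicitly listed partitions are consumed.
  def assign(partitions: Map[String, Seq[Int]]): Set[(String, Int)] =
    partitions.toSeq.flatMap { case (t, ps) => ps.map(p => (t, p)) }.toSet
}
```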
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15576840#comment-15576840 ]

Michael Armbrust commented on SPARK-17812:

That sounds pretty good to me, with one question: is {{assign}} useful here? It seems you know the list of topicpartitions, as they are all passed to {{startingOffsets}}. If we get rid of {{assign}} and keep the offset log format consistent with {{startingOffsets}}, then you could resume a query where another left off simply by copying the last batch. However, if we keep {{assign}}, you'll have to type that out manually, and I'm not sure what you are gaining.

I would use Jackson for the JSON stuff, but I would probably not use catalyst/encoders, since those require code generation that's not going to buy us much.

Thanks for working on this!
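If the offset log shares the `startingOffsets` JSON shape, a checkpointed position could be pasted straight back in as a starting point. Below is a hypothetical sketch of writing offsets in that shape using only the standard library. It assumes topic names need no JSON escaping; a real implementation would use a JSON library such as Jackson, as discussed here.

```scala
// Hypothetical sketch: render offsets in the startingOffsets JSON shape.
object OffsetJson {
  // Assumes topic names contain no characters that need JSON escaping.
  // Topics and partitions are sorted so the output is deterministic.
  def toJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, parts) =>
      val inner = parts.toSeq.sortBy(_._1)
        .map { case (p, off) => s""""$p":$off""" }
        .mkString(",")
      s""""$topic":{$inner}"""
    }.mkString("{", ",", "}")
}
```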
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573843#comment-15573843 ]

Cody Koeninger commented on SPARK-17812:

So I think this is what we're agreed on:

Mutually exclusive subscription options (only assign is new to this ticket)
{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}
where assign can only be specified that way, no inline offsets

Single starting position option with three mutually exclusive types of value
{noformat}
.option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, "1": -2}, "topicBar":{"0": -1}}""")
{noformat}
startingOffsets with json fails if any topicpartition in the assignments doesn't have an offset.

Sound right? I'll go ahead and start on it. I'm assuming I should try to reuse some of the existing catalyst Jackson stuff and keep in mind a format that's potentially usable by the checkpoints as well?

I don't think earliest / latest is too unclear as long as there's a way to get to the other knobs that auto.offset.reset (should have) provided.

Punting the tunability of new partitions to another ticket sounds good.
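In the JSON form, -2 and -1 act as per-partition sentinels. In the released Spark Kafka source, -2 means earliest and -1 means latest. Below is a hypothetical sketch of resolving them against known earliest/latest offsets for one partition. The `Bounds` type and `resolve` function are illustrative.

```scala
// Hypothetical sketch: resolve -2 (earliest) and -1 (latest) sentinels.
object Sentinels {
  val Earliest = -2L
  val Latest = -1L

  // Assumed per-partition bounds: earliest available offset and latest offset.
  final case class Bounds(earliest: Long, latest: Long)

  def resolve(requested: Long, bounds: Bounds): Long = requested match {
    case Earliest => bounds.earliest
    case Latest   => bounds.latest
    case explicit => explicit // a concrete offset is used as-is
  }
}
```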
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573786#comment-15573786 ]

Michael Armbrust commented on SPARK-17812:

Please do work on it. It might be good to update the description with a summary of this discussion so we can all be sure we are on the same page.

I actually do think it's fair to have one configuration for what to do in the case of data loss. This happens when you fall behind, or when you come back and new partitions are there that have already aged out. Let's add this in another ticket.

I know you are super deep in Kafka, and others should chime in if I'm way off-base, but I think that with {{startingOffsets=earliest}} and {{startingOffsets=latest}} it is pretty clear what is happening. I would not change {{earliest}} and {{latest}} just to be different from Kafka. We could make it query start if this is still confusing, but let's do that soon if that is the case.
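One way to picture the single data-loss setting Michael proposes moving to another ticket: when the stored offset has aged out of retention, either fail, or restart at the earliest offset still available. The second branch follows the behavior described in this thread for failOnDataLoss=false. This is a hypothetical sketch; `DataLoss.nextOffset` is an illustrative name.

```scala
// Hypothetical sketch: handling an offset that retention has already deleted.
object DataLoss {
  // Returns the offset to read from, or an error if data loss must fail the query.
  def nextOffset(wanted: Long, earliestAvailable: Long, failOnDataLoss: Boolean): Either[String, Long] =
    if (wanted >= earliestAvailable) Right(wanted)
    else if (failOnDataLoss)
      Left(s"Offsets $wanted until $earliestAvailable were lost to retention")
    else Right(earliestAvailable) // best effort: skip ahead as little as possible
}
```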
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573766#comment-15573766 ]

Cody Koeninger commented on SPARK-17812:

OK, failing on start is clear. It's really annoying in the case of subscribePattern, but at least it's clear.

I think that's enough to get started on this ticket. Is anyone currently working on it, or can I do it? Ryan seemed worried that it wouldn't get done in time for the next release.

It sounds like your current plan is to ignore whatever comes out of KAFKA-3370, which is fine as long as whatever you do is both clear and equally tunable. Clarity of semantics can't be the only criterion of an API; "You can only start at latest offset, period" is clear, but a crap API.

{quote}
the only case where we lack sufficient tunability is "Where do I go when the current offsets are invalid due to retention?".
{quote}

No, you lack sufficient tunability as to where newly discovered partitions start. Keep in mind that those partitions may have been discovered after a significant job downtime. If the point of an API is to provide clear semantics to the user, it is not at all clear to me as a user how I can start those partitions at latest, which I know is possible in the underlying data model.

I'm belaboring this point now because you have chosen names (earliest, latest) for the API under discussion that are confusingly similar to the existing auto offset reset functionality. You have also provided knobs for some, but not all, of the things auto offset reset currently affects. This is going to confuse people; it already confuses me.
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573677#comment-15573677 ]

Michael Armbrust commented on SPARK-17812:

bq. with your proposed interface, what, as a user, do you expect to happen when you specify startingOffsets for some but not all partitions?

I would probably opt to fail to start the query with advice on how to fix it (i.e. "specify {{-1}} for these partitions if you don't care"). We could also have a default, but I tend to err on the side of explicit behavior.

bq. Yes, auto.offset.reset is a mess. Have you read https://issues.apache.org/jira/browse/KAFKA-3370 What are you going to do when that ticket is resolved? It should allow users to answer the questions you raised in very specific ways, that your interface does not.

There is clearly a lot of confusing baggage with this configuration option, specifically because it conflates too many unrelated concerns. Furthermore, IMHO {{auto.offset.reset}} is a pretty confusing name that does not imply anything about where in the stream this query should start: "reset" implies you were set somewhere to begin with. In contrast, {{startingOffsets}} handles one case clearly: it picks the offsets used as the starting point for the append-only table abstraction that Spark provides.

As far as I understand the discussion on the ticket you referenced, the only case where we lack sufficient tunability is "Where do I go when the current offsets are invalid due to retention?". In this case, where data has been lost and {{failOnDataLoss=false}}, we currently try to minimize the amount of data we lose by starting at the earliest offsets available. We should certainly consider making this behavior configurable as well, but that seems like a different concern than what is being discussed in this JIRA. Personally, it seems like if you are falling so far behind that you have to skip all the way ahead, something is going very wrong. However, if users request this feature, we should certainly add it. I would not, however, shoehorn it into anything having to do with query start behavior. It seems like they have reached a similar conclusion, as they are considering adding a new configuration, {{auto.reset.offset.existing}}.

bq. Is the purpose of your interface to do what you think users should be able to do, or what they need to be able to do?

The purpose of an interface is to provide clear semantics to the user.
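Michael's preferred behavior for a partial startingOffsets is to fail at start with advice. This matches the rule that JSON startingOffsets must cover every assigned partition. Below is a hypothetical sketch of that validation; the names are illustrative and the error text is only an example.

```scala
// Hypothetical sketch: reject startingOffsets that miss an assigned partition.
object StartingOffsetsCheck {
  def validate(
      assigned: Set[(String, Int)],
      starting: Map[String, Map[Int, Long]]): Either[String, Unit] = {
    // Collect every (topic, partition) that has an explicit starting offset.
    val covered = starting.toSeq.flatMap { case (t, ps) => ps.keys.map(p => (t, p)) }.toSet
    val missing = (assigned -- covered).toSeq.sorted
    if (missing.isEmpty) Right(())
    else Left(
      s"startingOffsets has no entry for ${missing.mkString(", ")}; " +
        "specify -1 for these partitions if you don't care")
  }
}
```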
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573563#comment-15573563 ]

Cody Koeninger commented on SPARK-17812:

So a short term question:
- with your proposed interface, what, as a user, do you expect to happen when you specify startingOffsets for some but not all partitions?

A couple of medium term questions:
- Yes, auto.offset.reset is a mess. Have you read https://issues.apache.org/jira/browse/KAFKA-3370
- What are you going to do when that ticket is resolved? It should allow users to answer the questions you raised in very specific ways, that your interface does not.

And a really leading long term question:
- Is the purpose of your interface to do what you think users should be able to do, or what they need to be able to do?
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573526#comment-15573526 ]

Michael Armbrust commented on SPARK-17812:

As far as I understand it, {{auto.offset.reset}} is conflating a few things that make it hard for me to reason about exactly-once semantics in my query. It is answering all of the following:
- Where do I start when I'm creating this {{group.id}} for the first time?
- What do I do when a new partition is added to a topic I'm watching?
- What do I do when the current offset is invalid because of retention?

The model of structured streaming is an append-only table, where we are computing the same answer incrementally as if you were running a batch query over all of the data in the table. The whole goal is to make it easy to reason about correctness and push the hard work of incremental processing and late data management into the optimizer / query planner. As a result, I think we are trying to answer a different set of questions than a distributed set of consumers that share a {{group.id}}:
- Should this append-only table contain all of the historical data available, or do I begin at this moment and start appending? This is what {{startingOffsets}} answers. I think we should handle {{"earliest"}} (all data), {{"latest"}} (only data that arrives after now), and a very specific point in time across partitions (probably when some other query stopped running).
- When I get into a situation where data has been deleted by the retention mechanism without me seeing it, what should I do? Fail the query? Or issue a warning and compute best effort on the data available? This is what {{failOnDataLoss}} answers.

In particular, I think the Kafka method of configuration makes it confusing to do something like "starting now, compute some aggregation exactly once". The documentation even points out some of the pitfalls:

bq. ... If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest.

Really what I want here is "begin the query at largest", but "start new partitions at smallest (and in fact, tell me if I'm so late joining a new partition that I have already lost some data)".
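Michael wants to begin the query at the latest offsets but start partitions added later at their earliest. That needs two separate answers, which a single `auto.offset.reset` value cannot express. Below is a hypothetical sketch separating them. `StartPolicy` and its fields are illustrative and are not a Spark or Kafka API.

```scala
// Hypothetical sketch: separate policies for initial and newly discovered partitions.
object StartPolicy {
  sealed trait Position
  case object Earliest extends Position
  case object Latest extends Position

  final case class Policy(queryStart: Position, newPartitions: Position)

  // earliest/latest: assumed per-partition offsets at the time of the decision.
  def startOffset(policy: Policy, isNew: Boolean, earliest: Long, latest: Long): Long = {
    val pos = if (isNew) policy.newPartitions else policy.queryStart
    pos match {
      case Earliest => earliest
      case Latest   => latest
    }
  }
}
```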
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573479#comment-15573479 ]

Cody Koeninger commented on SPARK-17812:

While some decision is better than none, can you help me understand why you don't believe me that auto.offset.reset is orthogonal to specifying specific starting positions? Or do you just not believe it's important?

The reasons you guys used a different name from auto.offset.reset are that the Kafka project semantics of it are inadequate. But they will fix it, and when they do, the fact that you have conflated two unrelated things into one configuration in your api is going to cause problems.
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573457#comment-15573457 ]

Ofir Manor commented on SPARK-17812:

I'm with you; I warned you it is bikeshedding... I don't have a strong opinion, just a preference. What you suggested is way better than the committed solution, so I'll get out of the loop. Whatever [~marmbrus] and you are OK with, either way it would be a big step forward.
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573459#comment-15573459 ]

Michael Armbrust commented on SPARK-17812:

+1 to the suggested way of subscribing, and to using "assign" as a familiar name. I would probably leave it with a single option like this:
{{.option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, "1": 4567\}\}""")}}
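A single `startingOffsets` option has three mutually exclusive kinds of value, so it maps naturally onto a small algebraic data type. Below is a hypothetical sketch of the dispatch only. It treats anything starting with `{` as JSON and leaves actual JSON parsing to a real library. The type and case names are illustrative.

```scala
// Hypothetical sketch: classify a startingOffsets value into one of three cases.
object StartingOffsetsOption {
  sealed trait StartingOffsets
  case object EarliestOffsets extends StartingOffsets
  case object LatestOffsets extends StartingOffsets
  final case class SpecificOffsets(json: String) extends StartingOffsets

  def parse(value: String): Either[String, StartingOffsets] = value.trim match {
    case v if v.equalsIgnoreCase("earliest") => Right(EarliestOffsets)
    case v if v.equalsIgnoreCase("latest")   => Right(LatestOffsets)
    // The JSON is kept unparsed here; topic names stay case-sensitive.
    case v if v.startsWith("{")              => Right(SpecificOffsets(v))
    case other                               => Left(s"Unknown startingOffsets value: $other")
  }
}
```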
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573432#comment-15573432 ]

Cody Koeninger commented on SPARK-17812:

If you're seriously worried that people are going to get confused,
{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1": 4567}}""")
.option("defaultOffsets", "earliest" | "latest")
{noformat}
let those two at least not be mutually exclusive, and punt on the question of precedence until there's an actual startingTime or startingX ticket.
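Under this non-exclusive variant, an explicit per-partition offset always wins and the default fills in the rest. This is the same as seeking every partition to the default first, then seeking the explicitly listed ones. Below is a hypothetical sketch of that precedence. It assumes the default has already been resolved to a concrete offset per partition; the names are illustrative.

```scala
// Hypothetical sketch: explicit per-partition offsets override a default.
object DefaultMerge {
  type TP = (String, Int)

  // default: assumed resolver from a partition to its earliest/latest offset.
  def merge(
      partitions: Set[TP],
      explicit: Map[TP, Long],
      default: TP => Long): Map[TP, Long] =
    partitions.iterator.map(tp => tp -> explicit.getOrElse(tp, default(tp))).toMap
}
```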
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573395#comment-15573395 ]

Cody Koeninger commented on SPARK-17812:

1. We don't have lists, we have strings. Regexes and valid topic names overlap (dot is the obvious case).
2. Mapping directly to Kafka method names means we don't have to come up with some other (weird and possibly overlapping) name when they add more ways to subscribe; we just use theirs.
3. I think this is a mess with Kafka semantics for the reasons both you and I have already expressed. At any rate, I think Michael already clearly punted the "starting X" case to a different topic.
4. I think it's more than sufficiently clear as suggested; no one is going to expect that a specific offset they provided is going to be overruled by a general single default. The implementation is also crystal clear: seek to the position identified by startingTime, then seek to any specific offsets for specific partitions.

Yes, this is all bikeshedding, but it's bikeshedding that directly affects what people are actually able to do with the API. Needlessly restricting it for reasons that have nothing to do with safety is just going to piss users off for no reason. Just because you don't have a use case that needs it doesn't mean you should arbitrarily prevent users from doing it.

Please, just choose something and let me build it so that people can actually use the thing by the next release.
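Point 1, that regexes and valid topic names overlap, is easy to see with a dot. As a pattern, `.` matches any character, yet it is also legal in a Kafka topic name. This small sketch uses `java.util.regex.Pattern`; the topic names are made up for illustration.

```scala
import java.util.regex.Pattern

// Why one "subscribe" string cannot safely be both a topic list and a pattern.
object DotAmbiguity {
  // Treat the string as a literal topic name.
  def matchesLiteral(spec: String, topic: String): Boolean = spec == topic

  // Treat the same string as a regular expression over the whole topic name.
  def matchesPattern(spec: String, topic: String): Boolean =
    Pattern.matches(spec, topic)
}
```

Here `metrics.v1` names exactly one topic when read literally, but it also matches `metricsXv1` when read as a pattern.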
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573341#comment-15573341 ] Ofir Manor commented on SPARK-17812: Thanks Cody! great to have a concrete example. I've some comments, but its mostly bikeshedding 1. subscribe vs. subscribePattern --> personally, I would combine them both to "subscribe" - no need to burden the user with the different Kafka API nuances. It can get a list of discreet topics or a pattern. 2. It would be much clearer if "assign" was called subscribeSomething, so the user would choose one "subscribe.." and one (or more) "starting...". Not sure I have a good name though - subscribeCustom? You can even use the regular subscribe for that (and be smarter with the pattern matching) - I think it would just work, and if someone tries to be funny (combine astrerix and partitions) we could just error 3. I like startingTime... pretty neat. We could hypothetically add {{.option("startingMessages", long)}} to support Michael's "just start with a 1000 recent messages"... 4. As I said before, I'd rather have all starting* be mutual-exclusive. Yes, it blocks some edge cases, on purpose, but make the API and code way clearer (think about startingMessage interacting with startingOffsets etc). I think that it would be easier to regret and allow multiple starting* in the future (opening all sorts of esoteric combinations) than clean it up in the future if users find it confusing and not needed. Anyway, as long as it is functional I'm good with it, even if it less aesthetic. > More granular control of starting offsets (assign) > -- > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Right now you can only run a Streaming Query starting from either the > earliest or latests offsets available at the moment the query is started. > Sometimes this is a lot of data. 
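A minimal sketch of the mutual-exclusion rules in point 4, assuming the options arrive as a plain Map[String, String]. The option names come from this thread (startingMessages is only a proposal); the object and method names are hypothetical, and this is not Spark's actual validation code.

```scala
// Hypothetical validator for the option layout discussed in this thread.
// Not Spark's implementation; startingMessages is only a proposed option.
object KafkaOptionCheck {
  val subscriptionKeys: Seq[String] = Seq("subscribe", "subscribePattern", "assign")
  val startingKeys: Seq[String] = Seq("startingOffsets", "startingTime", "startingMessages")

  // Returns Left(error) if the combination is rejected, Right(()) otherwise.
  def validate(opts: Map[String, String]): Either[String, Unit] = {
    val subs = subscriptionKeys.filter(opts.contains)
    val starts = startingKeys.filter(opts.contains)
    if (subs.size != 1)
      Left(s"exactly one of ${subscriptionKeys.mkString(", ")} is required, got [${subs.mkString(", ")}]")
    else if (starts.size > 1)
      Left(s"starting options are mutually exclusive, got [${starts.mkString(", ")}]")
    else
      Right(())
  }
}
```

Under the proposal this comment replies to, startingOffsets and startingTime may be combined, so the second check would be dropped there.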
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573166#comment-15573166 ] Cody Koeninger commented on SPARK-17812: Here's my concrete suggestion:
3 mutually exclusive ways of subscribing:
{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}
where assign can only be specified that way, with no inline offsets.
2 non-mutually exclusive ways of specifying the starting position; explicit startingOffsets obviously take priority:
{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1": 4567}}""")
.option("startingTime", "earliest" | "latest" | long)
{noformat}
where long is a timestamp; work on that will be done later. Note that even Kafka 0.8 has an API for time (a really crappy one, based on log file modification time), so pursuing timestamps for startingTime later doesn't necessarily exclude 0.8.
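The precedence rule in this proposal (explicit startingOffsets win, startingTime covers every other partition) can be sketched as follows. The types and names, including the (topic, partition) tuple standing in for Kafka's TopicPartition, are hypothetical, and JSON parsing is omitted.

```scala
// Sketch of "explicit startingOffsets take priority over startingTime".
// All names are hypothetical; options are assumed to be parsed already.
object StartResolution {
  type TopicPartition = (String, Int)

  sealed trait StartingTime
  case object Earliest extends StartingTime
  case object Latest extends StartingTime
  final case class AtTimestamp(millis: Long) extends StartingTime // semantics left for later

  sealed trait Start
  final case class AtOffset(offset: Long) extends Start
  final case class ByTime(time: StartingTime) extends Start

  // An explicit offset for this partition wins; otherwise fall back to startingTime.
  def startFor(
      tp: TopicPartition,
      startingOffsets: Map[TopicPartition, Long],
      startingTime: StartingTime): Start =
    startingOffsets.get(tp) match {
      case Some(offset) => AtOffset(offset)
      case None         => ByTime(startingTime)
    }

  def resolve(
      partitions: Seq[TopicPartition],
      startingOffsets: Map[TopicPartition, Long],
      startingTime: StartingTime): Map[TopicPartition, Start] =
    partitions.map(tp => tp -> startFor(tp, startingOffsets, startingTime)).toMap
}
```

With startingOffsets covering only topicFoo, a topicBar partition resolves to ByTime(Latest) when startingTime is "latest".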
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573109#comment-15573109 ] Ofir Manor commented on SPARK-17812: Regarding (1): of course it is *all* data in the source, as of query start. It's just the same as a file system directory or a database table; I'm not sure a disclaimer that the directory or table could have had different data in the past adds anything but confusion...
Anyway, startingOffset is confusing because it seems you want a different parameter for "assign" --> to explicitly specify starting offsets.
For your use case, I would add:
5. Give me nnn messages (not the last ones). We still do one of the above options (trying to go back nnn messages, somehow split between the topic-partitions involved), but don't provide a more explicit guarantee like "last nnn". Generally, the distribution of messages to partitions doesn't have to be round-robin or uniform; it is strongly based on the key (for example, it could be a state, a URL, etc.).
Anyway, I haven't seen a concrete suggestion on how to specify offsets or timestamps, so I think that would be the next step in this ticket. (I suggested you could condense everything into one option to avoid dependencies between options, but I don't have an elegant "stringly" suggestion.)
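A hypothetical sketch of point 5, assuming the n messages are split as evenly as possible across partitions and each start is clamped to the earliest offset still available (the comment leaves the split unspecified). It shows why such an option can't promise "last nnn".

```scala
// Hypothetical "go back n messages" split. The even split is an assumption.
object MessagesBack {
  // earliest = first offset still in the log, latest = next offset to be written
  final case class PartitionRange(earliest: Long, latest: Long)

  def startOffsets(n: Long, ranges: Map[Int, PartitionRange]): Map[Int, Long] = {
    require(ranges.nonEmpty, "need at least one partition")
    val parts = ranges.keys.toSeq.sorted
    val base = n / parts.size
    val extra = n % parts.size // the first `extra` partitions take one more message
    parts.zipWithIndex.map { case (p, i) =>
      val share = base + (if (i < extra) 1L else 0L)
      val r = ranges(p)
      // Clamp so we never seek before the earliest available offset.
      p -> math.max(r.earliest, r.latest - share)
    }.toMap
  }
}
```

For example, asking for 30 messages from one partition holding offsets 0-99 and another holding offsets 90-99 gives starts of 85 and 90, i.e. 25 messages rather than 30. And since partitioning is keyed, an even split says nothing about which messages are the most recent overall.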
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15573089#comment-15573089 ] Cody Koeninger commented on SPARK-17812: One other slightly ugly thing...
{noformat}
// starting topicpartitions, no explicit offset
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
// do you allow specifying explicit offsets in the same config option?
// or force it all into startingOffsetForRealzYo?
.option("assign", """{ "topicfoo" :{ "0": 1234, "1": 4567 }, "topicbar" : { "0": 1234, "1": 4567 }}""")
{noformat}
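If assign stays a plain partition list and explicit offsets go into a separate option (the second alternative above), the two have to be cross-checked. A minimal sketch, assuming both options have already been parsed into Scala maps (JSON parsing omitted; names hypothetical), that reports assigned partitions with no offset:

```scala
// Cross-check for the "separate option" alternative. Names are hypothetical.
object AssignCheck {
  type TopicPartition = (String, Int)

  // assign:  {"topicfoo": [0, 1], "topicbar": [0, 1]}  -> topic -> partitions
  // offsets: {"topicfoo": {"0": 1234, "1": 4567}, ...} -> topic -> (partition -> offset)
  def missingOffsets(
      assign: Map[String, Seq[Int]],
      offsets: Map[String, Map[Int, Long]]): Set[TopicPartition] = {
    val assigned: Set[TopicPartition] =
      assign.iterator.flatMap { case (topic, parts) => parts.map(p => (topic, p)) }.toSet
    val provided: Set[TopicPartition] =
      offsets.iterator.flatMap { case (topic, byPart) => byPart.keys.map(p => (topic, p)) }.toSet
    // Assigned partitions for which no explicit offset was given.
    assigned -- provided
  }
}
```

An empty result means every assigned partition has an explicit starting offset.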
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572922#comment-15572922 ] Cody Koeninger commented on SPARK-17812: Sorry, I didn't see this comment until just now.
X offsets back per partition is not a reasonable proxy for time when you're dealing with a stream that has multiple topics in it. Agreed that we should break that out and focus on defining starting offsets in this ticket.
The concern with the startingOffsets naming is that, because auto.offset.reset is orthogonal to specifying some offsets, you end up in a situation like this:
{noformat}
.format("kafka")
.option("subscribePattern", "topic.*")
.option("startingOffset", "latest")
.option("startingOffsetForRealzYo", """ { "topicfoo" : { "0": 1234, "1": 4567 }, "topicbar" : { "0": 1234, "1": 4567 }}""")
{noformat}
where startingOffsetForRealzYo would have a more reasonable name that conveys it specifies starting offsets, yet is not confusingly similar to startingOffset.
Trying to hack it all into one JSON as an alternative, with a "default" topic, means you're going to have to pick a key that isn't a valid topic name, or add yet another layer of indirection. It also makes it harder to keep the format consistent with SPARK-17829 (which seems like a good thing to keep consistent, I agree).
Obviously I think you should just change the name, but it's your show.
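The "default" key concern can be made concrete with a topic-name check. This sketch assumes Kafka's usual topic-name rules (ASCII letters, digits, '.', '_' and '-', at most 249 characters, and not "." or ".."); check them against your Kafka version. The reserved key "*" is a hypothetical choice.

```scala
// Topic-name check under the assumed Kafka naming rules described above.
object ReservedKey {
  private val LegalChars = "[a-zA-Z0-9._-]+".r

  def isValidTopicName(name: String): Boolean =
    name.length <= 249 && name != "." && name != ".." &&
      LegalChars.pattern.matcher(name).matches()

  // Hypothetical reserved key for a default entry: "*" is outside the legal characters,
  // so it can never collide with a real topic.
  val DefaultKey: String = "*"
}
```

The obvious key "default" passes the check, so it is a legal topic name and could collide with a real topic; that is the problem described above.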