Re: [DISCUSS] Time to evaluate "continuous mode" in SS?

2020-09-15 Thread Joseph Torres
It's worth noting that the push-based shuffle SPIP currently in progress
addresses a substantial blocker in this area. If you remember when we
removed the half-finished stateful query support, the lack of continuous
shuffle support and the difficulty of implementing it is basically why that
work was left half-finished. I can't make a hard commitment, but I do plan
to take a look at how easy it would be to build continuous shuffle support
on top of the SPIP once it's in, and continuous mode is going to be a lot
more useful if most (all?) queries can run using it.

On Tue, Sep 15, 2020 at 6:37 AM Sean Owen  wrote:

> I think we certainly can't remove it without deprecation and a few
> releases. If there were big problems with it that weren't getting
> fixed, sure maybe, but lack of interest in reviewing minor changes
> isn't necessarily a bad sign. By the same logic you'd have deleted GraphX
> long ago.
>
> Anecdotally, yes there are people using it that I know of at least,
> but I wouldn't know a lot of them.
> I think the question is: is it causing a problem, like a lot of
> maintenance? It doesn't sound like it.
>
> On Tue, Sep 15, 2020 at 8:19 AM Jungtaek Lim
>  wrote:
> >
> > Probably it depends on the meaning of "experimental". My understanding of
> "experimental" is closer to "incubation": something which may eventually
> graduate, or may be retired.
> >
> > To be clear, I'm evaluating continuous mode as a "candidate to retire",
> unless there are actual use cases in production and at least a couple of
> community members volunteer to maintain it. Looking at the activity over
> the past year, there has been no interest in continuous mode from community
> members. I can point to at least three PRs which struggled to find
> reviewers (for around a year) and were closed for inactivity. There have
> been no improvements or bug fixes except trivial ones. It doesn't seem to
> have much traction - few questions on SO, and a few posts in Google search
> results, all from around the date continuous mode was introduced. Though I
> would be convinced if someone could provide meaningful numbers of actual
> use cases.
> >
> > If the choice really has to be between un-experimental or not (which means
> retirement is not an option), I'd rather vote to leave it as experimental,
> so I can just keep forgetting about it. It actually does get in the way
> sometimes even when a change is on the micro-batch side (so it's not zero
> cost to maintain), but that's still better than officially supporting it.
> >
> >
> > On Tue, Sep 15, 2020 at 9:08 PM Sean Owen  wrote:
> >>
> >> If you're suggesting making it un-Experimental, probably yes, as it is
> >> de facto not going to change much I expect.
> >> If you're saying remove it, probably not? I don't see that it's
> >> anywhere near deprecated, and not sure it's unmaintained - obviously
> >> tests etc still have to keep passing.
> >>
> >> On Mon, Sep 14, 2020 at 11:34 PM Jungtaek Lim
> >>  wrote:
> >> >
> >> > Hi devs,
> >> >
> >> > It was Spark 2.3, in Feb 2018, which introduced continuous mode in
> Structured Streaming as "experimental".
> >> >
> >> > Now we are 2.5 years past its release - I feel it would be a good time
> to evaluate the mode: whether it has been widely used, and whether it has
> been making progress, given that it is still "experimental".
> >> >
> >> > At least on the surface I don't see any active effort around continuous
> mode in the community - the last major effort was stateful operation
> support, which was incomplete, and I removed it. There were a couple of bug
> reports as well as fixes more than a year ago, and almost nothing has been
> handled. (A trivial bugfix PR has been merged recently, but that's all.)
> The new features introduced to Structured Streaming (at least observable
> metrics and the SS UI) don't apply to continuous mode, and no one made
> "support continuous mode" a hard requirement for passing review on those
> PRs.
> >> >
> >> > I have no idea how many companies are using the mode in production
> (please chime in if someone has statistics about this), but I don't see any
> recent bug reports and see only a few questions on SO, which makes me think
> about the cost of maintenance.
> >> >
> >> > I know there's a general mood to avoid discontinuing support where
> possible, but it seems odd to keep something around as "unmaintained",
> especially when it's still "experimental" and the main authors are no
> longer active enough to promise maintenance/improvement of the module.
> Thoughts?
> >> >
> >> > Thanks,
> >> > Jungtaek Lim (HeartSaVioR)
>


Re: [VOTE][SPARK-30602] SPIP: Support push-based shuffle to improve shuffle efficiency

2020-09-14 Thread Joseph Torres
+1

On Mon, Sep 14, 2020 at 6:39 PM angers.zhu  wrote:

> +1
>
> angers.zhu
> angers@gmail.com
>
>
> On 09/15/2020 08:21,Xiao Li 
> wrote:
>
> +1
>
> Xiao
>
>> DB Tsai  wrote on Mon, Sep 14, 2020 at 4:09 PM:
>
>> +1
>>
>> On Mon, Sep 14, 2020 at 12:30 PM Chandni Singh  wrote:
>>
>>> +1
>>>
>>> Chandni
>>>
>>> On Mon, Sep 14, 2020 at 11:41 AM Tom Graves 
>>> wrote:
>>>
 +1

 Tom

 On Sunday, September 13, 2020, 10:00:05 PM CDT, Mridul Muralidharan <
 mri...@gmail.com> wrote:


 Hi,

 I'd like to call for a vote on SPARK-30602 - SPIP: Support push-based
 shuffle to improve shuffle efficiency.
 Please take a look at:

- SPIP jira: https://issues.apache.org/jira/browse/SPARK-30602
- SPIP doc:

 https://docs.google.com/document/d/1mYzKVZllA5Flw8AtoX7JUcXBOnNIDADWRbJ7GI6Y71Q/edit
- POC against master and results summary :

 https://docs.google.com/document/d/1Q5m7YAp0HyG_TNFL4p_bjQgzzw33ik5i49Vr86UNZgg/edit

 Active discussions on the jira and SPIP document have settled.

 I will leave the vote open until Friday (the 18th September 2020), 5pm
 CST.

 [ ] +1: Accept the proposal as an official SPIP
 [ ] +0
 [ ] -1: I don't think this is a good idea because ...


 Thanks,
 Mridul

>>>
>>
>> --
>> Sincerely,
>>
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 42E5B25A8F7A82C1
>>
>


Re: comparable and orderable CalendarInterval

2020-02-11 Thread Joseph Torres
The problem is that there isn't a consistent number of seconds an interval
represents - as Wenchen mentioned, a month interval isn't a fixed number of
days. If your use case can account for that, maybe you could add the
interval to a fixed reference date and then compare the results.
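
A minimal sketch of that workaround in Spark SQL (assuming an active
SparkSession `spark`; the reference date and column names are just for
illustration): anchor both intervals to the same timestamp and compare the
resulting timestamps.

    spark.sql("""
      SELECT (ref + a) > (ref + b) AS a_gt_b
      FROM (SELECT timestamp '2020-01-01 00:00:00' AS ref,
                   interval 1 month                AS a,
                   interval 30 days                AS b) t
    """).show()
    // Note: with month intervals the answer can depend on the chosen reference
    // date (months differ in length), which is exactly why CalendarInterval has
    // no total ordering.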

On Tue, Feb 11, 2020 at 8:01 AM Enrico Minack 
wrote:

> Hi Devs,
>
> I would like to know what the current roadmap is for making
> CalendarInterval comparable and orderable again (SPARK-29679,
> SPARK-29385, #26337).
>
> With #27262, this got reverted but SPARK-30551 does not mention how to
> go forward in this matter. I have found SPARK-28494, but this seems to
> be stale.
>
> While I find it useful to compare such intervals, I cannot find a way to
> work around the missing comparability. Is there a way to get, e.g., the
> seconds that an interval represents, in order to compare intervals? In
> org.apache.spark.sql.catalyst.util.IntervalUtils there are methods like
> getEpoch or getDuration, but as far as I can see they are not exposed to
> SQL or in the org.apache.spark.sql.functions package.
>
> Thanks for the insights,
> Enrico
>
>
>


Re: Welcoming some new committers and PMC members

2019-09-09 Thread Joseph Torres
congratulations!

On Mon, Sep 9, 2019 at 6:27 PM 王 斐  wrote:

> congratulations!
>
>
> --
> *From:* Ye Xianjin 
> *Sent:* Tuesday, September 10, 2019 09:26
> *To:* Jeff Zhang
> *Cc:* Saisai Shao; dev
> *Subject:* Re: Welcoming some new committers and PMC members
>
> Congratulations!
>
> Sent from my iPhone
>
> On Sep 10, 2019, at 9:19 AM, Jeff Zhang  wrote:
>
> Congratulations!
>
>> Saisai Shao  wrote on Tue, Sep 10, 2019 at 9:16 AM:
>
>> Congratulations!
>>
>> Jungtaek Lim  wrote on Mon, Sep 9, 2019 at 6:11 PM:
>>
>>> Congratulations! Well deserved!
>>>
>>> On Tue, Sep 10, 2019 at 9:51 AM John Zhuge  wrote:
>>>
 Congratulations!

 On Mon, Sep 9, 2019 at 5:45 PM Shane Knapp  wrote:

> congrats everyone!  :)
>
> On Mon, Sep 9, 2019 at 5:32 PM Matei Zaharia 
> wrote:
> >
> > Hi all,
> >
> > The Spark PMC recently voted to add several new committers and one
> PMC member. Join me in welcoming them to their new roles!
> >
> > New PMC member: Dongjoon Hyun
> >
> > New committers: Ryan Blue, Liang-Chi Hsieh, Gengliang Wang, Yuming
> Wang, Weichen Xu, Ruifeng Zheng
> >
> > The new committers cover lots of important areas including ML, SQL,
> and data sources, so it’s great to have them here. All the best,
> >
> > Matei and the Spark PMC
> >
> >
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>
>

 --
 John Zhuge

>>>
>>>
>>> --
>>> Name : Jungtaek Lim
>>> Blog : http://medium.com/@heartsavior
>>> Twitter : http://twitter.com/heartsavior
>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>
>


Re: Partitions at DataSource API V2

2019-03-13 Thread Joseph Torres
The reader necessarily knows the number of partitions, since it's
responsible for generating its output partitions in the first place. I
won't speak for everyone, but it would make sense to me to pass in a
Partitioning instance to the writer, since it's already part of the v2
interface through the reader's SupportsReportPartitioning.

I don't think we can expose execution plans to the data source v2
interface; the exact Java structure of execution plans isn't stable across
even maintenance releases. Even if we could, I don't really see what the
use case would be - what information does the writer need that can't be
made available through either the input data or the input partitioning?
(The built-in Kafka sink, for example, handles metadata such as topic
switching by just accepting topic name as a column along with the data.)
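
For reference, a rough sketch of that Kafka-sink pattern (assuming a streaming
DataFrame `events` with a string `payload` column; the broker address and
checkpoint path are hypothetical): the target topic is just another column of
the data, so the writer needs no extra metadata channel.

    import org.apache.spark.sql.functions._

    val query = events
      .select(
        col("payload").cast("string").as("value"),
        // per-row routing: derive the destination topic from the data itself
        when(col("payload").startsWith("urgent"), "alerts")
          .otherwise("events").as("topic"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
      .start()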

On Wed, Mar 13, 2019 at 1:39 AM JOAQUIN GUANTER GONZALBEZ <
joaquin.guantergonzal...@telefonica.com> wrote:

> I'd like to bump this. I agree with Carlos that there is very little
> information at the DataSourceWriter/DataSourceReader level. To me, ideally,
> the DataSourceWriter/Reader should have as much information as possible.
> Not only the number of partitions, but also ideally the whole execution
> plan.
>
> This would not only enable things like automatic creation of Kafka topics
> with the correct number of partitions (like Carlos mentioned), but it would
> also allow advanced DataSources that, for example, analyze the execution
> plan to choose the correct parameters to implement differential privacy.
>
> CC'ing in Ryan, since he is leading the DataSourceV2 workgroup (sorry I
> can't join the sync meetings, but I'm in CET time and the time logistics
> of that meeting don't work for Europe).
>
> Ryan, do you think it would be a good idea to provide extra information at
> the DataSourceWriter/Reader level to enable more advanced datasources?
> Would a PR contribution with these changes be a welcome addition?
>
> Thanks,
> Ximo
>
> -Mensaje original-
> De: CARLOS DEL PRADO MOTA 
> Enviado el: jueves, 7 de marzo de 2019 10:19
> Para: dev@spark.apache.org
> Asunto: Partitions at DataSource API V2
>
> Hello, I’m Carlos del Prado, developer at Telefonica.
>
> We are working with Spark's DataSource API V2 building a custom Kafka
> connector that creates the topic upon write. In order to do that, we need
> to know the number of partitions before writing data in each partition, at
> the DataSourceWriter level.
>
> Is there any way for us to do that?
>
> Kind regards,
> Carlos.
>
>


Re: [VOTE] Functional DataSourceV2 in Spark 3.0

2019-02-28 Thread Joseph Torres
I'm not worried about rushing. I worry that, without clear parameters for
the amount or types of DSv2 delays that are acceptable, we might end up
holding back 3.0 indefinitely to meet the deadline when we wouldn't have
made that decision de novo. (Or even worse, the PMC eventually feels they
must release 3.0 anyway, and then we're in the same position but everyone's
angry and frustrated.)

I do recognize that I'm particularly allergic to this risk, which is why
I'm not giving a -1 here - one of the first projects I worked on in my
career was delayed for over a year because an incomplete feature was bound
to its release date. But I don't agree that "might not resolve anything" is
the worst possible outcome here.

On Thu, Feb 28, 2019 at 11:48 AM Sean Owen  wrote:

> This is a fine thing to VOTE on. Committers (and community,
> non-binding) can VOTE on what we like; we just don't do it often where
> not required because it's a) overkill overhead over simple lazy
> consensus, and b) it can be hard to say what the binding VOTE binds if
> it's not a discrete commit or release. This is a big enough deal that
> it's not overkill. The question is, what does it bind?
>
> It means the release is definitely blocked until the items here are
> done, but, what's 'done'? It will return to the same questions already
> on the table, like do we need to define just APIs, and to what degree
> of stability. At worst it might not resolve anything.
>
> I don't see much harm in nailing down what appears to be agreement at
> the level of specific goals, even if this isn't a vote on a release
> date or specific commit. I think it's clear these items must be
> resolved to the level of semi-stable API by 3.0, as it's coming soon
> and this is the right time to establish these APIs. It might provide
> necessary clarity and constraints to get it over the line.
>
> To Mark -- yeah, this is asserting that DSv2 is a primary or necessary
> goal of the release, just like a "Blocker" does. Why would this
> argument be different or better if it waited until 3.0 was imminent? I
> get that one might say, well, we ended up working on more important
> stuff in the meantime and now we don't have time. But this VOTE's
> purpose is to declare that this is the important stuff now.
>
> To Jose -- what's the "just a few PRs in review" issue? you worry that
> we might rush DSv2 at the end to meet a deadline? all the better to,
> if anything, agree it's important now. It's also an agreement to delay
> the release for it, not rush it. I don't see that later is a better
> time to make the decision, if rush is a worry?
>
> Given my definition, and understanding of the issues, I'd say +1
>
> On Thu, Feb 28, 2019 at 12:24 PM Ryan Blue 
> wrote:
> >
> > Mark, I disagree. Setting common goals is a critical part of getting
> things done.
> >
> > This doesn't commit the community to push out the release if the goals
> aren't met, but does mean that we will, as a community, seriously consider
> it. This is also an acknowledgement that this is the most important feature
> in the next release (whether major or minor) for many of us. This has been
> in limbo for a very long time, so I think it is important for the community
> to commit to getting it to a functional state.
> >
> > It sounds like your objection is to this commitment for 3.0, but
> remember that 3.0 is the next release so that we can remove deprecated
> APIs. It does not mean that we aren't adding new features in that release
> and aren't considering other goals.
> >
> > On Thu, Feb 28, 2019 at 10:12 AM Mark Hamstra 
> wrote:
> >>
> >> Then I'm -1. Setting new features as blockers of major releases is not
> proper project management, IMO.
> >>
> >> On Thu, Feb 28, 2019 at 10:06 AM Ryan Blue  wrote:
> >>>
> >>> Mark, if this goal is adopted, "we" is the Apache Spark community.
> >>>
> >>> On Thu, Feb 28, 2019 at 9:52 AM Mark Hamstra 
> wrote:
> 
>  Who is "we" in these statements, such as "we should consider a
> functional DSv2 implementation a blocker for Spark 3.0"? If it means those
> contributing to the DSv2 effort want to set their own goals, milestones,
> etc., then that is fine with me. If you mean that the Apache Spark project
> should officially commit to the lack of a functional DSv2 implementation
> being a blocker for the release of Spark 3.0, then I'm -1. A major release
> is just not about adding new features. Rather, it is about making changes
> to the existing public API. As such, I'm opposed to any new feature or any
> API addition being considered a blocker of the 3.0.0 release.
> 
> 
>  On Thu, Feb 28, 2019 at 9:09 AM Matt Cheah 
> wrote:
> >
> > +1 (non-binding)
> >
> >
> >
> > Are identifiers and namespaces going to be rolled under one of those
> six points?
> >
> >
> >
> > From: Ryan Blue 
> > Reply-To: "rb...@netflix.com" 
> > Date: Thursday, February 28, 2019 at 8:39 AM
> > To: Spark Dev List 
> > 

Re: DSv2 question

2019-01-24 Thread Joseph Torres
I wouldn't be opposed to also documenting that we canonicalize the keys as
lowercase, but the case-insensitivity is I think the primary property. It's
important to call out that data source developers don't have to worry about
a semantic difference between option("mykey", "value") and option("myKey",
"value").

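A tiny illustration of that property (the "myformat" source and the option key
are hypothetical): from the source's point of view these two reads are
equivalent, because option keys are matched case-insensitively.

    val a = spark.read.format("myformat").option("myKey", "value").load()
    val b = spark.read.format("myformat").option("mykey", "value").load()
    // Inside the source, looking the key up as "mykey", "myKey" or "MYKEY"
    // returns the same value in both cases.
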
On Thu, Jan 24, 2019 at 9:58 AM Gabor Somogyi 
wrote:

> Hi All,
>
> Given org.apache.spark.sql.sources.v2.DataSourceOptions which states the
> following:
>
> * An immutable string-to-string map in which keys are case-insensitive. This
> * is used to represent data source options.
>
> Case-insensitivity can be achieved in many ways. The implementation
> provides a lowercase solution.
>
> I've seen code in places that takes advantage of this implementation
> detail. My questions are:
>
> 1. As the class only states case-insensitive, is the lowercase behavior
> subject to change?
> 2. If it's not subject to change, wouldn't it be better to change
> "case-insensitive" to "lowercase" or something similar?
>
> I've seen a similar pattern on other interfaces...
>
> Thanks in advance!
>
> BR,
> G
>
>


Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Joseph Torres
I'd agree it might make sense to bundle this into an API. We'd have to
think about whether it's a common enough use case to justify the API
complexity.

It might be worth exploring decoupling state and partitions, but I wouldn't
want to start making decisions based on it without a clearer design
picture. I would expect the decoupling to make it very difficult to ensure
idempotency of state updates.

Jose

On Fri, Aug 3, 2018 at 10:55 AM, Arun Mahadevan  wrote:

> coalesce might work.
>
> Say "spark.sql.shuffle.partitions" = 200, and then "
> input.readStream.map.filter.groupByKey(..).coalesce(2)" would still
> create 200 instances for state but execute just 2 tasks.
>
> However I think further groupByKey operations downstream would need
> similar coalesce.
>
> And this is assuming the user sets the right shuffle partitions upfront.
>
> It maybe worth to bundle this pattern as some builtin api so that it can
> be transparent to the user. I am not sure how were you planning to expose
> the state key groups at api level and if it would be transparent.
>
> IMO, decoupling the state and partitions and making it key based would
> still be worth exploring to support dynamic state rebalancing. May be the
> default HDFS based implementation can maintain the state partition wise and
> not support it, but there could be implementations based on distributed k-v
> store which supports this.
>
> Thanks,
> Arun
>
>
> On 3 August 2018 at 08:21, Joseph Torres 
> wrote:
>
>> A coalesced RDD will definitely maintain any within-partition invariants
>> that the original RDD maintained. It pretty much just runs its input
>> partitions sequentially.
>>
>> There'd still be some Dataframe API work needed to get the coalesce
>> operation where you want it to be, but this is much simpler than
>> introducing a new concept of state key groups. As far as I can tell,
>> state key groups are just the same thing that we currently call partitions
>> of the aggregate RDD.
>>
>> On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim  wrote:
>>
>>> I’m afraid I don’t know about the details on coalesce(), but some
>>> finding resource for coalesce, it looks like helping reducing actual
>>> partitions.
>>>
>>> For streaming aggregation, state for all partitions (by default, 200)
>>> must be initialized and committed even it is being unchanged. Otherwise
>>> error occurred when reading a partition which is excluded in query
>>> previously. Moreover, it can’t find existing row from state or store row in
>>> wrong partition if partition id doesn’t match the expected id via hashing
>>> function.
>>>
>>> Could you verify coalesce() meets such requirements?
>>>
>>> On Fri, 3 Aug 2018 at 22:23 Joseph Torres 
>>> wrote:
>>>
>>>> Scheduling multiple partitions in the same task is basically what
>>>> coalesce() does. Is there a reason that doesn't work here?
>>>>
>>>> On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim  wrote:
>>>>
>>>>> Here's a link for Google docs (anyone can comment):
>>>>> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>>>>>
>>>>> Please note that I just copied the content to the google docs, so
>>>>> someone could point out lack of details. I would like to start with
>>>>> explanation of the concept, and once we are in agreement on going forward,
>>>>> I could add more detail in doc, or even just start working and detail can
>>>>> be shared with POC code or even WIP patch.
>>>>>
>>>>> Answer inlined for Arun's comments:
>>>>>
>>>>> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan wrote:
>>>>>
>>>>>> Can you share this in a google doc to make the discussions easier.?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks for coming up with ideas to improve upon the current
>>>>>> restrictions with the SS state store.
>>>>>>
>>>>>>
>>>>>>
>>>>>> If I understood correctly, the plan is to introduce a logical
>>>>>> partitioning scheme for state storage (based on keys) independent of
>>>>>> spark’s partitioning so that the number of spark partitions can be 
>>>>>> varied.
>>>>>>
>>>>>>
>>>>>>
>>>>>> my 2 cents,
>>>>>>
>

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Joseph Torres
A coalesced RDD will definitely maintain any within-partition invariants
that the original RDD maintained. It pretty much just runs its input
partitions sequentially.

There'd still be some Dataframe API work needed to get the coalesce
operation where you want it to be, but this is much simpler than
introducing a new concept of state key groups. As far as I can tell, state
key groups are just the same thing that we currently call partitions of the
aggregate RDD.
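
A rough sketch of the pattern under discussion (assuming
"spark.sql.shuffle.partitions" is left at its default of 200, an active
SparkSession `spark`, and a hypothetical checkpoint path): the aggregation
still creates 200 state partitions, but coalesce(2) executes them in only 2
tasks.

    import org.apache.spark.sql.functions._

    val counts = spark.readStream
      .format("rate").load()          // any streaming source works here
      .groupBy(col("value") % 10)     // stateful aggregation: 200 state partitions
      .count()
      .coalesce(2)                    // ...but only 2 tasks execute them

    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/coalesce-demo")
      .start()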

On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim  wrote:

> I’m afraid I don’t know about the details on coalesce(), but some finding
> resource for coalesce, it looks like helping reducing actual partitions.
>
> For streaming aggregation, state for all partitions (by default, 200) must
> be initialized and committed even it is being unchanged. Otherwise error
> occurred when reading a partition which is excluded in query previously.
> Moreover, it can’t find existing row from state or store row in wrong
> partition if partition id doesn’t match the expected id via hashing
> function.
>
> Could you verify coalesce() meets such requirements?
>
> On Fri, 3 Aug 2018 at 22:23 Joseph Torres 
> wrote:
>
>> Scheduling multiple partitions in the same task is basically what
>> coalesce() does. Is there a reason that doesn't work here?
>>
>> On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim  wrote:
>>
>>> Here's a link for Google docs (anyone can comment):
>>> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>>>
>>> Please note that I just copied the content to the google docs, so
>>> someone could point out lack of details. I would like to start with
>>> explanation of the concept, and once we are in agreement on going forward,
>>> I could add more detail in doc, or even just start working and detail can
>>> be shared with POC code or even WIP patch.
>>>
>>> Answer inlined for Arun's comments:
>>>
>>> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan wrote:
>>>
>>>> Can you share this in a google doc to make the discussions easier.?
>>>>
>>>>
>>>>
>>>> Thanks for coming up with ideas to improve upon the current
>>>> restrictions with the SS state store.
>>>>
>>>>
>>>>
>>>> If I understood correctly, the plan is to introduce a logical
>>>> partitioning scheme for state storage (based on keys) independent of
>>>> spark’s partitioning so that the number of spark partitions can be varied.
>>>>
>>>>
>>>>
>>>> my 2 cents,
>>>>
>>>>
>>>>
>>>>1. The Partitioning is already a kind of a logical entity in Spark.
>>>>Maybe this can be leveraged to over-partition in advance (similar to
>>>>setting the number of state key groups in your proposal) but make it 
>>>> easy
>>>>to run more than one task (partition) per core (I am not sure how easy 
>>>> this
>>>>is currently). Then we can continue to leverage the existing state
>>>>implementation. This has similar limitations like what you pointed out 
>>>> (the
>>>>max number of partitions has to be fixed upfront). But once the over
>>>>provisioning of partitions is made easy it could be leveraged even for
>>>>non-stateful operations.
>>>>
>>>>
>>> If we could allow assigning multiple partitions in a task (say,
>>> parallelism or maximum concurrency), maybe we could achieve it a bit
>>> easier. I'm not pretty familiar with core of Spark, so I can't imagine how
>>> we could do it. In addition, partitions for downstream operators will be
>>> affected unless we don't shuffle afterwards.
>>>
>>>
>>>>1. Decouple the state from partition completely associate it only
>>>>with the keys. This would be the most flexible option and we can scale 
>>>> the
>>>>partitions up/down as we wish. This needs a scalable distributed state
>>>>store implementation supporting fast look ups /storage by key.
>>>>
>>>>
>>> It can be achievable with couple of external storages like Redis or
>>> HBase or so, but I would avoid the step which requires end users to
>>> maintain other system as well. Spark is coupled with specific version of
>>> Hadoop, so we could expect that end users could run and maintain HDFS.
>>>
>>>
>>>> Thanks,
>>>>
>>>

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Joseph Torres
Scheduling multiple partitions in the same task is basically what
coalesce() does. Is there a reason that doesn't work here?

On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim  wrote:

> Here's a link for Google docs (anyone can comment):
> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>
> Please note that I just copied the content into the Google doc, so someone
> could point out a lack of detail. I would like to start with an explanation
> of the concept, and once we are in agreement on going forward, I could add
> more detail to the doc, or even just start working, and details can be
> shared with POC code or even a WIP patch.
>
> Answer inlined for Arun's comments:
>
> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan wrote:
>
>> Can you share this in a Google doc to make the discussions easier?
>>
>>
>>
>> Thanks for coming up with ideas to improve upon the current restrictions
>> with the SS state store.
>>
>>
>>
>> If I understood correctly, the plan is to introduce a logical
>> partitioning scheme for state storage (based on keys) independent of
>> spark’s partitioning so that the number of spark partitions can be varied.
>>
>>
>>
>> my 2 cents,
>>
>>
>>
>>1. The Partitioning is already a kind of a logical entity in Spark.
>>Maybe this can be leveraged to over-partition in advance (similar to
>>setting the number of state key groups in your proposal) but make it easy
>>to run more than one task (partition) per core (I am not sure how easy 
>> this
>>is currently). Then we can continue to leverage the existing state
>>implementation. This has similar limitations like what you pointed out 
>> (the
>>max number of partitions has to be fixed upfront). But once the over
>>provisioning of partitions is made easy it could be leveraged even for
>>non-stateful operations.
>>
>>
> If we could allow assigning multiple partitions to a task (say, via
> parallelism or maximum concurrency), maybe we could achieve this a bit more
> easily. I'm not that familiar with the core of Spark, so I can't picture
> how we would do it. In addition, partitions for downstream operators will
> be affected unless we shuffle afterwards.
>
>
>>1. Decouple the state from partition completely associate it only
>>with the keys. This would be the most flexible option and we can scale the
>>partitions up/down as we wish. This needs a scalable distributed state
>>store implementation supporting fast look ups /storage by key.
>>
>>
> It could be achievable with a couple of external storage systems like Redis
> or HBase, but I would avoid a step which requires end users to maintain
> another system as well. Spark is coupled to a specific version of Hadoop,
> so we can expect that end users can run and maintain HDFS.
>
>
>> Thanks,
>>
>> Arun
>>
>>
>> On 2 August 2018 at 23:45, Jungtaek Lim  wrote:
>>
>>> Hi Spark devs,
>>>
> >>> I have a new feature to propose and would like to hear opinions from
> >>> the community. Not sure it is a big enough change to warrant an SPIP, so
> >>> I'm posting to the dev mailing list instead.
>>>
>>> > Feature
>>>
>>> Reconfigurable number of partitions on state operators in Structured
>>> Streaming
>>>
> >>> > Rationale
>>>
> >>> Nowadays, state in Structured Streaming is stored per partition, based
> >>> on the configuration "spark.sql.shuffle.partitions", and the
> >>> configuration cannot be modified after the query has been run once. One
> >>> contributor already submitted a patch [1] without knowing why such a
> >>> restriction came into play.
> >>>
> >>> Such a restriction on state is necessary because state is distributed by
> >>> a hash function applied to the key columns, but as a side effect of the
> >>> restriction, we can't change the number of partitions of stateful
> >>> operators. End users have various workloads and various SLAs (and SLAs
> >>> can change), so restricting them to a specific number of partitions will
> >>> not satisfy their needs. Moreover, it is not easy for end users to pick
> >>> the configuration before they run the query, and they only realize they
> >>> can't modify it when they try to.
>>>
>>> > Proposal
>>>
> >>> The feature proposes decoupling data partitions and operator partitions
> >>> by introducing key groups to state, enabling scalability of operator
> >>> partitions while state data partitions remain the same (so there is no
> >>> issue with state data). This approach is inspired by how Flink supports
> >>> scalability with partitioned state.
>>>
> >>> The concept itself is simple. Whereas we currently apply this
> >>> partitioning expression to the key columns (simplified):
>>>
>>> hash(key columns) % number of state operator partitions
>>>
> >>> the proposal applies the partitioning expression below, so that state is
> >>> still distributed via state data partitions but each state operator
> >>> partition can handle multiple state data partitions:
>>>
>>> (hash(key columns) % number of state key groups) % number of state
>>> operator partitions
>>>
>>> The state data will not still be scalable actually, so the number of
>>> state 

Re: code freeze and branch cut for Apache Spark 2.4

2018-07-31 Thread Joseph Torres
Full continuous processing aggregation support ran into unanticipated
scalability and scheduling problems. We’re planning to overcome those by
using some of the barrier execution machinery, but since barrier execution
itself is still in progress the full support isn’t going to make it into
2.4.

Jose

On Tue, Jul 31, 2018 at 6:07 AM Tomasz Gawęda 
wrote:

> Hi,
>
> what is the status of Continuous Processing + Aggregations? As far as I
> remember, Jose Torres said it should be easy to perform aggregations if
> coalesce(1) works. IIRC it's already merged to master.
>
> Is this work in progress? If yes, it would be great to have full
> aggregation/join support in Spark 2.4 in CP.
>
> Pozdrawiam / Best regards,
>
> Tomek
>
>
> On 2018-07-31 10:43, Petar Zečević wrote:
> > This one is important to us:
> https://issues.apache.org/jira/browse/SPARK-24020 (Sort-merge join inner
> range optimization) but I think it could be useful to others too.
> >
> > It is finished and is ready to be merged (was ready a month ago at
> least).
> >
> > Do you think you could consider including it in 2.4?
> >
> > Petar
> >
> >
> > Wenchen Fan @ 1970-01-01 01:00 CET:
> >
> >> I went through the open JIRA tickets and here is a list that we should
> consider for Spark 2.4:
> >>
> >> High Priority:
> >> SPARK-24374: Support Barrier Execution Mode in Apache Spark
> >> This one is critical to the Spark ecosystem for deep learning. It only
> has a few remaining works and I think we should have it in Spark 2.4.
> >>
> >> Middle Priority:
> >> SPARK-23899: Built-in SQL Function Improvement
> >> We've already added a lot of built-in functions in this release, but
> there are a few useful higher-order functions in progress, like
> `array_except`, `transform`, etc. It would be great if we can get them in
> Spark 2.4.
> >>
> >> SPARK-14220: Build and test Spark against Scala 2.12
> >> Very close to finishing, great to have it in Spark 2.4.
> >>
> >> SPARK-4502: Spark SQL reads unnecessary nested fields from Parquet
> >> This one is there for years (thanks for your patience Michael!), and is
> also close to finishing. Great to have it in 2.4.
> >>
> >> SPARK-24882: data source v2 API improvement
> >> This is to improve the data source v2 API based on what we learned
> during this release. From the migration of existing sources and design of
> new features, we found some problems in the API and want to address them. I
> believe this should be
> >> the last significant API change to data source v2, so great to have in
> Spark 2.4. I'll send a discuss email about it later.
> >>
> >> SPARK-24252: Add catalog support in Data Source V2
> >> This is a very important feature for data source v2, and is currently
> being discussed in the dev list.
> >>
> >> SPARK-24768: Have a built-in AVRO data source implementation
> >> Most of it is done, but date/timestamp support is still missing. Great
> to have in 2.4.
> >>
> >> SPARK-23243: Shuffle+Repartition on an RDD could lead to incorrect
> answers
> >> This is a long-standing correctness bug, great to have in 2.4.
> >>
> >> There are some other important features like the adaptive execution,
> streaming SQL, etc., not in the list, since I think we are not able to
> finish them before 2.4.
> >>
> >> Feel free to add more things if you think they are important to Spark
> 2.4 by replying to this email.
> >>
> >> Thanks,
> >> Wenchen
> >>
> >> On Mon, Jul 30, 2018 at 11:00 PM Sean Owen  wrote:
> >>
> >>   In theory releases happen on a time-based cadence, so it's pretty
> much wrap up what's ready by the code freeze and ship it. In practice, the
> cadence slips frequently, and it's very much a negotiation about what
> features should push the
> >>   code freeze out a few weeks every time. So, kind of a hybrid approach
> here that works OK.
> >>
> >>   Certainly speak up if you think there's something that really needs
> to get into 2.4. This is that discuss thread.
> >>
> >>   (BTW I updated the page you mention just yesterday, to reflect the
> plan suggested in this thread.)
> >>
> >>   On Mon, Jul 30, 2018 at 9:51 AM Tom Graves
>  wrote:
> >>
> >>   Shouldn't this be a discuss thread?
> >>
> >>   I'm also happy to see more release managers and agree the time is
> getting close, but we should see what features are in progress and see how
> close things are and propose a date based on that.  Cutting a branch to
> soon just creates
> >>   more work for committers to push to more branches.
> >>
> >>http://spark.apache.org/versioning-policy.html mentioned the code
> freeze and release branch cut mid-august.
> >>
> >>   Tom
> >
> >
>
>


Re: JDBC Data Source and customSchema option but DataFrameReader.assertNoSpecifiedSchema?

2018-07-16 Thread Joseph Torres
I guess the question is partly about the semantics of
DataFrameReader.schema. If it's supposed to mean "the loaded dataframe will
definitely have exactly this schema", that doesn't quite match the behavior
of the customSchema option. If it's only meant to be an arbitrary schema
input which the source can interpret however it wants, it'd be fine.

The second semantic is IMO more useful, so I'm in favor here.
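
For context, a sketch of how the customSchema option is used today (the
connection details are hypothetical): it only overrides the types of the
listed columns rather than pinning down the full schema of the loaded
DataFrame, which is why it matches the second semantic better.

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://db.example.com:5432/mydb")
      .option("dbtable", "public.events")
      .option("customSchema", "id DECIMAL(38, 0), ts STRING")  // partial type overrides
      .load()

    // The proposal would allow the conventional form instead, e.g.
    //   spark.read.format("jdbc").schema("id DECIMAL(38, 0), ts STRING")...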

On Mon, Jul 16, 2018 at 3:43 AM, Jacek Laskowski  wrote:

> Hi,
>
> I think there is a sort of inconsistency in how DataFrameReader.jdbc deals
> with a user-defined schema as it makes sure that there's no user-specified
> schema [1][2] yet allows for setting one using customSchema option [3]. Why
> is that so? Has this simply been overlooked?
>
> I think assertNoSpecifiedSchema should be removed from
> DataFrameReader.jdbc and support for DataFrameReader.schema for jdbc should
> be added (with the customSchema option marked as deprecated to be removed
> in 2.4 or 3.0).
>
> Should I file an issue in Spark JIRA and do the changes? WDYT?
>
> [1] https://github.com/apache/spark/blob/v2.3.1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala?utf8=%E2%9C%93#L249
> [2] https://github.com/apache/spark/blob/v2.3.1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala?utf8=%E2%9C%93#L320
> [3] https://github.com/apache/spark/blob/v2.3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala#L167
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>


Re: TextSocketMicroBatchReader no longer supports nc utility

2018-06-04 Thread Joseph Torres
I tend to agree that this is a bug. It's kinda silly that nc does this, but
a socket connector that doesn't work with netcat will surely seem broken to
users. It wouldn't be a huge change to defer opening the socket until a
read is actually required.
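
For anyone following along, this is the usual demo flow that trips over the
current behavior (a sketch, assuming `nc -lk 9999` is running in another
terminal and a SparkSession `spark` is available): the schema pass opens and
closes a first connection, and plain netcat exits at that point, before the
real read ever starts.

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val query = lines.writeStream
      .format("console")
      .start()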

On Sun, Jun 3, 2018 at 9:55 PM, Jungtaek Lim  wrote:

> Hi devs,
>
> Not sure I'll hear back soon since Spark Summit is just around the corner,
> but I just wanted to post this and wait.
>
> While playing with Spark 2.4.0-SNAPSHOT, I found that the nc command exits
> before reading the actual data, so the query also exits with an error.
>
> The reason is that we launch a temporary reader to read the schema, close
> that reader, and then re-open a reader. While a reliable socket server
> should be able to handle this without any issue, the nc command normally
> can't handle multiple connections and simply exits when the temporary
> reader is closed.
>
> I would like to file an issue and contribute a fix if we think this is a
> bug (otherwise we would need to replace the nc utility with another one,
> maybe our own implementation?), but I'm not sure we are happy to apply a
> workaround for a specific source.
>
> Would like to hear opinions before giving it a shot.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>


Design proposal for streaming APIs in data source V2

2018-05-24 Thread Joseph Torres
Hi all,

https://docs.google.com/document/d/1VzxEuvpLfuHKL6vJO9qJ6ug0x9J_gLoLSH_vJL3-Cho

I've finished a full design proposal for streaming APIs in data source V2,
following up on my earlier doc with just the writer. Please take a look.

(Note that slightly different versions of the APIs already exist as
prototypes - this proposal is intended to override them, now that we have a
better handle on what's required.)


Jose


Design for continuous processing shuffle

2018-05-04 Thread Joseph Torres
Hi all,

A few of us have been working on a design for how to do shuffling in
continuous processing. Feel free to chip in if you have any comments or
questions.

doc:
https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE

continuous processing SPIP:
https://issues.apache.org/jira/browse/SPARK-20928


Jose


Re: [Structured streaming, V2] commit on ContinuousReader

2018-05-03 Thread Joseph Torres
In the master branch, we currently call this method in
ContinuousExecution.commit().

Note that the ContinuousReader API is experimental and undergoing active
design work. We will definitely include some kind of functionality to
back-commit data once it's been processed, but the handle we eventually
stabilize won't necessarily be `commit(end: Offset)`.

On Thu, May 3, 2018 at 10:43 AM, Jiří Syrový  wrote:

> Version: 2.3, DataSourceV2, ContinuousReader
>
> Hi,
>
> We're creating a new data source that fetches data from a streaming source
> which requires committing received data, and we would like to commit the
> data once in a while, after it has been retrieved and correctly processed,
> and then fetch more.
>
> One option could be to rely on Spark committing already-read data using
> *commit(end: Offset)*, which is present in *ContinuousReader
> (v2.reader.streaming)*, but it seems that this method is never called.
>
> The question is whether this method *commit(end: Offset)* is ever used, and
> when? I went through part of the Spark code base, but haven't really found
> any place where it could be called.
> place where it could be called.
>
> Thanks,
> Jiri
>
>


Re: Datasource API V2 and checkpointing

2018-05-01 Thread Joseph Torres
I agree that Spark should fully handle state serialization and recovery for
most sources. This is how it works in V1, and we definitely wouldn't want
or need to change that in V2.* The question is just whether we should have
an escape hatch for the sources that don't want Spark to do that, and if so
what the escape hatch should look like.

I don't think a watermark checkpoint would work, because there's no
guarantee (especially considering the "maxFilesPerTrigger" option) that all
files with the same timestamp will be in the same batch. But in general,
changing the fundamental mechanics of how file sources take checkpoints
seems like it would impose a serious risk of performance regressions, which
I don't think are a desirable risk when performing an API migration that's
going to swap out users' queries from under them. I would be very
uncomfortable merging a V2 file source which we can't confidently assert
has the same performance characteristics as the existing one.


* Technically, most current sources do write their initial offset to the
checkpoint directory, but this is just a workaround to the fact that the V1
API has no handle to give Spark the initial offset. So if you e.g. start a
Kafka stream from latest offsets, and it fails in the first batch, Spark
won't know to restart the stream from the initial offset which was
originally generated. That's easily fixable in V2, and then no source will
have to even look at the checkpoint directory if it doesn't want to.

On Tue, May 1, 2018 at 10:26 AM, Ryan Blue <rb...@netflix.com> wrote:

> I think there's a difference. You're right that we wanted to clean up the
> API in V2 to avoid file sources using side channels. But there's a big
> difference between adding, for example, a way to report partitioning and
> designing for sources that need unbounded state. It's a judgment call, but
> I think unbounded state is definitely not something that we should design
> around. Another way to think about it: yes, we want to design a better API
> using existing sources as guides, but we don't need to assume that
> everything those sources do should to be supported. It is reasonable to say
> that this is a case we don't want to design for and the source needs to
> change. Why can't we use a high watermark of files' modified timestamps?
>
> For most sources, I think Spark should handle state serialization and
> recovery. Maybe we can find a good way to make the file source with
> unbounded state work, but this shouldn't be one of the driving cases for
> the design and consequently a reason for every source to need to manage its
> own state in a checkpoint directory.
>
> rb
>
> On Mon, Apr 30, 2018 at 12:37 PM, Joseph Torres <
> joseph.tor...@databricks.com> wrote:
>
>> I'd argue that letting bad cases influence the design is an explicit goal
>> of DataSourceV2. One of the primary motivations for the project was that
>> file sources hook into a series of weird internal side channels, with
>> favorable performance characteristics that are difficult to match in the
>> API we actually declare to Spark users. So a design that we can't migrate
>> file sources to without a side channel would be worrying; won't we end up
>> regressing to the same situation?
>>
>> On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Should we really plan the API for a source with state that grows
>>> indefinitely? It sounds like we're letting a bad case influence the
>>> design, when we probably shouldn't.
>>>
>>> On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
>>> joseph.tor...@databricks.com> wrote:
>>>
>>>> Offset is just a type alias for arbitrary JSON-serializable state. Most
>>>> implementations should (and do) just toss the blob at Spark and let Spark
>>>> handle recovery on its own.
>>>>
>>>> In the case of file streams, the obstacle is that the conceptual offset
>>>> is very large: a list of every file which the stream has ever read. In
>>>> order to parse this efficiently, the stream connector needs detailed
>>>> control over how it's stored; the current implementation even has complex
>>>> compactification and retention logic.
>>>>
>>>>
>>>> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> Why don't we just have the source return a Serializable of state when
>>>>> it reports offsets? Then Spark could handle storing the source's state and
>>>>> the source wouldn't need to worry about file system paths. I think that
>>>>> would be easier for implementations and better for recovery because i

Re: Datasource API V2 and checkpointing

2018-04-30 Thread Joseph Torres
I'd argue that letting bad cases influence the design is an explicit goal
of DataSourceV2. One of the primary motivations for the project was that
file sources hook into a series of weird internal side channels, with
favorable performance characteristics that are difficult to match in the
API we actually declare to Spark users. So a design that we can't migrate
file sources to without a side channel would be worrying; won't we end up
regressing to the same situation?

On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue <rb...@netflix.com> wrote:

> Should we really plan the API for a source with state that grows
> indefinitely? It sounds like we're letting a bad case influence the
> design, when we probably shouldn't.
>
> On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
> joseph.tor...@databricks.com> wrote:
>
>> Offset is just a type alias for arbitrary JSON-serializable state. Most
>> implementations should (and do) just toss the blob at Spark and let Spark
>> handle recovery on its own.
>>
>> In the case of file streams, the obstacle is that the conceptual offset
>> is very large: a list of every file which the stream has ever read. In
>> order to parse this efficiently, the stream connector needs detailed
>> control over how it's stored; the current implementation even has complex
>> compactification and retention logic.
>>
>>
>> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Why don't we just have the source return a Serializable of state when it
>>> reports offsets? Then Spark could handle storing the source's state and the
>>> source wouldn't need to worry about file system paths. I think that would
>>> be easier for implementations and better for recovery because it wouldn't
>>> leave unknown state on a single machine's file system.
>>>
>>> rb
>>>
>>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
>>> joseph.tor...@databricks.com> wrote:
>>>
>>>> The precise interactions with the DataSourceV2 API haven't yet been
>>>> hammered out in design. But much of this comes down to the core of
>>>> Structured Streaming rather than the API details.
>>>>
>>>> The execution engine handles checkpointing and recovery. It asks the
>>>> streaming data source for offsets, and then determines that batch N
>>>> contains the data between offset A and offset B. On recovery, if batch N
>>>> needs to be re-run, the execution engine just asks the source for the same
>>>> offset range again. Sources also get a handle to their own subfolder of the
>>>> checkpoint, which they can use as scratch space if they need. For example,
>>>> Spark's FileStreamReader keeps a log of all the files it's seen, so its
>>>> offsets can be simply indices into the log rather than huge strings
>>>> containing all the paths.
>>>>
>>>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>>>> ensuring that, within a single Spark job, two different tasks can't commit
>>>> the same partition.
>>>>
>>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>>>> jthak...@conversantmedia.com> wrote:
>>>>
>>>>> Wondering if this issue is related to SPARK-23323?
>>>>>
>>>>>
>>>>>
>>>>> Any pointers will be greatly appreciated….
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jayesh
>>>>>
>>>>>
>>>>>
>>>>> *From: *"Thakrar, Jayesh" <jthak...@conversantmedia.com>
>>>>> *Date: *Monday, April 23, 2018 at 9:49 PM
>>>>> *To: *"dev@spark.apache.org" <dev@spark.apache.org>
>>>>> *Subject: *Datasource API V2 and checkpointing
>>>>>
>>>>>
>>>>>
>>>>> I was wondering when checkpointing is enabled, who does the actual
>>>>> work?
>>>>>
>>>>> The streaming datasource or the execution engine/driver?
>>>>>
>>>>>
>>>>>
>>>>> I have written a small/trivial datasource that just generates strings.
>>>>>
>>>>> After enabling checkpointing, I do see a folder being created under
>>>>> the checkpoint folder, but there's nothing else in there.
>>>>>
>>>>>
>>>>>
>>>>> Same question for write-ahead and recovery?
>>>>>
>>>>> And on a restart from a failed streaming session - who should set the
>>>>> offsets?
>>>>>
>>>>> The driver/Spark or the datasource?
>>>>>
>>>>>
>>>>>
>>>>> Any pointers to design docs would also be greatly appreciated.
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jayesh
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Datasource API V2 and checkpointing

2018-04-30 Thread Joseph Torres
Offset is just a type alias for arbitrary JSON-serializable state. Most
implementations should (and do) just toss the blob at Spark and let Spark
handle recovery on its own.

In the case of file streams, the obstacle is that the conceptual offset is
very large: a list of every file which the stream has ever read. In order
to parse this efficiently, the stream connector needs detailed control over
how it's stored; the current implementation even has complex
compactification and retention logic.
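
A minimal sketch of what "arbitrary JSON-serializable state" means for a
simple source (the class and field names are made up for illustration): the
offset is whatever the source can round-trip through JSON, here just a record
counter.

    // Hypothetical offset for a toy source that counts the records it has emitted.
    case class CounterOffset(recordsEmitted: Long) {
      def json: String = s"""{"recordsEmitted":$recordsEmitted}"""
    }

    object CounterOffset {
      // Rebuild the position from the blob Spark hands back on restart.
      def fromJson(s: String): CounterOffset =
        CounterOffset("""\d+""".r.findFirstIn(s).map(_.toLong).getOrElse(0L))
    }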


On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:

> Why don't we just have the source return a Serializable of state when it
> reports offsets? Then Spark could handle storing the source's state and the
> source wouldn't need to worry about file system paths. I think that would
> be easier for implementations and better for recovery because it wouldn't
> leave unknown state on a single machine's file system.
>
> rb
>
> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
> joseph.tor...@databricks.com> wrote:
>
>> The precise interactions with the DataSourceV2 API haven't yet been
>> hammered out in design. But much of this comes down to the core of
>> Structured Streaming rather than the API details.
>>
>> The execution engine handles checkpointing and recovery. It asks the
>> streaming data source for offsets, and then determines that batch N
>> contains the data between offset A and offset B. On recovery, if batch N
>> needs to be re-run, the execution engine just asks the source for the same
>> offset range again. Sources also get a handle to their own subfolder of the
>> checkpoint, which they can use as scratch space if they need. For example,
>> Spark's FileStreamReader keeps a log of all the files it's seen, so its
>> offsets can be simply indices into the log rather than huge strings
>> containing all the paths.
>>
>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>> ensuring that, within a single Spark job, two different tasks can't commit
>> the same partition.
>>
>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>> jthak...@conversantmedia.com> wrote:
>>
>>> Wondering if this issue is related to SPARK-23323?
>>>
>>>
>>>
>>> Any pointers will be greatly appreciated….
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jayesh
>>>
>>>
>>>
>>> *From: *"Thakrar, Jayesh" <jthak...@conversantmedia.com>
>>> *Date: *Monday, April 23, 2018 at 9:49 PM
>>> *To: *"dev@spark.apache.org" <dev@spark.apache.org>
>>> *Subject: *Datasource API V2 and checkpointing
>>>
>>>
>>>
>>> I was wondering when checkpointing is enabled, who does the actual work?
>>>
>>> The streaming datasource or the execution engine/driver?
>>>
>>>
>>>
>>> I have written a small/trivial datasource that just generates strings.
>>>
>>> After enabling checkpointing, I do see a folder being created under the
>>> checkpoint folder, but there's nothing else in there.
>>>
>>>
>>>
>>> Same question for write-ahead and recovery?
>>>
>>> And on a restart from a failed streaming session - who should set the
>>> offsets?
>>>
>>> The driver/Spark or the datasource?
>>>
>>>
>>>
>>> Any pointers to design docs would also be greatly appreciated.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jayesh
>>>
>>>
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Datasource API V2 and checkpointing

2018-04-27 Thread Joseph Torres
The precise interactions with the DataSourceV2 API haven't yet been
hammered out in design. But much of this comes down to the core of
Structured Streaming rather than the API details.

The execution engine handles checkpointing and recovery. It asks the
streaming data source for offsets, and then determines that batch N
contains the data between offset A and offset B. On recovery, if batch N
needs to be re-run, the execution engine just asks the source for the same
offset range again. Sources also get a handle to their own subfolder of the
checkpoint, which they can use as scratch space if they need. For example,
Spark's FileStreamReader keeps a log of all the files it's seen, so its
offsets can be simply indices into the log rather than huge strings
containing all the paths.

SPARK-23323 is orthogonal. That commit coordinator is responsible for
ensuring that, within a single Spark job, two different tasks can't commit
the same partition.

On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Wondering if this issue is related to SPARK-23323?
>
>
>
> Any pointers will be greatly appreciated….
>
>
>
> Thanks,
>
> Jayesh
>
>
>
> *From: *"Thakrar, Jayesh" 
> *Date: *Monday, April 23, 2018 at 9:49 PM
> *To: *"dev@spark.apache.org" 
> *Subject: *Datasource API V2 and checkpointing
>
>
>
> I was wondering when checkpointing is enabled, who does the actual work?
>
> The streaming datasource or the execution engine/driver?
>
>
>
> I have written a small/trivial datasource that just generates strings.
>
> After enabling checkpointing, I do see a folder being created under the
> checkpoint folder, but there's nothing else in there.
>
>
>
> Same question for write-ahead and recovery?
>
> And on a restart from a failed streaming session - who should set the
> offsets?
>
> The driver/Spark or the datasource?
>
>
>
> Any pointers to design docs would also be greatly appreciated.
>
>
>
> Thanks,
>
> Jayesh
>
>
>


Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

2018-04-18 Thread Joseph Torres
The fundamental difficulty seems to be that there's a spurious "round-trip"
in the API. Spark inspects the source to determine what type it's going to
provide, picks an appropriate method according to that type, and then calls
that method on the source to finally get what it wants. Pushing this out of
the DataSourceReader doesn't eliminate this problem; it just shifts it. We
still need an InternalRow method and a ColumnarBatch method and possibly
Row and UnsafeRow methods too.

I'd propose it would be better to just accept a bit less type safety here,
and push the problem all the way down to the DataReader. Make
DataReader.get() return Object, and document that the runtime type had
better match the type declared in the reader's DataFormat. Then we can get
rid of the special Row/UnsafeRow/ColumnarBatch methods cluttering up the
API, and figure out whether to support Row and UnsafeRow independently of
all our other API decisions. (I didn't think about this until now, but the
fact that some orthogonal API decisions have to be conditioned on which set
of row formats we support seems like a code smell.)
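
Roughly, the proposal amounts to something like the following sketch (trait
and method names are made up for illustration, not a final API): the factory
declares its data format once, and the reader returns an untyped value whose
runtime class must match that declaration:

object DataFormat extends Enumeration {
  val ROW_FORMAT, COLUMNAR_BATCH_FORMAT = Value
}

trait UntypedDataReader {
  def next(): Boolean
  // Runtime type must match the format declared by the factory below:
  // an InternalRow for ROW_FORMAT, a ColumnarBatch for COLUMNAR_BATCH_FORMAT.
  def get(): AnyRef
  def close(): Unit
}

trait UntypedDataReaderFactory {
  // One declaration replaces the per-format createXDataReader methods.
  def dataFormat: DataFormat.Value
  def createDataReader(): UntypedDataReader
}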

On Wed, Apr 18, 2018 at 3:53 PM, Ryan Blue 
wrote:

> Wenchen, can you explain a bit more clearly why this is necessary? The
> pseudo-code you used doesn’t clearly demonstrate why. Why couldn’t this be
> handled with inheritance from an abstract Factory class? Why define
> all of the createXDataReader methods, but make the DataFormat a field in
> the factory?
>
> A related issue is that I think there’s a strong case that the v2 sources
> should produce only InternalRow and that Row and UnsafeRow shouldn’t be
> exposed; see SPARK-23325. The basic arguments are:
>
>- UnsafeRow is really difficult to produce without using Spark’s
>projection methods. If implementations can produce UnsafeRow, then
>they can still pass them as InternalRow and the projection Spark adds
>would be a no-op. When implementations can’t produce UnsafeRow, then
>it is better for Spark to insert the projection to unsafe. An example of a
>data format that doesn’t produce unsafe is the built-in Parquet source,
>which produces InternalRow and projects before returning the row.
>- For Row, I see no good reason to support it in a new interface when
>it will just introduce an extra transformation. The argument that Row
>is the “public” API doesn’t apply because UnsafeRow is already exposed
>through the v2 API.
>- Standardizing on InternalRow would remove the need for these
>interfaces entirely and simplify what implementers must provide and would
>reduce confusion over what to do.
>
> Using InternalRow doesn’t cover the case where we want to produce
> ColumnarBatch instead, so what you’re proposing might still be a good
> idea. I just think that we can simplify either path.
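
To illustrate the projection argument above, here is a small sketch that uses
Spark's internal Catalyst classes (so it assumes spark-catalyst on the
classpath and is not part of any proposed API): a source hands back a generic
InternalRow, and engine-side code converts it to an UnsafeRow with an
UnsafeProjection, which would be close to a no-op if the source had already
produced unsafe rows.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object ProjectionSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // What a source that only produces generic InternalRows would hand back.
    val fromSource: InternalRow = InternalRow(1L, UTF8String.fromString("spark"))

    // The projection Spark could insert on the engine side.
    val toUnsafe = UnsafeProjection.create(schema)
    val unsafeRow = toUnsafe(fromSource)
    println(s"${unsafeRow.getLong(0)} ${unsafeRow.getUTF8String(1)}")
  }
}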
> ​
>
> On Mon, Apr 16, 2018 at 11:17 PM, Wenchen Fan  wrote:
>
>> Yea, definitely not. The only requirement is that the DataReader/WriterFactory
>> must support at least one DataFormat.
>>
>> >  how are we going to express capability of the given reader of its
>> supported format(s), or specific support for each of “real-time data in row
>> format, and history data in columnar format”?
>>
>> When DataSourceReader/Writer creates factories, each factory must contain
>> enough information to decide the data format. Let's take ORC as an example.
>> OrcReaderFactory knows which files to read and which columns to output.
>> Since Spark currently only supports columnar scans for simple types,
>> OrcReaderFactory will only output ColumnarBatch if the columns to scan
>> are all simple types.
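
A hypothetical sketch of that decision (names invented for illustration, not
the actual OrcReaderFactory code): the factory inspects the column types it
was created with and only advertises the columnar format when all of them are
simple:

import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StringType, StructField, StructType}

class OrcLikeReaderFactory(filesToRead: Seq[String], columnsToOutput: StructType) {

  private def isSimple(dt: DataType): Boolean = dt match {
    case _: StructType | _: ArrayType | _: MapType => false // nested -> row-based scan
    case _                                         => true  // numeric, string, boolean, ...
  }

  // The factory itself carries enough information to pick the data format.
  def dataFormat: String =
    if (columnsToOutput.fields.forall(f => isSimple(f.dataType))) "ColumnarBatch"
    else "InternalRow"
}

object FormatChoiceSketch {
  def main(args: Array[String]): Unit = {
    val simple = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
    val nested = simple.add(StructField("tags", ArrayType(StringType)))
    println(new OrcLikeReaderFactory(Seq("part-0.orc"), simple).dataFormat) // ColumnarBatch
    println(new OrcLikeReaderFactory(Seq("part-0.orc"), nested).dataFormat) // InternalRow
  }
}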
>>
>> On Tue, Apr 17, 2018 at 11:38 AM, Felix Cheung wrote:
>>
>>> Is it required for DataReader to support all known DataFormat?
>>>
>>> Hopefully, not, as assumed by the ‘throw’ in the interface. Then
>>> specifically how are we going to express capability of the given reader of
>>> its supported format(s), or specific support for each of “real-time data in
>>> row format, and history data in columnar format”?
>>>
>>>
>>> --
>>> *From:* Wenchen Fan 
>>> *Sent:* Sunday, April 15, 2018 7:45:01 PM
>>> *To:* Spark dev list
>>> *Subject:* [discuss][data source v2] remove type parameter in
>>> DataReader/WriterFactory
>>>
>>> Hi all,
>>>
>>> I'd like to propose an API change to the data source v2.
>>>
>>> One design goal of data source v2 is API type safety. The FileFormat API
>>> is a bad example: it asks the implementation to return InternalRow even
>>> when it's actually producing a ColumnarBatch. In data source v2 we add a
>>> type parameter to DataReader/WriterFactory and DataReader/Writer, so that
>>> a data source supporting columnar scan returns ColumnarBatch at the API
>>> level.
>>>
>>> However, we met some problems when migrating streaming and file-based
>>> data source to data 

Re: Spark 2.3 V2 Datasource API questions

2018-04-06 Thread Joseph Torres
Thanks for trying it out!

We haven't hooked continuous streaming up to query.status or
query.recentProgress yet. commit() should be called under the hood; we
just don't yet report that it is. I've filed SPARK-23886 and SPARK-23887
to track the work to add those things.
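
For reference, here is roughly how those two calls can be exercised with the
built-in rate source and console sink, so no custom connector is needed (on
Spark 2.3 with a continuous trigger the reported status and progress may not
reflect reality yet, per SPARK-23886 and SPARK-23887):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ProgressSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("progress").getOrCreate()

    val query = spark.readStream
      .format("rate")                           // built-in source that supports continuous mode
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    Thread.sleep(5000)
    println(query.status)                       // current StreamingQueryStatus
    println(query.recentProgress.length)        // number of recent progress updates

    query.stop()
    spark.stop()
  }
}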

The issue with printing warnings whenever the query is stopped is tracked
in SPARK-23444.

Jose

On Fri, Apr 6, 2018 at 8:29 AM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> First of all, thank you to the Spark dev team for coming up with the
> standardized and intuitive API interfaces.
>
> I am sure it will encourage a lot more new datasource integrations.
>
>
>
> I have been playing with the API and have some questions on the
> continuous streaming API
>
> (see https://github.com/JThakrar/sparkconn#continuous-streaming-datasource
> )
>
>
>
> *It seems that "commit" is never called *
>
>
>
> *query.status always shows the message below even after the query has been
> initialized and data has been streaming:*
>
> {
>
>   "message" : "Initializing sources",
>
>   "isDataAvailable" : false,
>
>   "isTriggerActive" : true
>
> }
>
>
>
>
>
> *query.recentProgress always shows an empty array:*
>
>
>
> Array[org.apache.spark.sql.streaming.StreamingQueryProgress] = Array()
>
>
>
> *And stopping a query always looks as if the tasks were lost involuntarily
> or uncleanly (even though close on the datasource was called):*
>
> 2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 2.0 in stage 1.0
> (TID 7, localhost, executor driver): TaskKilled (Stage cancelled)
>
> 2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 1.0 in stage 1.0
> (TID 6, localhost, executor driver): TaskKilled (Stage cancelled)
>
> 2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 3.0 in stage 1.0
> (TID 8, localhost, executor driver): TaskKilled (Stage cancelled)
>
> 2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 0.0 in stage 1.0
> (TID 5, localhost, executor driver): TaskKilled (Stage cancelled)
>
> 2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 4.0 in stage 1.0
> (TID 9, localhost, executor driver): TaskKilled (Stage cancelled)
>
>
>
> Any pointers/info will be greatly appreciated.
>
>
>
>
>
>
>


Re: Beginner searching for guidance with Jira and issues

2018-03-20 Thread Joseph Torres
Hi!

I can't speak for the other tasks, but SPARK-23444 I'd expect to be pretty
complicated. It's not obvious what the right strategy is, and there's a
bunch of minor stuff that needs to be cleaned up (e.g. tasks shouldn't
print cancellation warnings when cancellation is expected).

If you're interested in working on continuous processing,
https://issues.apache.org/jira/browse/SPARK-23503 could be a good newbie
task. It's a
pretty localized change to the EpochCoordinator class; basically, it needs
to wait to call query.commit(n + 1) until after query.commit(n). I'm not
sure how well I've managed to document the existing implementation, but I'd
be happy to answer any questions about it.
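
To make the ordering requirement concrete, here is a tiny sketch (a
hypothetical helper, not the actual EpochCoordinator code): epochs can finish
out of order, so completed-but-early epochs are buffered until every earlier
epoch has been committed:

import scala.collection.mutable

class OrderedEpochCommitter(commit: Long => Unit) {
  private var nextToCommit = 0L
  private val ready = mutable.Set.empty[Long]

  def epochReady(epoch: Long): Unit = {
    ready += epoch
    // Drain every consecutive epoch starting from the next expected one.
    while (ready.remove(nextToCommit)) {
      commit(nextToCommit)
      nextToCommit += 1
    }
  }
}

object EpochOrderSketch {
  def main(args: Array[String]): Unit = {
    val committer = new OrderedEpochCommitter(e => println(s"commit($e)"))
    committer.epochReady(1) // buffered: epoch 0 is not done yet
    committer.epochReady(0) // prints commit(0) then commit(1)
    committer.epochReady(2) // prints commit(2)
  }
}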

Jose

On Tue, Mar 20, 2018 at 9:01 AM, Efim Poberezkin 
wrote:

> Good time of day,
>
>
>
> I’d like to contribute to Spark development, but find it difficult to get
> into the process. I’m somewhat overwhelmed by Spark’s Jira as it’s hard for
> me to figure out the complexity of tasks and choose an appropriate one.
>
> I’ve surfed Jira for some time and have selected a few issues I think I
> could try to solve:
>
>
>
> https://issues.apache.org/jira/browse/SPARK-23444
>
> https://issues.apache.org/jira/browse/SPARK-23693
>
> https://issues.apache.org/jira/browse/SPARK-23673 - although for this one
> there’s some uncertainty about whether it’s needed at all, according to the comment
>
>
>
> Also I think it would be interesting to work on Continuous Processing if
> there were some newbie tasks, but I wasn’t able to find them.
>
> If you could give me some directions on any of these issues I’ve linked,
> or just point to some tasks that are suitable for a beginner, that’d help
> me a lot; I would appreciate any advice.
>
>
>
> Best regards,
>
> Efim
>


[DISCUSS] Structured Streaming writers in DataSourceV2

2018-03-09 Thread Joseph Torres
Hi all,

I've been working for the past few months on figuring out a DataSourceV2
compatible interface for Structured Streaming sinks. I've written a document
proposing an overarching design; please take a look and leave any comments
you have.


Design doc JIRA: SPARK-23556

DataSourceV2: SPARK-15689


Jose


Re: [VOTE] Spark 2.3.0 (RC2)

2018-01-25 Thread Joseph Torres
SPARK-23221 fixes an issue specific
to KafkaContinuousSourceStressForDontFailOnDataLossSuite; I don't think it
could cause other suites to deadlock.

Do note that the previous hang issues we saw caused by SPARK-23055 were
correctly marked as failures.

On Thu, Jan 25, 2018 at 3:40 PM, Shixiong(Ryan) Zhu  wrote:

> + Jose
>
> On Thu, Jan 25, 2018 at 2:18 PM, Dongjoon Hyun 
> wrote:
>
>> SPARK-23221 is one of the reasons for the Kafka test suite deadlock issue.
>>
>> As for the hang issues, they don't seem to be marked as failures correctly
>> in the Apache Spark Jenkins history.
>>
>>
>> On Thu, Jan 25, 2018 at 1:03 PM, Marcelo Vanzin 
>> wrote:
>>
>>> On Thu, Jan 25, 2018 at 12:29 PM, Sean Owen  wrote:
>>> > I am still seeing these tests fail or hang:
>>> >
>>> > - subscribing topic by name from earliest offsets (failOnDataLoss:
>>> false)
>>> > - subscribing topic by name from earliest offsets (failOnDataLoss:
>>> true)
>>>
>>> This is something that we are seeing internally on a different version of
>>> Spark, and we're currently investigating with our Kafka people. Not
>>> sure it's the same issue (we have a newer version of Kafka libraries),
>>> but this is just another way of saying that I don't think those hangs
>>> are new in 2.3, at least.
>>>
>>> --
>>> Marcelo
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>
>>
>