Re: What's the root cause of not supporting multiple aggregations in structured streaming?

2019-05-20 Thread Arun Mahadevan
Heres the proposal for supporting it in "append" mode -
https://github.com/apache/spark/pull/23576. You could see if it addresses
your requirement and post your feedback in the PR.
For "update" mode its going to be much harder to support this without first
adding support for "retractions", otherwise we would end up with wrong
results.

- Arun


On Mon, 20 May 2019 at 01:34, Gabor Somogyi 
wrote:

> There is PR for this but not yet merged.
>
> On Mon, May 20, 2019 at 10:13 AM 张万新  wrote:
>
>> Hi there,
>>
>> I'd like to know what's the root reason why multiple aggregations on
>> streaming dataframe is not allowed since it's a very useful feature, and
>> flink has supported it for a long time.
>>
>> Thanks.
>>
>


Re: Spark 2.4.2

2019-04-19 Thread Arun Mahadevan
+1 to upgrade Jackson. It has come up multiple times due to CVEs and the
back port has worked out but it may be good to include if its not going to
delay the release.

On Thu, 18 Apr 2019 at 19:53, Wenchen Fan  wrote:

> I've cut RC1. If people think we must upgrade Jackson in 2.4, I can cut
> RC2 shortly.
>
> Thanks,
> Wenchen
>
> On Fri, Apr 19, 2019 at 3:32 AM Felix Cheung 
> wrote:
>
>> Re shading - same argument I’ve made earlier today in a PR...
>>
>> (Context- in many cases Spark has light or indirect dependencies but
>> bringing them into the process breaks users code easily)
>>
>>
>> --
>> *From:* Michael Heuer 
>> *Sent:* Thursday, April 18, 2019 6:41 AM
>> *To:* Reynold Xin
>> *Cc:* Sean Owen; Michael Armbrust; Ryan Blue; Spark Dev List; Wenchen
>> Fan; Xiao Li
>> *Subject:* Re: Spark 2.4.2
>>
>> +100
>>
>>
>> On Apr 18, 2019, at 1:48 AM, Reynold Xin  wrote:
>>
>> We should have shaded all Spark’s dependencies :(
>>
>> On Wed, Apr 17, 2019 at 11:47 PM Sean Owen  wrote:
>>
>>> For users that would inherit Jackson and use it directly, or whose
>>> dependencies do. Spark itself (with modifications) should be OK with
>>> the change.
>>> It's risky and normally wouldn't backport, except that I've heard a
>>> few times about concerns about CVEs affecting Databind, so wondering
>>> who else out there might have an opinion. I'm not pushing for it
>>> necessarily.
>>>
>>> On Wed, Apr 17, 2019 at 6:18 PM Reynold Xin  wrote:
>>> >
>>> > For Jackson - are you worrying about JSON parsing for users or
>>> internal Spark functionality breaking?
>>> >
>>> > On Wed, Apr 17, 2019 at 6:02 PM Sean Owen  wrote:
>>> >>
>>> >> There's only one other item on my radar, which is considering updating
>>> >> Jackson to 2.9 in branch-2.4 to get security fixes. Pros: it's come up
>>> >> a few times now that there are a number of CVEs open for 2.6.7. Cons:
>>> >> not clear they affect Spark, and Jackson 2.6->2.9 does change Jackson
>>> >> behavior non-trivially. That said back-porting the update PR to 2.4
>>> >> worked out OK locally. Any strong opinions on this one?
>>> >>
>>> >> On Wed, Apr 17, 2019 at 7:49 PM Wenchen Fan 
>>> wrote:
>>> >> >
>>> >> > I volunteer to be the release manager for 2.4.2, as I was also
>>> going to propose 2.4.2 because of the reverting of SPARK-25250. Is there
>>> any other ongoing bug fixes we want to include in 2.4.2? If no I'd like to
>>> start the release process today (CST).
>>> >> >
>>> >> > Thanks,
>>> >> > Wenchen
>>> >> >
>>> >> > On Thu, Apr 18, 2019 at 3:44 AM Sean Owen  wrote:
>>> >> >>
>>> >> >> I think the 'only backport bug fixes to branches' principle
>>> remains sound. But what's a bug fix? Something that changes behavior to
>>> match what is explicitly supposed to happen, or implicitly supposed to
>>> happen -- implied by what other similar things do, by reasonable user
>>> expectations, or simply how it worked previously.
>>> >> >>
>>> >> >> Is this a bug fix? I guess the criteria that matches is that
>>> behavior doesn't match reasonable user expectations? I don't know enough to
>>> have a strong opinion. I also don't think there is currently an objection
>>> to backporting it, whatever it's called.
>>> >> >>
>>> >> >>
>>> >> >> Is the question whether this needs a new release? There's no harm
>>> in another point release, other than needing a volunteer release manager.
>>> One could say, wait a bit longer to see what more info comes in about
>>> 2.4.1. But given that 2.4.1 took like 2 months, it's reasonable to move
>>> towards a release cycle again. I don't see objection to that either (?)
>>> >> >>
>>> >> >>
>>> >> >> The meta question remains: is a 'bug fix' definition even agreed,
>>> and being consistently applied? There aren't correct answers, only best
>>> guesses from each person's own experience, judgment and priorities. These
>>> can differ even when applied in good faith.
>>> >> >>
>>> >> >> Sometimes the variance of opinion comes because people have
>>> different info that needs to be surfaced. Here, maybe it's best to share
>>> what about that offline conversation was convincing, for example.
>>> >> >>
>>> >> >> I'd say it's also important to separate what one would prefer from
>>> what one can't live with(out). Assuming one trusts the intent and
>>> experience of the handful of others with an opinion, I'd defer to someone
>>> who wants X and will own it, even if I'm moderately against it. Otherwise
>>> we'd get little done.
>>> >> >>
>>> >> >> In that light, it seems like both of the PRs at issue here are not
>>> _wrong_ to backport. This is a good pair that highlights why, when there
>>> isn't a clear reason to do / not do something (e.g. obvious errors,
>>> breaking public APIs) we give benefit-of-the-doubt in order to get it later.
>>> >> >>
>>> >> >>
>>> >> >> On Wed, Apr 17, 2019 at 12:09 PM Ryan Blue <
>>> rb...@netflix.com.invalid> wrote:
>>> >> >>>
>>> >> >>> Sorry, I should be more clear about what I'm trying to say here.

Re: Closing a SparkSession stops the SparkContext

2019-04-02 Thread Arun Mahadevan
I am not sure how would it cause a leak though. When a spark session or the
underlying context is stopped it should clean up everything. The
getOrCreate is supposed to return the active thread local or the global
session. May be if you keep creating new sessions after explicitly clearing
the default and the local sessions and keep leaking the sessions it could
happen, but I don't think Sessions are intended to be used that way.



On Tue, 2 Apr 2019 at 08:45, Ryan Blue  wrote:

> I think Vinoo is right about the intended behavior. If we support multiple
> sessions in one context, then stopping any one session shouldn't stop the
> shared context. The last session to be stopped should stop the context, but
> not any before that. We don't typically run multiple sessions in the same
> context so we haven't hit this, but it sounds reasonable.
>
> On Tue, Apr 2, 2019 at 8:23 AM Vinoo Ganesh  wrote:
>
>> Hey Sean - Cool, maybe I'm misunderstanding the intent of clearing a
>> session vs. stopping it.
>>
>> The cause of the leak looks to be because of this line here
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L131.
>> The ExecutionListenerBus that's added persists forever on the context's
>> listener bus (the SparkContext ListenerBus has an ExecutionListenerBus).
>> I'm trying to figure out the place that this cleanup should happen.
>>
>> With the current implementation, calling SparkSession.stop will clean up
>> the ExecutionListenerBus (since the context itself is stopped), but it's
>> unclear to me why terminating one session should terminate the JVM-global
>> context. Possible my mental model is off here, but I would expect stopping
>> a session to remove all traces of that session, while keeping the context
>> alive, and stopping a context would, well, stop the context.
>>
>> If stopping the session is expected to stop the context, what's the
>> intended usage of clearing the active / default session?
>>
>> Vinoo
>>
>> On 4/2/19, 10:57, "Sean Owen"  wrote:
>>
>> What are you expecting there ... that sounds correct? something else
>> needs to be closed?
>>
>> On Tue, Apr 2, 2019 at 9:45 AM Vinoo Ganesh 
>> wrote:
>> >
>> > Hi All -
>> >
>> >I’ve been digging into the code and looking into what appears to
>> be a memory leak (
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__jira.apache.org_jira_browse_SPARK-2D27337=DwIFaQ=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8=7WzLIMu3WvZwd6AMPatqn1KZW39eI6c_oflAHIy1NUc=TjtXLhnSM5M_aKQlD2NFU2wRnXPvtrUbRm-t84gBNlY=JUsN7EzGimus0jYxyj47_xHYUDC6KnxieeUBfUKTefk=)
>> and have noticed something kind of peculiar about the way closing a
>> SparkSession is handled. Despite being marked as Closeable,
>> closing/stopping a SparkSession simply stops the SparkContext. This changed
>> happened as a result of one of the PRs addressing
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__jira.apache.org_jira_browse_SPARK-2D15073=DwIFaQ=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8=7WzLIMu3WvZwd6AMPatqn1KZW39eI6c_oflAHIy1NUc=TjtXLhnSM5M_aKQlD2NFU2wRnXPvtrUbRm-t84gBNlY=Nd9eBDH-FDdzEn_BVt2nZaNQn6fXA8EfVq5rKGztOUo=
>> in
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_pull_12873_files-23diff-2Dd91c284798f1c98bf03a31855e26d71cR596=DwIFaQ=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8=7WzLIMu3WvZwd6AMPatqn1KZW39eI6c_oflAHIy1NUc=TjtXLhnSM5M_aKQlD2NFU2wRnXPvtrUbRm-t84gBNlY=RM9LrT3Yp2mf1BcbBf1o_m3bcNZdOjznrogBLzUzgeE=
>> .
>> >
>> >
>> >
>> > I’m trying to understand why this is the intended behavior – anyone
>> have any knowledge of why this is the case?
>> >
>> >
>> >
>> > Thanks,
>> >
>> > Vinoo
>>
>>
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Request review for long-standing PRs

2019-02-26 Thread Arun Mahadevan
Yes, I agree thats its a valid concern and leads to individual contributors
giving up on new ideas or major improvements.

On Tue, 26 Feb 2019 at 15:24, Jungtaek Lim  wrote:

> Adding one more, it implicitly leads individual contributors to give up
> with challenging major things and just focus on minor things, which would
> even help on project, but not in the long run. We don't have roadmap put
> into wall and let whole community share the load together, so individual
> contributors have lots of risks on putting major efforts - shouldn't
> conflict to what others have been doing privately, should be accepted after
> putting numerous effort to design and have POC.
>
> 2019년 2월 27일 (수) 오전 8:14, Jungtaek Lim 님이 작성:
>
>> Thanks Sean, as always, to share your thought quickly!
>>
>> I agree most of points, except "they add a lot of code and complexity
>> relative to benefit", since no one can weigh on something before at least
>> taking quick review. IMHO if someone would think so, better to speak (I
>> know it's hard and being a chance to be blamed, but better than giving
>> meaningless hope, yeah I admit I might be another one to do that in another
>> project) and see how others will weigh, rather than let it put aside and
>> ignore.
>>
>> I guess my target is already simple and targeted since I've only
>> mentioned SS area - there're not much committers who can review SS area.
>> Thing to consider is, I have PRs in other areas as well, and I don't have
>> issue on these areas. The reason of posting it to dev mailing list instead
>> of periodically ping in Github PR is, 1) ping in PR just doesn't work 2)
>> let others - hopefully PMC members - indicate a lack on activity on SS
>> area, and lead some action.
>>
>> 2019년 2월 27일 (수) 오전 7:57, Sean Owen 님이 작성:
>>
>>> Those aren't bad changes, but they add a lot of code and complexity
>>> relative to benefit. I think it's positive that you've gotten people
>>> to spend time reviewing them, quite a lot. I don't know whether they
>>> should be merged. This isn't a 'bug' though; not all changes should be
>>> committed. Simple and targeted is much easier to say yes to, because
>>> you implicitly here ask a lot of people to assume responsibility for
>>> your change.
>>>
>>> On Tue, Feb 26, 2019 at 4:38 PM Jungtaek Lim  wrote:
>>> >
>>> > Hi devs,
>>> >
>>> > sorry to bring this again to mailing list, but you know, ping in
>>> Github PR just doesn't work.
>>> >
>>> > I have long-stand (created in last year) PRs on SS area which already
>>> got over 100 comments (so community and me already put lots of efforts) but
>>> no progress in point of view for being merged unfortunately lack of
>>> committers' attention.
>>> >
>>> > - SPARK-20568 [1] : Provide option to clean up completed files in
>>> streaming query
>>> > - SPARK-25151 [2] : Apply Apache Commons Pool to KafkaDataConsumer
>>> >
>>> > According to my experiences on previous PRs (including other areas),
>>> it won't take more than 1 months regardless of size of code diff to merge
>>> once committer(s) gave a focus on PR and reviewed.
>>> >
>>> > Thanks,
>>> > Jungtaek Lim (HeartSaVioR)
>>> >
>>> > ps. I may agree all committers in SS area could be busy (It might
>>> clearly represent SS area lacks committers), but I may not agree they're
>>> involved in DSv2 and DSv2 is the first thing to focus. I haven't seen
>>> anyone in participants on DSv2 discussions, and most of PRs in SS area is
>>> parallel to DSv2 so I'm wondering why we try to couple SS area with DSv2
>>> and restrict its evolution.
>>> >
>>> > ps2. Some of above is the part of previous mail thread regarding "Plan
>>> on Structured Streaming in next major/minor release?" [3]
>>> >
>>> > I'm sure I still would like to address other items in the list (or
>>> new), but without fast feedback it would not be possible. (Maintaining
>>> multiple of long-lasting PRs make contributors very tired, and sometimes
>>> worse than giving -1 and providing reason to reject.)
>>> >
>>> > 1. https://github.com/apache/spark/pull/22952
>>> > 2. https://github.com/apache/spark/pull/22138
>>> > 3.
>>> https://lists.apache.org/thread.html/e6c8a530c998c4a2bb12b167f815d3726d155ce722047957e32689df@%3Cdev.spark.apache.org%3E
>>>
>>


Re: [SS] Allowing stream Sink metadata as part of checkpoint?

2019-02-25 Thread Arun Mahadevan
Unless its some sink metadata to be maintained by the framework (e.g sink
state that needs to be passed back to the sink etc), would it make sense
to keep it under the checkpoint dir ?

Maybe I am missing the motivation of the proposed approach but I guess
the sink mostly needs to store the last seen batchId to discard duplicate
data during a batch replay. It would be ideal
for the sink to store this information in the external store (along with
the data) for de-duplication to work correctly.

Thanks,
Arun



On Mon, 25 Feb 2019 at 22:13, Jungtaek Lim  wrote:

> Hi devs,
>
> I was about to give it a try, but it would relate to DSv2 so decide to
> initiate new thread before actual work. I also don't think this should be
> along with DSv2 discussion since the change would be minor.
>
> While dealing with SPARK-24295 [1] and SPARK-26411 [2], I feel the needs
> of participating sink metadata into checkpoint directory, but unlike source
> which metadata directory is provided as subdirectory of checkpoint
> directory, sink doesn't receive its own metadata directory.
>
> For example, FileStreamSink creates metadata directory on output directory
> - though it is a bit intentional to share between queries - but sometimes
> we may want to make it coupled with query checkpoint.
>
> What do you think about passing metadata path to sink (we have only one
> for query) so that sink metadata can be coupled with query checkpoint?
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://issues.apache.org/jira/browse/SPARK-24295
> 2. https://issues.apache.org/jira/browse/SPARK-26411
>
>


Re: Welcome Jose Torres as a Spark committer

2019-01-29 Thread Arun Mahadevan
Congrats Jose! Well deserved.

On Tue, 29 Jan 2019 at 11:15, Jules Damji  wrote:

> Congrats Jose!
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On Jan 29, 2019, at 11:07 AM, shane knapp  wrote:
>
> congrats, and welcome!
>
> On Tue, Jan 29, 2019 at 11:07 AM Dean Wampler 
> wrote:
>
>> Congrats, Jose!
>>
>>
>> *Dean Wampler, Ph.D.*
>>
>> *VP, Fast Data Engineering at Lightbend*
>>
>>
>> On Tue, Jan 29, 2019 at 12:52 PM Burak Yavuz  wrote:
>>
>>> Congrats Jose!
>>>
>>> On Tue, Jan 29, 2019 at 10:50 AM Xiao Li  wrote:
>>>
 Congratulations!

 Xiao

 Shixiong Zhu  于2019年1月29日周二 上午10:48写道:

> Hi all,
>
> The Apache Spark PMC recently added Jose Torres as a committer on the
> project. Jose has been a major contributor to Structured Streaming. Please
> join me in welcoming him!
>
> Best Regards,
>
> Shixiong Zhu
>
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>
>


Re: Support SqlStreaming in spark

2018-12-21 Thread Arun Mahadevan
There has been efforts to come up with a unified syntax for streaming (see
[1] [2]), but I guess there will be differences based on the streaming
features supported by a system.

Agree it needs a detailed design and it can be as close to the Spark batch
SQL syntax as possible.

Also I am not sure if its possible or makes sense to express all the
operations via pure sql. e.g. the query start/stop, triggers, watermark etc
might be better expressed via APIs.

[1]
https://docs.google.com/document/d/1wrla8mF_mmq-NW9sdJHYVgMyZsgCmHumJJ5f5WUzTiM/edit#heading=h.vfrf26d6b3ne
[2] https://calcite.apache.org/docs/stream.html


On Fri, 21 Dec 2018 at 18:13, Wenchen Fan  wrote:

> It will be great to add pure-SQL support to structured streaming. I think
> it goes without saying that how important SQL support is, but we should
> make a completed design first.
>
> Looking at the Kafka streaming syntax
> , it
> has CREATE STREAM, it has WINDOW TUMBLING. Shall we check other streaming
> systems with SQL support, and justify places where we are going to differ?
>
> We should also take into account the full lifecycle:
> 1. how to restart a streaming query from checkpoint?
> 2. how to stop a streaming query?
> 3. how to check status/progress of a streaming query?
> 4. ...
>
> Basically, we should check what functions the DataStreamReader/Writer API
> support, and see if we can support it with SQL as well.
>
>
> Thanks for your proposal!
> Wenchen
>
> On Mon, Oct 22, 2018 at 11:15 AM JackyLee  wrote:
>
>> The code of SQLStreaming has been pushed:
>>
>> https://github.com/apache/spark/pull/22575
>>
>>
>>
>> --
>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>


Re: Implementation for exactly-once streaming sink

2018-12-05 Thread Arun Mahadevan
I guess thats roughly it.

As of now theres no in-built support to co-ordinate the commits across the
executors in an atomic way. So you need to commit the batch (global commit)
at the driver.

And when the batch is replayed and if any of the intermediate operations
are not idempotent or can cause side effects, the result produced during
replay may be different from what you committed and would be ignored.

Thanks,
Arun

On Wed, 5 Dec 2018 at 11:36, Eric Wohlstadter  wrote:

> Hi all,
>  We are working on implementing a streaming sink on 2.3.1 with the
> DataSourceV2 APIs.
>
> Can anyone help check if my understanding is correct, with respect to the
> failure modes which need to be covered?
>
> We are assuming that a Reliable Receiver (such as Kafka) is used as the
> stream source. And we only want to support micro-batch execution at this
> time (not yet Continuous Processing).
>
> I believe the possible failures that need to be covered are:
>
> 1. Task failure: If a task fails, it may have written data to the sink
> output before failure. Subsequent attempts for a failed task must be
> idempotent, so that no data is duplicated in the output.
> 2. Driver failure: If the driver fails, upon recovery, it might replay a
> micro-batch that was already seen by the sink (if a failure occurs after
> the sink has committed output but before the driver has updated the
> checkpoint). In this case, the sink must be idempotent when a micro-batch
> is replayed so that no data is duplicated in the output.
>
> Are there any other cases where data might be duplicated in the stream?
> i.e. if neither of these 2 failures occur, is there still a case where
> data can be duplicated?
>
> Thanks for any help to check if my understanding is correct.
>
>
>
>
>


Re: DataSourceV2 sync tomorrow

2018-11-13 Thread Arun Mahadevan
IMO, the currentOffset should not be optional.
For continuous mode I assume this offset gets periodically check pointed
(so mandatory) ?
For the micro batch mode the currentOffset would be the start offset for a
micro-batch.

And if the micro-batch could be executed without knowing the 'latest'
offset (say until 'next' returns false), we only need the current offset
(to figure out the offset boundaries of a micro-batch) and may be then the
'latest' offset is not needed at all.

- Arun


On Tue, 13 Nov 2018 at 16:01, Ryan Blue  wrote:

> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
> 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
>- Read API for v2 - see Wenchen’s doc
>
> 
>- Capabilities API - see the dev list thread
>
> 
>- Using CatalogTableIdentifier to reliably separate v2 code paths -
>see PR #21978 
>- A replacement for InternalRow
>
> I know that a lot of people are also interested in combining the source
> API for micro-batch and continuous streaming. Wenchen and I have been
> discussing a way to do that and Wenchen has added it to the Read API doc as
> Alternative #2. I think this would be a good thing to plan on discussing.
>
> rb
>
> Here’s some additional background on combining micro-batch and continuous
> APIs:
>
> The basic idea is to update how tasks end so that the same tasks can be
> used in micro-batch or streaming. For tasks that are naturally limited like
> data files, when the data is exhausted, Spark stops reading. For tasks that
> are not limited, like a Kafka partition, Spark decides when to stop in
> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
> keep running in continuous mode.
>
> Note that a task deciding to stop can happen in both modes, either when a
> task is exhausted in micro-batch or when a stream needs to be reconfigured
> in continuous.
>
> Here’s the task reader API. The offset returned is optional so that a task
> can avoid stopping if there isn’t a resumeable offset, like if it is in the
> middle of an input file:
>
> interface StreamPartitionReader extends InputPartitionReader {
>   Optional currentOffset();
>   boolean next() // from InputPartitionReader
>   T get()// from InputPartitionReader
> }
>
> The streaming code would look something like this:
>
> Stream stream = scan.toStream()
> StreamReaderFactory factory = stream.createReaderFactory()
>
> while (true) {
>   Offset start = stream.currentOffset()
>   Offset end = if (isContinuousMode) {
> None
>   } else {
> // rate limiting would happen here
> Some(stream.latestOffset())
>   }
>
>   InputPartition[] parts = stream.planInputPartitions(start)
>
>   // returns when needsReconfiguration is true or all tasks finish
>   runTasks(parts, factory, end)
>
>   // the stream's current offset has been updated at the last epoch
> }
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: DataSourceV2 hangouts sync

2018-10-31 Thread Arun Mahadevan
Thanks for bringing up the custom metrics API in the list, its something
that needs to be addressed.

A couple more items worth considering,

1. Possibility to unify the batch, micro-batch and continuous sources.
(similar to SPARK-25000)
Right now now there is significant code duplication even between
micro-batch v/s continuous sources.
Attempt to redesign such that a single implementation could potentially
work across modes (by implementing relevant apis).
2. Better framework support for supporting end-end exactly-once in
streaming. (maybe framework level support for 2PC).

Thanks,
Arun


On Tue, 30 Oct 2018 at 19:24, Wenchen Fan  wrote:

> Hi all,
>
> I spent some time thinking about the roadmap, and came up with an initial
> list:
> SPARK-25390: data source V2 API refactoring
> SPARK-24252: add catalog support
> SPARK-25531: new write APIs for data source v2
> SPARK-25190: better operator pushdown API
> Streaming rate control API
> Custom metrics API
> Migrate existing data sources
> Move data source v2 and built-in implementations to individual modules.
>
>
> Let's have more discussion over the hangout.
>
> Thanks,
> Wenchen
>
> On Tue, Oct 30, 2018 at 4:32 AM Ryan Blue 
> wrote:
>
>> Everyone,
>>
>> There are now 25 guests invited, which is a lot of people to actively
>> participate in a sync like this.
>>
>> For those of you who probably won't actively participate, I've added a
>> live stream. If you don't plan to talk, please use the live stream instead
>> of the meet/hangout so that we don't end up with so many people that we
>> can't actually get the discussion going. Here's a link to the stream:
>>
>> https://stream.meet.google.com/stream/6be59d80-04c7-44dc-9042-4f3b597fc8ba
>>
>> Thanks!
>>
>> rb
>>
>> On Thu, Oct 25, 2018 at 1:09 PM Ryan Blue  wrote:
>>
>>> Hi everyone,
>>>
>>> There's been some great discussion for DataSourceV2 in the last few
>>> months, but it has been difficult to resolve some of the discussions and I
>>> don't think that we have a very clear roadmap for getting the work done.
>>>
>>> To coordinate better as a community, I'd like to start a regular sync-up
>>> over google hangouts. We use this in the Parquet community to have more
>>> effective community discussions about thorny technical issues and to get
>>> aligned on an overall roadmap. It is really helpful in that community and I
>>> think it would help us get DSv2 done more quickly.
>>>
>>> Here's how it works: people join the hangout, we go around the list to
>>> gather topics, have about an hour-long discussion, and then send a summary
>>> of the discussion to the dev list for anyone that couldn't participate.
>>> That way we can move topics along, but we keep the broader community in the
>>> loop as well for further discussion on the mailing list.
>>>
>>> I'll volunteer to set up the sync and send invites to anyone that wants
>>> to attend. If you're interested, please reply with the email address you'd
>>> like to put on the invite list (if there's a way to do this without
>>> specific invites, let me know). Also for the first sync, please note what
>>> times would work for you so we can try to account for people in different
>>> time zones.
>>>
>>> For the first one, I was thinking some day next week (time TBD by those
>>> interested) and starting off with a general roadmap discussion before
>>> diving into specific technical topics.
>>>
>>> Thanks,
>>>
>>> rb
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Re: queryable state & streaming

2018-10-24 Thread Arun Mahadevan
I don't think separate API or RPCs etc might be necessary for queryable
state if the state can be exposed as just another datasource. Then the sql
queries can be issued against it just like executing sql queries against
any other data source.

For now I think the "memory" sink could be used  as a sink and run queries
against it but I agree it does not scale for large states.

On Sun, 21 Oct 2018 at 21:24, Jungtaek Lim  wrote:

> It doesn't seem Spark has workarounds other than storing output into
> external storages, so +1 on having this.
>
> My major concern on implementing queryable state in structured streaming
> is "Are all states available on executors at any time while query is
> running?" Querying state shouldn't affect the running query. Given that
> state is huge and default state provider is loading state in memory, we may
> not want to load one more redundant snapshot of state: we want to always
> load "current state" which query is also using. (For sure, Queryable state
> should be read-only.)
>
> Regarding improvement of local state, I guess it is ideal to leverage
> embedded db, like Kafka and Flink are doing. The difference will not be
> only reading state from non-heap, but also how to take a snapshot and store
> delta. We may want to check snapshotting works well with small batch
> interval, and find alternative approach when it doesn't. Sounds like it is
> a huge item and can be handled individually.
>
> - Jungtaek Lim (HeartSaVioR)
>
> 2017년 12월 9일 (토) 오후 10:51, Stavros Kontopoulos 님이
> 작성:
>
>> Nice I was looking for a jira. So I agree we should justify why we are
>> building something. Now to that direction here is what I have seen from my
>> experience.
>> People quite often use state within their streaming app and may have
>> large states (TBs). Shortening the pipeline by not having to copy data (to
>> Cassandra for example for serving) is an advantage, in terms of at least
>> latency and complexity.
>> This can be true if we advantage of state checkpointing (locally could be
>> RocksDB or in general HDFS the latter is currently supported)  along with
>> an API to efficiently query data.
>> Some use cases I see:
>>
>> - real-time dashboards and real-time reporting, the faster the better
>> - monitoring of state for operational reasons, app health etc...
>> - integrating with external services via an API eg. making accessible
>>  aggregations over time windows to some third party service within your
>> system
>>
>> Regarding requirements here are some of them:
>> - support of an API to expose state (could be done at the spark driver),
>> like rest.
>> - supporting dynamic allocation (not sure how it affects state
>> management)
>> - an efficient way to talk to executors to get the state (rpc?)
>> - making local state more efficient and easier accessible with an
>> embedded db (I dont think this is supported from what I see, maybe wrong)?
>> Some people are already working with such techs and some stuff could be
>> re-used: https://issues.apache.org/jira/browse/SPARK-20641
>>
>> Best,
>> Stavros
>>
>>
>> On Fri, Dec 8, 2017 at 10:32 PM, Michael Armbrust > > wrote:
>>
>>> https://issues.apache.org/jira/browse/SPARK-16738
>>>
>>> I don't believe anyone is working on it yet.  I think the most useful
>>> thing is to start enumerating requirements and use cases and then we can
>>> talk about how to build it.
>>>
>>> On Fri, Dec 8, 2017 at 10:47 AM, Stavros Kontopoulos <
>>> st.kontopou...@gmail.com> wrote:
>>>
 Cool Burak do you have a pointer, should I take the initiative for a
 first design document or Databricks is working on it?

 Best,
 Stavros

 On Fri, Dec 8, 2017 at 8:40 PM, Burak Yavuz  wrote:

> Hi Stavros,
>
> Queryable state is definitely on the roadmap! We will revamp the
> StateStore API a bit, and a queryable StateStore is definitely one of the
> things we are thinking about during that revamp.
>
> Best,
> Burak
>
> On Dec 8, 2017 9:57 AM, "Stavros Kontopoulos" <
> st.kontopou...@gmail.com> wrote:
>
>> Just to re-phrase my question: Would query-able state make a viable
>> SPIP?
>>
>> Regards,
>> Stavros
>>
>> On Thu, Dec 7, 2017 at 1:34 PM, Stavros Kontopoulos <
>> st.kontopou...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Maybe this has been discussed before. Given the fact that many
>>> streaming apps out there use state extensively, could be a good idea to
>>> make Spark expose streaming state with an external API like other
>>> systems do (Kafka streams, Flink etc), in order to facilitate
>>> interactive queries?
>>>
>>> Regards,
>>> Stavros
>>>
>>
>>

>>>
>>


Re: welcome a new batch of committers

2018-10-03 Thread Arun Mahadevan
Congratulations everyone!

On Wed, 3 Oct 2018 at 09:16, Dilip Biswal  wrote:

> Congratulations to Shane Knapp, Dongjoon Hyun, Kazuaki Ishizaki, Xingbo
> Jiang,  Yinan Li  and Takeshi Yamamuro !!
>
> Regards,
> Dilip Biswal
> Tel: 408-463-4980
> dbis...@us.ibm.com
>
>
>
> - Original message -
> From: Reynold Xin 
> To: dev 
> Cc:
> Subject: welcome a new batch of committers
> Date: Wed, Oct 3, 2018 1:59 AM
>
> Hi all,
>
> The Apache Spark PMC has recently voted to add several new committers to
> the project, for their contributions:
>
> - Shane Knapp (contributor to infra)
> - Dongjoon Hyun (contributor to ORC support and other parts of Spark)
> - Kazuaki Ishizaki (contributor to Spark SQL)
> - Xingbo Jiang (contributor to Spark Core and SQL)
> - Yinan Li (contributor to Spark on Kubernetes)
> - Takeshi Yamamuro (contributor to Spark SQL)
>
> Please join me in welcoming them!
>
>
>
>
> - To
> unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Arun Mahadevan
>Some being said it is exactly-once when the output is eventually
exactly-once, whereas others being said there should be no side effect,
like consumer shouldn't see partial write. I guess 2PC is former, since
some partitions can commit earlier while other partitions fail to commit
for some time.
Yes its more about guaranteeing atomicity like all partitions eventually
commit or none commits. The visibility of the data for the readers is
orthogonal (e.g setting the isolation levels like serializable for XA) and
in general its difficult to guarantee that data across partitions are
visible at once. The approach like staging table and global commit works in
a centralized set up but can be difficult to do in a distributed manner
across partitions (e.g each partition output goes to a different database)

On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim  wrote:

> IMHO that's up to how we would like to be strict about "exactly-once".
>
> Some being said it is exactly-once when the output is eventually
> exactly-once, whereas others being said there should be no side effect,
> like consumer shouldn't see partial write. I guess 2PC is former, since
> some partitions can commit earlier while other partitions fail to commit
> for some time.
>
> Being said, there may be couple of alternatives other than the contract
> Spark provides/requires, and I'd like to see how Spark community wants to
> deal with others. Would we want to disallow alternatives, like "replay +
> deduplicate write (per a batch/partition)" which ensures "eventually"
> exactly-once but cannot ensure the contract?
>
> Btw, unless achieving exactly-once is light enough for given sink, I think
> the sink should provide both at-least-once (also optimized for the
> semantic) vs exactly-once, and let end users pick one.
>
> 2018년 9월 11일 (화) 오후 12:57, Russell Spitzer 님이
> 작성:
>
>> Why is atomic operations a requirement? I feel like doubling the amount
>> of writes (with staging tables) is probably a tradeoff that the end user
>> should make.
>>
>> On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan  wrote:
>>
>>> Regardless the API, to use Spark to write data atomically, it requires
>>> 1. Write data distributedly, with a central coordinator at Spark driver.
>>> 2. The distributed writers are not guaranteed to run together at the
>>> same time. (This can be relaxed if we can extend the barrier scheduling
>>> feature)
>>> 3. The new data is visible if and only if all distributed writers
>>> success.
>>>
>>> According to these requirements, I think using a staging table is the
>>> most common way and maybe the only way. I'm not sure how 2PC can help, we
>>> don't want users to read partial data, so we need a final step to commit
>>> all the data together.
>>>
>>> For RDBMS data sources, I think a simple solution is to ask users to
>>> coalesce the input RDD/DataFrame into one partition, then we don't need to
>>> care about multi-client transaction. Or using a staging table like Ryan
>>> described before.
>>>
>>>
>>>
>>> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim  wrote:
>>>
>>>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require
>>>> tasks to be running at the same time, we need a mechanism to take down
>>>> tasks after they have prepared and bring up the tasks during the commit
>>>> phase.
>>>>
>>>> I guess we already got into too much details here, but if it is based
>>>> on client transaction Spark must assign "commit" tasks to the executor
>>>> which task was finished "prepare", and if it loses executor it is not
>>>> feasible to force committing. Staging should come into play for that.
>>>>
>>>> We should also have mechanism for "recovery": Spark needs to ensure it
>>>> finalizes "commit" even in case of failures before starting a new batch.
>>>>
>>>> So not an easy thing to integrate correctly.
>>>>
>>>> 2018년 9월 11일 (화) 오전 6:00, Arun Mahadevan 님이 작성:
>>>>
>>>>> >Well almost all relational databases you can move data in a
>>>>> transactional way. That’s what transactions are for.
>>>>>
>>>>> It would work, but I suspect in most cases it would involve moving
>>>>> data from temporary tables to the final tables
>>>>>
>>>>> Right now theres no mechanisms to let the individual tasks commit in a
>>>>> two-phase manner (Not sure if the CommitCordinator might

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Arun Mahadevan
>Well almost all relational databases you can move data in a transactional
way. That’s what transactions are for.

It would work, but I suspect in most cases it would involve moving data
from temporary tables to the final tables

Right now theres no mechanisms to let the individual tasks commit in a
two-phase manner (Not sure if the CommitCordinator might help). If such an
API is provided, the sources could use it as they wish (e.g. use XA support
provided by mysql to implement it in a more efficient way than the driver
moving from temp tables to destination tables).

Definitely there are complexities involved, but I am not sure if the
network partitioning comes into play here since the driver can act as the
co-ordinator and can run in HA mode. And regarding the issue that Jungtaek
brought up, 2PC doesn't require tasks to be running at the same time, we
need a mechanism to take down tasks after they have prepared and bring up
the tasks during the commit phase.

Most of the sources would not need any of the above and just need a way to
support Idempotent writes and like Ryan suggested we can enable this (if
there are gaps in the current APIs).


On Mon, 10 Sep 2018 at 13:43, Reynold Xin  wrote:

> Well almost all relational databases you can move data in a transactional
> way. That’s what transactions are for.
>
> For just straight HDFS, the move is a pretty fast operation so while it is
> not completely transactional, the window of potential failure is pretty
> short for appends. For writers at the partition level it is fine because it
> is just renaming directory, which is atomic.
>
> On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim  wrote:
>
>> When network partitioning happens it is pretty OK for me to see 2PC not
>> working, cause we deal with global transaction. Recovery should be hard
>> thing to get it correctly though. I completely agree it would require
>> massive changes to Spark.
>>
>> What I couldn't find for underlying storages is moving data from staging
>> table to final table in transactional way. I'm not fully sure but as I'm
>> aware of, many storages would not support moving data, and even HDFS sink
>> it is not strictly done in transactional way since we move multiple files
>> with multiple operations. If coordinator just crashes it leaves partial
>> write, and among writers and coordinator need to deal with ensuring it will
>> not be going to be duplicated.
>>
>> Ryan replied me as Iceberg and HBase MVCC timestamps can enable us to
>> implement "commit" (his reply didn't hit dev. mailing list though) but I'm
>> not an expert of both twos and I couldn't still imagine it can deal with
>> various crash cases.
>>
>> 2018년 9월 11일 (화) 오전 5:17, Reynold Xin 님이 작성:
>>
>>> I don't think two phase commit would work here at all.
>>>
>>> 1. It'd require massive changes to Spark.
>>>
>>> 2. Unless the underlying data source can provide an API to coordinate
>>> commits (which few data sources I know provide something like that), 2PC
>>> wouldn't work in the presence of network partitioning. You can't defy the
>>> law of physics.
>>>
>>> Really the most common and simple way I've seen this working is through
>>> staging tables and a final transaction to move data from staging table to
>>> final table.
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim  wrote:
>>>
>>>> I guess we all are aware of limitation of contract on DSv2 writer.
>>>> Actually it can be achieved only with HDFS sink (or other filesystem based
>>>> sinks) and other external storage are normally not feasible to implement it
>>>> because there's no way to couple a transaction with multiple clients as
>>>> well as coordinator can't take over transactions from writers to do the
>>>> final commit.
>>>>
>>>> XA is also not a trivial one to get it correctly with current execution
>>>> model: Spark doesn't require writer tasks to run at the same time but to
>>>> achieve 2PC they should run until end of transaction (closing client before
>>>> transaction ends normally means aborting transaction). Spark should also
>>>> integrate 2PC with its checkpointing mechanism to guarantee completeness of
>>>> batch. And it might require different integration for continuous mode.
>>>>
>>>> Jungtaek Lim (HeartSaVioR)
>>>>
>>>> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan 님이 작성:
>>>>
>>>>> In some cases the implementations may be ok with eventual consistency
>>>>> (a

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Arun Mahadevan
In some cases the implementations may be ok with eventual consistency (and
does not care if the output is written out atomically)

XA can be one option for datasources that supports it and requires
atomicity but I am not sure how would one implement it with the current
API.

May be we need to discuss improvements at the Datasource V2 API level (e.g.
individual tasks would "prepare" for commit and once the driver receives
"prepared" from all the tasks, a "commit" would be invoked at each of the
individual tasks). Right now the responsibility of the final "commit" is
with the driver and it may not always be possible for the driver to take
over the transactions started by the tasks.


On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:

> This is a pretty big challenge in general for data sources -- for the vast
> majority of data stores, the boundary of a transaction is per client. That
> is, you can't have two clients doing writes and coordinating a single
> transaction. That's certainly the case for almost all relational databases.
> Spark, on the other hand, will have multiple clients (consider each task a
> client) writing to the same underlying data store.
>
> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for this ?
> Not sure how easy it is to implement this though :-)
>
> Regards,
> Dilip Biswal
> Tel: 408-463-4980
> dbis...@us.ibm.com
>
>
>
> - Original message -
> From: Reynold Xin 
> To: Ryan Blue 
> Cc: ross.law...@gmail.com, dev 
> Subject: Re: DataSourceWriter V2 Api questions
> Date: Mon, Sep 10, 2018 10:26 AM
>
> I don't think the problem is just whether we have a starting point for
> write. As a matter of fact there's always a starting point for write,
> whether it is explicit or implicit.
>
> This is a pretty big challenge in general for data sources -- for the vast
> majority of data stores, the boundary of a transaction is per client. That
> is, you can't have two clients doing writes and coordinating a single
> transaction. That's certainly the case for almost all relational databases.
> Spark, on the other hand, will have multiple clients (consider each task a
> client) writing to the same underlying data store.
>
> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
>
> Ross, I think the intent is to create a single transaction on the driver,
> write as part of it in each task, and then commit the transaction once the
> tasks complete. Is that possible in your implementation?
>
> I think that part of this is made more difficult by not having a clear
> starting point for a write, which we are fixing in the redesign of the v2
> API. That will have a method that creates a Write to track the operation.
> That can create your transaction when it is created and commit the
> transaction when commit is called on it.
>
> rb
>
> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:
>
> Typically people do it via transactions, or staging tables.
>
>
> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley  wrote:
>
> Hi all,
>
> I've been prototyping an implementation of the DataSource V2 writer for
> the MongoDB Spark Connector and I have a couple of questions about how its
> intended to be used with database systems. According to the Javadoc for
> DataWriter.commit():
>
> *"this method should still "hide" the written data and ask the
> DataSourceWriter at driver side to do the final commit via
> WriterCommitMessage"*
>
> Although, MongoDB now has transactions, it doesn't have a way to "hide"
> the data once it has been written. So as soon as the DataWriter has
> committed the data, it has been inserted/updated in the collection and is
> discoverable - thereby breaking the documented contract.
>
> I was wondering how other databases systems plan to implement this API and
> meet the contract as per the Javadoc?
>
> Many thanks
>
> Ross
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>
> - To
> unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: Branch 2.4 is cut

2018-09-10 Thread Arun Mahadevan
Ryan's proposal makes a lot of sense. Its better not to release half-baked
changes in 2.4 which not only breaks a lot of the APIs released in 2.3, but
also expected to change further due redesigns before 3.0 so don't see much
value releasing it in 2.4.

On Sun, 9 Sep 2018 at 22:42, Wenchen Fan  wrote:

> Strictly speaking, data source v2 is always half-finished until we mark it
> as stable. We need some small milestones to move forward step by step.
>
> The redesign also happens in an incremental way. SPARK-24882 mostly focus
> on the "RDD" part of the API: the separation of reader factory and input
> partitions, the introduction of ScanConfig, etc. Then we focus on the
> high-level abstraction and want to change the "table" part of the API.
>
> In my understanding, each PR should be self-contained. If we are OK to
> have SPARK-24882 in master as an individual commit, I think it's also OK to
> have it in branch 2.4.
>
> I've created https://issues.apache.org/jira/browse/SPARK-25390 to track
> the new abstraction. It doesn't change the API a lot, but update the
> streaming execution engine quite a bit.
>
> Thanks,
> Wenchen
>
> On Mon, Sep 10, 2018 at 4:20 AM Ryan Blue  wrote:
>
>> Wenchen, can you hold off on the first RC?
>>
>> The half-finished changes from the redesign of the DataSourceV2 API are
>> in master, added in SPARK-24882
>> , and are now in the 2.4
>> branch. We've had a lot of good discussion since that PR was merged to
>> update and fix the design, plus only one of the follow-ups on SPARK-25186
>>  is done. Clearly,
>> the redesign was too large to get into 2.4 in so little time -- it was
>> proposed about 10 days before the original branch date -- and I don't think
>> it is a good idea to release half-finished major changes.
>>
>> The easiest solution is to revert SPARK-24882 in the release branch. That
>> way we have minor changes in 2.4 and major changes in the next release,
>> instead of major changes in both. What does everyone think?
>>
>> rb
>>
>> On Fri, Sep 7, 2018 at 10:37 AM shane knapp  wrote:
>>
>>> ++joshrosen  (thanks for the help w/deploying the jenkins configs)
>>>
>>> the basic 2.4 builds are deployed and building!
>>>
>>> i haven't created (a) build(s) yet for scala 2.12...  i'll be
>>> coordinating this w/the databricks folks next week.
>>>
>>> On Fri, Sep 7, 2018 at 9:53 AM, Dongjoon Hyun 
>>> wrote:
>>>
 Thank you, Shane! :D

 Bests,
 Dongjoon.

 On Fri, Sep 7, 2018 at 9:51 AM shane knapp  wrote:

> i'll try and get to the 2.4 branch stuff today...
>
>
>>>
>>>
>>> --
>>> Shane Knapp
>>> UC Berkeley EECS Research / RISELab Staff Technical Lead
>>> https://rise.cs.berkeley.edu
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Arun Mahadevan
coalesce might work.

Say "spark.sql.shuffle.partitions" = 200, and then "
input.readStream.map.filter.groupByKey(..).coalesce(2)" would still
create 200 instances for state but execute just 2 tasks.

However I think further groupByKey operations downstream would need similar
coalesce.

And this is assuming the user sets the right shuffle partitions upfront.

It maybe worth to bundle this pattern as some builtin api so that it can be
transparent to the user. I am not sure how were you planning to expose the
state key groups at api level and if it would be transparent.

IMO, decoupling the state and partitions and making it key based would
still be worth exploring to support dynamic state rebalancing. May be the
default HDFS based implementation can maintain the state partition wise and
not support it, but there could be implementations based on distributed k-v
store which supports this.

Thanks,
Arun


On 3 August 2018 at 08:21, Joseph Torres 
wrote:

> A coalesced RDD will definitely maintain any within-partition invariants
> that the original RDD maintained. It pretty much just runs its input
> partitions sequentially.
>
> There'd still be some Dataframe API work needed to get the coalesce
> operation where you want it to be, but this is much simpler than
> introducing a new concept of state key groups. As far as I can tell,
> state key groups are just the same thing that we currently call partitions
> of the aggregate RDD.
>
> On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim  wrote:
>
>> I’m afraid I don’t know about the details on coalesce(), but some finding
>> resource for coalesce, it looks like helping reducing actual partitions.
>>
>> For streaming aggregation, state for all partitions (by default, 200)
>> must be initialized and committed even it is being unchanged. Otherwise
>> error occurred when reading a partition which is excluded in query
>> previously. Moreover, it can’t find existing row from state or store row in
>> wrong partition if partition id doesn’t match the expected id via hashing
>> function.
>>
>> Could you verify coalesce() meets such requirements?
>>
>> On Fri, 3 Aug 2018 at 22:23 Joseph Torres 
>> wrote:
>>
>>> Scheduling multiple partitions in the same task is basically what
>>> coalesce() does. Is there a reason that doesn't work here?
>>>
>>> On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim  wrote:
>>>
>>>> Here's a link for Google docs (anyone can comment):
>>>> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6
>>>> EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>>>>
>>>> Please note that I just copied the content to the google docs, so
>>>> someone could point out lack of details. I would like to start with
>>>> explanation of the concept, and once we are in agreement on going forward,
>>>> I could add more detail in doc, or even just start working and detail can
>>>> be shared with POC code or even WIP patch.
>>>>
>>>> Answer inlined for Arun's comments:
>>>>
>>>> 2018년 8월 3일 (금) 오후 5:39, Arun Mahadevan 님이 작성:
>>>>
>>>>> Can you share this in a google doc to make the discussions easier.?
>>>>>
>>>>>
>>>>>
>>>>> Thanks for coming up with ideas to improve upon the current
>>>>> restrictions with the SS state store.
>>>>>
>>>>>
>>>>>
>>>>> If I understood correctly, the plan is to introduce a logical
>>>>> partitioning scheme for state storage (based on keys) independent of
>>>>> spark’s partitioning so that the number of spark partitions can be varied.
>>>>>
>>>>>
>>>>>
>>>>> my 2 cents,
>>>>>
>>>>>
>>>>>
>>>>>1. The Partitioning is already a kind of a logical entity in
>>>>>Spark. Maybe this can be leveraged to over-partition in advance 
>>>>> (similar to
>>>>>setting the number of state key groups in your proposal) but make it 
>>>>> easy
>>>>>to run more than one task (partition) per core (I am not sure how easy 
>>>>> this
>>>>>is currently). Then we can continue to leverage the existing state
>>>>>implementation. This has similar limitations like what you pointed out 
>>>>> (the
>>>>>max number of partitions has to be fixed upfront). But once the over
>>>>>provisioning of partitions is made easy it could be leveraged even for
&g

Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-03 Thread Arun Mahadevan
Can you share this in a google doc to make the discussions easier.?



Thanks for coming up with ideas to improve upon the current restrictions
with the SS state store.



If I understood correctly, the plan is to introduce a logical partitioning
scheme for state storage (based on keys) independent of spark’s
partitioning so that the number of spark partitions can be varied.



my 2 cents,



   1. The Partitioning is already a kind of a logical entity in Spark.
   Maybe this can be leveraged to over-partition in advance (similar to
   setting the number of state key groups in your proposal) but make it easy
   to run more than one task (partition) per core (I am not sure how easy this
   is currently). Then we can continue to leverage the existing state
   implementation. This has similar limitations like what you pointed out (the
   max number of partitions has to be fixed upfront). But once the over
   provisioning of partitions is made easy it could be leveraged even for
   non-stateful operations.



   1. Decouple the state from partition completely associate it only with
   the keys. This would be the most flexible option and we can scale the
   partitions up/down as we wish. This needs a scalable distributed state
   store implementation supporting fast look ups /storage by key.



Thanks,

Arun


On 2 August 2018 at 23:45, Jungtaek Lim  wrote:

> Hi Spark devs,
>
> I have a new feature to propose and hear opinions on community. Not sure
> it is such a big change to worth to step on SPIP, so posting to dev mailing
> list  instead.
>
> > Feature
>
> Reconfigurable number of partitions on state operators in Structured
> Streaming
>
> > Rationalization
>
> Nowadays, state in structured streaming is stored individually via
> partition given such configuration "spark.sql.shuffle.partitions" and
> cannot modify the configuration after the query is run once. One
> contributor already submitted a patch [1] without knowing why such
> restriction came into play.
>
> Such restriction for state is necessary because state is distributed by
> hash function applied to key columns, but as a side-effect of restriction,
> we can't change partitions of stateful operators. End users would have
> various workloads and also various SLA (and SLA can be changed), so
> restricting to specific count of partitions would not satisfy their needs.
> Moreover, end users are not easy to indicate the configuration before they
> run query, and realize they can't modify it when they try to modify it.
>
> > Proposal
>
> The feature proposes decoupling data partitions and operator partitions
> via introducing key groups to state, enabling scalability of operator
> partitions while state data partitions remain same (so no issue on state
> data). This approach is inspired by how Flink supports scalability with
> partitioned state.
>
> The concept itself is simple, while we apply such partitioning expression
> to the key columns (simplified):
>
> hash(key columns) % number of state operator partitions
>
> it will apply below partitioning expression so that it can be distributed
> via state data partitions but each state operator partition could handle
> multiple state data partitions.
>
> (hash(key columns) % number of state key groups) % number of state
> operator partitions
>
> The state data will not still be scalable actually, so the number of state
> key groups will be a new hard limit (we should restrict modifying it once
> query is run). But we can change the number of stateful operator partitions
> afterwards. The number of stateful operator partitions should be equal or
> smaller than the number of state key groups. (It doesn't make sense for
> partitions to be not assigned any state key group and idle.)
>
> > Possible Risk
>
> Performance might be affected, because either one should be performed:
>
> 1. each partition should calculate key group id per key
> 2. key group id should be calculated and inserted to the row before
> passing state operators (shuffle), and removed after passing state operators
>
> There's other performance concern like committing multiple states in a
> partition when number of operator partitions < number of state key groups,
> but we could run it concurrently (at least for HDFS state store), and
> actually it is also an issue for nowadays (all tasks may not be launched
> together).
>
> Code complexity would be introduced as expected.
>
> > Limitation
>
> For the first time, it will not support dynamic reconfiguration like
> changing the value during query is running. Actually it can be achieved
> simply via unloading all the state providers in executors before running
> next batch, but it would invalidate all state caches and may incur high
> latency to reload the state cache for previous batch. But I guess we could
> adopt it if we feel bigger merit for reconfiguring partitions of stateful
> operators against reloading state.
>
> > Rejected alternatives
>
> * Offline physical repartitioning of state 

Re: Sorting on a streaming dataframe

2018-04-24 Thread Arun Mahadevan
I guess sorting would make sense only when you have the complete data set. In 
streaming you don’t know what record is coming next so doesn’t make sense to 
sort it (except in the aggregated complete output mode where the entire result 
table is emitted each time and the results can be sorted).

Thanks,
Arun

From:  Hemant Bhanawat 
Date:  Tuesday, April 24, 2018 at 12:18 AM
To:  "Bowden, Chris" 
Cc:  Reynold Xin , dev 
Subject:  Re: Sorting on a streaming dataframe

Thanks Chris. There are many ways in which I can solve this problem but they 
are cumbersome. The easiest way would have been to sort the streaming 
dataframe. The reason I asked this question is because I could not find a 
reason why sorting on streaming dataframe is disallowed. 

Hemant

On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris  
wrote:
You can happily sort the underlying RDD of InternalRow(s) inside a sink, 
assuming you are willing to implement and maintain your own sink(s). That is, 
just grabbing the parquet sink, etc. isn’t going to work out of the box. 
Alternatively map/flatMapGroupsWithState is probably sufficient and requires 
less working knowledge to make effective reuse of internals. Just group by foo 
and then sort accordingly and assign ids. The id counter can be stateful per 
group. Sometimes this problem may not need to be solved at all. For example, if 
you are using kafka, a proper partitioning scheme and message offsets may be 
“good enough”. From: Hemant Bhanawat 
Sent: Thursday, April 12, 2018 11:42:59 PM
To: Reynold Xin
Cc: dev
Subject: Re: Sorting on a streaming dataframe
 
Well, we want to assign snapshot ids (incrementing counters) to the incoming 
records. For that, we are zipping the streaming rdds with that counter using a 
modified version of ZippedWithIndexRDD. We are ok if the records in the 
streaming dataframe gets counters in random order but the counter should always 
be incrementing. 

This is working fine until we have a failure. When we have a failure, we 
re-assign the records to snapshot ids  and this time same snapshot id can get 
assigned to a different record. This is a problem because the primary key in 
our storage engine is . So we want to sort the dataframe 
so that the records always get the same snapshot id. 



On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin  wrote:
Can you describe your use case more?

On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat  wrote:
Hi Guys, 

Why is sorting on streaming dataframes not supported(unless it is complete 
mode)? My downstream needs me to sort the streaming dataframe.

Hemant