Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-14 Thread Fred Reiss
I think the way I phrased things earlier may be leading to some confusion
here. When I said "don't bring down my application", I was referring to the
application not meeting its end-to-end SLA, not to the app server crashing.

The groups I've talked to already isolate their front-end systems from
back-end systems with multiple solutions like message queues and key-value
stores. But some applications require a complex automated decision based on
information that is not available until the moment of the decision. Some
good examples are credit decisions, decisions about whether to blacklist a
hostile IP address, and product recommendations based on the user's
location and the link they just clicked. In all these cases, there is an
action that needs to happen in the real world, and that action has a
deadline, and you need to score a model to meet that deadline.

In a typical enterprise IT environment, the analytics tier is a much more
convenient place to run compute- and memory-intensive scoring tasks. The
hardware, software, and toolchain are tuned for the workload, and the data
science department has much more administrative control. So the question
naturally comes up: "Can I score my [credit/security/recommender/...]
models on the same infrastructure that I use to build them?"

Fred

On Thu, Oct 13, 2016 at 9:59 AM, Holden Karau  wrote:

> This is a thing I often have people ask me about, and then I do my best
> to dissuade them from using Spark in the "hot path", and it's normally
> something which most people eventually accept. Fred might have more
> information for people for whom this is a hard requirement, though.


> On Thursday, October 13, 2016, Cody Koeninger  wrote:
>
>> I've always been confused as to why it would ever be a good idea to
>> put any streaming query system on the critical path for synchronous
>> <100 msec requests.  It seems to make a lot more sense to have a
>> streaming system do async updates of a store that has better latency
>> and quality-of-service characteristics for multiple users.  Then your
>> only latency concern is event-to-update, not request-to-response.
>>
>> On Thu, Oct 13, 2016 at 10:39 AM, Fred Reiss 
>> wrote:
>> > On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman
>> >  wrote:
>> >>
>> >> >
>> >> Could you expand a little bit more on stability? Is it just bursty
>> >> workloads in terms of peak vs. average throughput? Also what level of
>> >> latencies do you find users care about? Is it on the order of 2-3
>> >> seconds vs. 1 second vs. 100s of milliseconds?
>> >> >
>> >
>> >
>> > Regarding stability, I've seen two levels of concrete requirements.
>> >
>> > The first is "don't bring down my Spark cluster". That is to say,
>> regardless
>> > of the input data rate, Spark shouldn't thrash or crash outright.
>> Processing
>> > may lag behind the data arrival rate, but the cluster should stay up and
>> > remain fully functional.
>> >
>> > The second level is "don't bring down my application". A common use for
>> > streaming systems is to handle heavyweight computations that are part
>> of a
>> > larger application, like a web application, a mobile app, or a plant
>> control
>> > system. For example, an online application for car insurance might need
>> to
>> > do some pretty involved machine learning to produce an accurate quote
>> and
>> > suggest good upsells to the customer. If the heavyweight portion times
>> out,
>> > the whole application times out, and you lose a customer.
>> >
>> > In terms of bursty vs. non-bursty, the "don't bring down my Spark
>> cluster
>> > case" is more about handling bursts, while the "don't bring down my
>> > application" case is more about delivering acceptable end-to-end
>> response
>> > times under typical load.
>> >
>> > Regarding latency: One group I talked to mentioned requirements in the
>> > 100-200 msec range, driven by the need to display a web page on a
>> browser or
>> > mobile device. Another group in the Internet of Things space mentioned
>> times
>> > ranging from 5 seconds to 30 seconds throughout the conversation. But
>> most
>> > people I've talked to have been pretty vague about specific numbers.
>> >
>> > My impression is that these groups are not motivated by anxiety about
>> > meeting a particular latency target for a particular application.
>> Rather,
>> > they want to make low latency the norm so that they can stop having to
>> think
>> > about latency. Today, low latency is a special requirement of special
>> > applications. But that policy imposes a lot of hidden costs. IT
>> architects
>> > have to spend time estimating the latency requirements of every
>> application
>> > and lobbying for special treatment when those requirements are strict.
>> > Managers have to spend time engineering business processes around
>> latency.
>> > Data scientists have to spend time packaging up models and negotiating
>> how

Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Shivaram Venkataraman
Thanks Fred for the detailed reply. The stability points are
especially interesting as a goal for the streaming component in Spark.
In terms of next steps, one approach that might be helpful is to
create benchmarks or scenarios that mimic real-life workloads; then
we can work on isolating the specific changes that are required.
It'd also be great to hear other approaches / next steps to concretize
some of these goals.

Thanks
Shivaram

On Thu, Oct 13, 2016 at 8:39 AM, Fred Reiss  wrote:
> On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman
>  wrote:
>>
>> >
>> Could you expand a little bit more on stability? Is it just bursty
>> workloads in terms of peak vs. average throughput? Also what level of
>> latencies do you find users care about? Is it on the order of 2-3
>> seconds vs. 1 second vs. 100s of milliseconds?
>> >
>
>
> Regarding stability, I've seen two levels of concrete requirements.
>
> The first is "don't bring down my Spark cluster". That is to say, regardless
> of the input data rate, Spark shouldn't thrash or crash outright. Processing
> may lag behind the data arrival rate, but the cluster should stay up and
> remain fully functional.
>
> The second level is "don't bring down my application". A common use for
> streaming systems is to handle heavyweight computations that are part of a
> larger application, like a web application, a mobile app, or a plant control
> system. For example, an online application for car insurance might need to
> do some pretty involved machine learning to produce an accurate quote and
> suggest good upsells to the customer. If the heavyweight portion times out,
> the whole application times out, and you lose a customer.
>
> In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster
> case" is more about handling bursts, while the "don't bring down my
> application" case is more about delivering acceptable end-to-end response
> times under typical load.
>
> Regarding latency: One group I talked to mentioned requirements in the
> 100-200 msec range, driven by the need to display a web page on a browser or
> mobile device. Another group in the Internet of Things space mentioned times
> ranging from 5 seconds to 30 seconds throughout the conversation. But most
> people I've talked to have been pretty vague about specific numbers.
>
> My impression is that these groups are not motivated by anxiety about
> meeting a particular latency target for a particular application. Rather,
> they want to make low latency the norm so that they can stop having to think
> about latency. Today, low latency is a special requirement of special
> applications. But that policy imposes a lot of hidden costs. IT architects
> have to spend time estimating the latency requirements of every application
> and lobbying for special treatment when those requirements are strict.
> Managers have to spend time engineering business processes around latency.
> Data scientists have to spend time packaging up models and negotiating how
> those models will be shipped over to the low-latency serving tier. And
> customers who are accustomed to Google and smartphones end up with an
> experience that is functional but unsatisfying.
>
> It's best to think of latency as a sliding scale. A given level of latency
> imposes a given level of cost enterprise-wide. Someone who is making a
> decision on middleware policy will balance this cost against other costs:
> How much does it cost to deploy the middleware? How much does it cost to
> train developers to use the system? The winner will be the system that
> minimizes the overall cost.
>
> Fred




Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Holden Karau
This is a thing I often have people ask me about, and then I do my best
to dissuade them from using Spark in the "hot path", and it's normally
something which most people eventually accept. Fred might have more
information for people for whom this is a hard requirement, though.

On Thursday, October 13, 2016, Cody Koeninger  wrote:

> I've always been confused as to why it would ever be a good idea to
> put any streaming query system on the critical path for synchronous
> <100 msec requests.  It seems to make a lot more sense to have a
> streaming system do async updates of a store that has better latency
> and quality-of-service characteristics for multiple users.  Then your
> only latency concern is event-to-update, not request-to-response.
>
> On Thu, Oct 13, 2016 at 10:39 AM, Fred Reiss wrote:
> > On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman
> > wrote:
> >>
> >> >
> >> Could you expand a little bit more on stability? Is it just bursty
> >> workloads in terms of peak vs. average throughput? Also what level of
> >> latencies do you find users care about? Is it on the order of 2-3
> >> seconds vs. 1 second vs. 100s of milliseconds?
> >> >
> >
> >
> > Regarding stability, I've seen two levels of concrete requirements.
> >
> > The first is "don't bring down my Spark cluster". That is to say,
> regardless
> > of the input data rate, Spark shouldn't thrash or crash outright.
> Processing
> > may lag behind the data arrival rate, but the cluster should stay up and
> > remain fully functional.
> >
> > The second level is "don't bring down my application". A common use for
> > streaming systems is to handle heavyweight computations that are part of
> a
> > larger application, like a web application, a mobile app, or a plant
> control
> > system. For example, an online application for car insurance might need
> to
> > do some pretty involved machine learning to produce an accurate quote and
> > suggest good upsells to the customer. If the heavyweight portion times
> out,
> > the whole application times out, and you lose a customer.
> >
> > In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster
> > case" is more about handling bursts, while the "don't bring down my
> > application" case is more about delivering acceptable end-to-end response
> > times under typical load.
> >
> > Regarding latency: One group I talked to mentioned requirements in the
> > 100-200 msec range, driven by the need to display a web page on a
> browser or
> > mobile device. Another group in the Internet of Things space mentioned
> times
> > ranging from 5 seconds to 30 seconds throughout the conversation. But
> most
> > people I've talked to have been pretty vague about specific numbers.
> >
> > My impression is that these groups are not motivated by anxiety about
> > meeting a particular latency target for a particular application. Rather,
> > they want to make low latency the norm so that they can stop having to
> think
> > about latency. Today, low latency is a special requirement of special
> > applications. But that policy imposes a lot of hidden costs. IT
> architects
> > have to spend time estimating the latency requirements of every
> application
> > and lobbying for special treatment when those requirements are strict.
> > Managers have to spend time engineering business processes around
> latency.
> > Data scientists have to spend time packaging up models and negotiating
> how
> > those models will be shipped over to the low-latency serving tier. And
> > customers who are accustomed to Google and smartphones end up with an
> > experience that is functional but unsatisfying.
> >
> > It's best to think of latency as a sliding scale. A given level of
> latency
> > imposes a given level of cost enterprise-wide. Someone who is making a
> > decision on middleware policy will balance this cost against other costs:
> > How much does it cost to deploy the middleware? How much does it cost to
> > train developers to use the system? The winner will be the system that
> > minimizes the overall cost.
> >
> > Fred
>

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Cody Koeninger
I've always been confused as to why it would ever be a good idea to
put any streaming query system on the critical path for synchronous
<100 msec requests.  It seems to make a lot more sense to have a
streaming system do async updates of a store that has better latency
and quality-of-service characteristics for multiple users.  Then your
only latency concern is event-to-update, not request-to-response.
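
A minimal sketch of that pattern in Structured Streaming, assuming a
hypothetical KvStore client in place of a real Redis/Cassandra/HBase
connection (only ForeachWriter itself is real Spark 2.0 API here):

    import org.apache.spark.sql.ForeachWriter

    // Hypothetical key-value client standing in for the low-latency store.
    class KvStore extends Serializable {
      def put(key: String, value: Double): Unit = { /* write to the store */ }
      def close(): Unit = {}
    }

    // The streaming job pushes scores into the store asynchronously; the
    // request path only ever reads the store, so request-to-response latency
    // is decoupled from the streaming system entirely.
    class ScoreWriter extends ForeachWriter[(String, Double)] {
      private var kv: KvStore = _
      override def open(partitionId: Long, version: Long): Boolean = {
        kv = new KvStore; true
      }
      override def process(row: (String, Double)): Unit = kv.put(row._1, row._2)
      override def close(errorCause: Throwable): Unit = kv.close()
    }

    // scores: Dataset[(String, Double)] produced by the streaming query
    // scores.writeStream.foreach(new ScoreWriter).start()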

On Thu, Oct 13, 2016 at 10:39 AM, Fred Reiss  wrote:
> On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman
>  wrote:
>>
>> >
>> Could you expand a little bit more on stability? Is it just bursty
>> workloads in terms of peak vs. average throughput? Also what level of
>> latencies do you find users care about? Is it on the order of 2-3
>> seconds vs. 1 second vs. 100s of milliseconds?
>> >
>
>
> Regarding stability, I've seen two levels of concrete requirements.
>
> The first is "don't bring down my Spark cluster". That is to say, regardless
> of the input data rate, Spark shouldn't thrash or crash outright. Processing
> may lag behind the data arrival rate, but the cluster should stay up and
> remain fully functional.
>
> The second level is "don't bring down my application". A common use for
> streaming systems is to handle heavyweight computations that are part of a
> larger application, like a web application, a mobile app, or a plant control
> system. For example, an online application for car insurance might need to
> do some pretty involved machine learning to produce an accurate quote and
> suggest good upsells to the customer. If the heavyweight portion times out,
> the whole application times out, and you lose a customer.
>
> In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster
> case" is more about handling bursts, while the "don't bring down my
> application" case is more about delivering acceptable end-to-end response
> times under typical load.
>
> Regarding latency: One group I talked to mentioned requirements in the
> 100-200 msec range, driven by the need to display a web page on a browser or
> mobile device. Another group in the Internet of Things space mentioned times
> ranging from 5 seconds to 30 seconds throughout the conversation. But most
> people I've talked to have been pretty vague about specific numbers.
>
> My impression is that these groups are not motivated by anxiety about
> meeting a particular latency target for a particular application. Rather,
> they want to make low latency the norm so that they can stop having to think
> about latency. Today, low latency is a special requirement of special
> applications. But that policy imposes a lot of hidden costs. IT architects
> have to spend time estimating the latency requirements of every application
> and lobbying for special treatment when those requirements are strict.
> Managers have to spend time engineering business processes around latency.
> Data scientists have to spend time packaging up models and negotiating how
> those models will be shipped over to the low-latency serving tier. And
> customers who are accustomed to Google and smartphones end up with an
> experience that is functional but unsatisfying.
>
> It's best to think of latency as a sliding scale. A given level of latency
> imposes a given level of cost enterprise-wide. Someone who is making a
> decision on middleware policy will balance this cost against other costs:
> How much does it cost to deploy the middleware? How much does it cost to
> train developers to use the system? The winner will be the system that
> minimizes the overall cost.
>
> Fred




Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Fred Reiss
On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:
>
> >
> Could you expand a little bit more on stability? Is it just bursty
> workloads in terms of peak vs. average throughput? Also what level of
> latencies do you find users care about? Is it on the order of 2-3
> seconds vs. 1 second vs. 100s of milliseconds?
> >
>

Regarding stability, I've seen two levels of concrete requirements.

The first is "don't bring down my Spark cluster". That is to say,
regardless of the input data rate, Spark shouldn't thrash or crash
outright. Processing may lag behind the data arrival rate, but the cluster
should stay up and remain fully functional.

The second level is "don't bring down my application". A common use for
streaming systems is to handle heavyweight computations that are part of a
larger application, like a web application, a mobile app, or a plant
control system. For example, an online application for car insurance might
need to do some pretty involved machine learning to produce an accurate
quote and suggest good upsells to the customer. If the heavyweight portion
times out, the whole application times out, and you lose a customer.

In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster
case" is more about handling bursts, while the "don't bring down my
application" case is more about delivering acceptable end-to-end response
times under typical load.

Regarding latency: One group I talked to mentioned requirements in the
100-200 msec range, driven by the need to display a web page on a browser
or mobile device. Another group in the Internet of Things space mentioned
times ranging from 5 seconds to 30 seconds throughout the conversation. But
most people I've talked to have been pretty vague about specific numbers.

My impression is that these groups are not motivated by anxiety about
meeting a particular latency target for a particular application. Rather,
they want to make low latency the norm so that they can stop having to
think about latency. Today, low latency is a special requirement of special
applications. But that policy imposes a lot of hidden costs. IT architects
have to spend time estimating the latency requirements of every application
and lobbying for special treatment when those requirements are strict.
Managers have to spend time engineering business processes around latency.
Data scientists have to spend time packaging up models and negotiating how
those models will be shipped over to the low-latency serving tier. And
customers who are accustomed to Google and smartphones end up with an
experience that is functional but unsatisfying.

It's best to think of latency as a sliding scale. A given level of latency
imposes a given level of cost enterprise-wide. Someone who is making a
decision on middleware policy will balance this cost against other costs:
How much does it cost to deploy the middleware? How much does it cost to
train developers to use the system? The winner will be the system that
minimizes the overall cost.

Fred


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-12 Thread Fred Reiss
On Tue, Oct 11, 2016 at 10:57 AM, Reynold Xin  wrote:

>
> On Tue, Oct 11, 2016 at 10:55 AM, Michael Armbrust wrote:
>
>> *Complex event processing and state management:* Several groups I've
>>> talked to want to run a large number (tens or hundreds of thousands now,
>>> millions in the near future) of state machines over low-rate partitions of
>>> a high-rate stream. Covering these use cases translates roughly into
>>> three sub-requirements: maintaining lots of persistent state efficiently,
>>> feeding tuples to each state machine in the right order, and exposing
>>> convenient programmer APIs for complex event detection and signal
>>> processing tasks.
>>>
>>
>> I've heard this one too, but don't know of anyone actively working on
>> it.  Would be awesome to open a JIRA and start discussing what the APIs
>> would look like.
>>
>
> There is an existing ticket for CEP:
> https://issues.apache.org/jira/browse/SPARK-14745
>
>
>
Yeah, Mario and Sachin opened up that CEP ticket a while back, and they had
an early prototype (https://github.com/apache/spark/pull/12518) on the old
DStream APIs. The project stalled out due to uncertainty about how state
management and streaming query languages would work on Structured
Streaming. The people who were working on it are now focusing on other
issues.

Getting CEP to work efficiently is a whole-stack affair. You need optimizer
support for things like pulling out common subexpressions from event specs
and deciding between eager vs. lazy evaluation for predicates. You need
good fine-grained state management in the engine, including support for
efficiently handling out-of-order event arrival. And CEP workloads with a
large number of interdependent, stateful tasks will put stress on the
scheduling layer.

Fred


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Shivaram Venkataraman
Thanks Fred - that is very helpful.

> Delivering low latency, high throughput, and stability simultaneously: Right
> now, our own tests indicate you can get at most two of these characteristics
> out of Spark Streaming at the same time. I know of two parties that have
> abandoned Spark Streaming because "pick any two" is not an acceptable answer
> to the latency/throughput/stability question for them.
>
Could you expand a little bit more on stability? Is it just bursty
workloads in terms of peak vs. average throughput? Also what level of
latencies do you find users care about? Is it on the order of 2-3
seconds vs. 1 second vs. 100s of milliseconds?
>




Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Reynold Xin
On Tue, Oct 11, 2016 at 10:55 AM, Michael Armbrust 
wrote:

> *Complex event processing and state management:* Several groups I've
>> talked to want to run a large number (tens or hundreds of thousands now,
>> millions in the near future) of state machines over low-rate partitions of
>> a high-rate stream. Covering these use cases translates roughly into
>> three sub-requirements: maintaining lots of persistent state efficiently,
>> feeding tuples to each state machine in the right order, and exposing
>> convenient programmer APIs for complex event detection and signal
>> processing tasks.
>>
>
> I've heard this one too, but don't know of anyone actively working on it.
> Would be awesome to open a JIRA and start discussing what the APIs would
> look like.
>

There is an existing ticket for CEP:
https://issues.apache.org/jira/browse/SPARK-14745


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Michael Armbrust
This is super helpful, thanks for writing it up!


> *Delivering low latency, high throughput, and stability simultaneously:* Right
> now, our own tests indicate you can get at most two of these
> characteristics out of Spark Streaming at the same time. I know of two
> parties that have abandoned Spark Streaming because "pick any two" is not
> an acceptable answer to the latency/throughput/stability question for them.
>

Agree, this should be the major focus.


> *Complex event processing and state management:* Several groups I've
> talked to want to run a large number (tens or hundreds of thousands now,
> millions in the near future) of state machines over low-rate partitions of
> a high-rate stream. Covering these use cases translates roughly into
> three sub-requirements: maintaining lots of persistent state efficiently,
> feeding tuples to each state machine in the right order, and exposing
> convenient programmer APIs for complex event detection and signal
> processing tasks.
>

I've heard this one too, but don't know of anyone actively working on it.
Would be awesome to open a JIRA and start discussing what the APIs would
look like.


> *Job graph scheduling and access to Dataset APIs: *These requirements
> come up in the context of groups who want to do streaming ETL. The general
> application profile that I've seen involves updating a large number of
> materialized views based on a smaller number of streams, using a mixture of
> SQL and nontraditional processing. The extended SQL that the Dataset APIs
> provide is useful in these applications. As for scheduling needs, it's
> common for multiple output tables to share intermediate computations. Users
> need an easy way to ensure that this shared computation happens only once,
> while controlling the peak memory utilization during each batch.
>

This sounds like two separate things to me.  High-level APIs (are streaming
DataFrames / Datasets missing anything?) and multi-query optimization for
streams.  I've been thinking about the latter.  I think we probably want to
crush latency/throughput/stability in the simpler case first, but after
that I think there is a lot of machinery already in SQL we can reuse (i.e.
the sameResult calculations used for caching).
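
As a rough illustration of that reuse (sameResult is a Catalyst-internal
API, shown here only to make the idea concrete, not a stable interface):

    val spark = org.apache.spark.sql.SparkSession.builder()
      .master("local").getOrCreate()
    import spark.implicits._

    // Two independently built but logically identical plans...
    val a = spark.range(100).filter($"id" % 2 === 0)
    val b = spark.range(100).filter($"id" % 2 === 0)

    // ...which Catalyst can already recognize as equivalent; the same
    // machinery could let sibling streaming queries share a computation.
    a.queryExecution.analyzed.sameResult(b.queryExecution.analyzed)  // true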


StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Fred Reiss
On Thu, Oct 6, 2016 at 12:37 PM, Michael Armbrust wrote:
>
> [snip!]
> Relatedly, I'm curious to hear more about the types of questions you are
> getting.  I think the dev list is a good place to discuss applications and
> if/how structured streaming can handle them.
>

Details are difficult to share, but I can give the general gist without
revealing anything proprietary.

I find myself having the same conversation about twice a month. The other
party to the conversation is an IBM product group or an IBM client who
is using Spark for batch and interactive analytics. Their overall
application has or will soon have a real-time component. They want
information from the IBM Spark Technology Center on the relative merits
of different streaming systems for that part of the application or product.
Usually, the options on the table are Spark Streaming/Structured Streaming
and another more "pure" streaming system like Apache Flink or IBM Streams.

Right now, the best recommendation I can give is: "Spark Streaming has
known shortcomings; here's a list. If you are certain that your application
can work within these constraints, then we recommend you give Spark
Streaming a try. Otherwise, check back 12-18 months from now, when
Structured Streaming will hopefully provide a usable platform for your
application."

The specific unmet requirements that are most relevant to these
conversations are: latency, throughput, stability under bursty loads,
complex event processing support, state management, job graph scheduling,
and access to the newer Dataset-based Spark APIs.

Again, apologies for not being able to name names, but here's a redacted
description of why these requirements are relevant.

*Delivering low latency, high throughput, and stability simultaneously:* Right
now, our own tests indicate you can get at most two of these
characteristics out of Spark Streaming at the same time. I know of two
parties that have abandoned Spark Streaming because "pick any two" is not
an acceptable answer to the latency/throughput/stability question for them.

*Complex event processing and state management:* Several groups I've talked
to want to run a large number (tens or hundreds of thousands now, millions
in the near future) of state machines over low-rate partitions of a
high-rate stream. Covering these use cases translates roughly into three
sub-requirements: maintaining lots of persistent state efficiently, feeding
tuples to each state machine in the right order, and exposing convenient
programmer APIs for complex event detection and signal processing tasks.
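
As a rough sketch of the shape of these workloads on today's DStream API
(event names and transitions are invented for illustration; note that
updateStateByKey makes no ordering guarantee within a batch, which is
exactly the second sub-requirement above):

    import org.apache.spark.streaming.dstream.DStream

    // keyedEvents: DStream[(String, String)] of (deviceId, event),
    // one state machine per key; requires ssc.checkpoint(...) for state.
    sealed trait MachineState
    case object Idle extends MachineState
    case object Armed extends MachineState

    def step(events: Seq[String], state: Option[MachineState]): Option[MachineState] =
      Some(events.foldLeft(state.getOrElse(Idle): MachineState) {
        case (Idle, "arm")   => Armed
        case (Armed, "fire") => Idle  // a real job would emit an alert here
        case (s, _)          => s
      })

    // val machineStates = keyedEvents.updateStateByKey(step _)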

*Job graph scheduling and access to Dataset APIs: *These requirements come
up in the context of groups who want to do streaming ETL. The general
application profile that I've seen involves updating a large number of
materialized views based on a smaller number of streams, using a mixture of
SQL and nontraditional processing. The extended SQL that the Dataset APIs
provide is useful in these applications. As for scheduling needs, it's
common for multiple output tables to share intermediate computations. Users
need an easy way to ensure that this shared computation happens only once,
while controlling the peak memory utilization during each batch.
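
A small sketch of the shape of these jobs (schema and sink choices are
illustrative only, assuming a SparkSession named `spark` and a streaming
DataFrame `events`). Each start() below spawns an independent query, so the
shared `cleaned` intermediate is computed once per sink rather than once per
batch overall, which is the scheduling gap described above:

    import spark.implicits._

    // Shared intermediate computation over the input stream.
    val cleaned = events.filter($"status" === "OK")

    // Two materialized views built from the same intermediate result.
    val byUser = cleaned.groupBy($"user").count().writeStream
      .outputMode("complete").format("memory").queryName("by_user").start()
    val byPage = cleaned.groupBy($"page").count().writeStream
      .outputMode("complete").format("memory").queryName("by_page").start()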

Hope this helps.

Fred


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-06 Thread Michael Armbrust
Fred, I think that's a pretty good summary of my thoughts.  Thanks for
condensing them :)

Right now, my focus is to get more people using Structured Streaming so
that we can get some real world feedback on what is missing.  Right now
this means:
 - SPARK-15406 Kafka Support - since this seems to be the source of choice
   for many users
 - SPARK-17731 Metrics - right now it's pretty hard to see what is going on,
   and where latency is coming from.

Once those are in and have seen some use, I think it'll be easier to
prioritize the work on #1.

Relatedly, I'm curious to hear more about the types of questions you are
getting.  I think the dev list is a good place to discuss applications and
if/how structured streaming can handle them.

On Wed, Oct 5, 2016 at 3:20 PM, Fred Reiss  wrote:

> Thanks for the thoughtful comments, Michael and Shivaram. From what I’ve
> seen in this thread and on JIRA, it looks like the current plan with regard
> to application-facing APIs for sinks is roughly:
> 1. Rewrite incremental query compilation for Structured Streaming.
> 2. Redesign Structured Streaming's source and sink APIs so that they do
> not depend on RDDs.
> 3. Allow the new APIs to stabilize.
> 4. Open these APIs to use by application code.
>
> Is there a way for those of us who aren’t involved in the first two steps
> to get some idea of the current plans and progress? I get asked a lot about
> when Structured Streaming will be a viable replacement for Spark Streaming,
> and I like to be able to give accurate advice.
>
> Fred
>
> On Tue, Oct 4, 2016 at 3:02 PM, Michael Armbrust 
> wrote:
>
>> I don't quite understand why exposing it indirectly through a typed
>>> interface should be delayed before finalizing the API.
>>>
>>
>> Spark has a long history of maintaining
>> binary compatibility in its public APIs.  I strongly believe this is one of
>> the things that has made the project successful.  Exposing internals that
>> we know are going to change in the primary user facing API for creating
>> Streaming DataFrames seems directly counter to this goal.  I think the
>> argument that "you can do it anyway" fails to capture the expectations of
>> users who probably aren't closely following this discussion.
>>
>> If advanced users want to dig through the code and experiment, great.  I
>> hope they report back on what's good and what can be improved.  However, if
>> you add the function suggested in the PR to DataStreamReader, you are
>> giving them a bad experience by leaking internals that don't even show up
>> in the published documentation.
>>
>
>


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-05 Thread Fred Reiss
Thanks for the thoughtful comments, Michael and Shivaram. From what I’ve
seen in this thread and on JIRA, it looks like the current plan with regard
to application-facing APIs for sinks is roughly:
1. Rewrite incremental query compilation for Structured Streaming.
2. Redesign Structured Streaming's source and sink APIs so that they do not
depend on RDDs.
3. Allow the new APIs to stabilize.
4. Open these APIs to use by application code.

Is there a way for those of us who aren’t involved in the first two steps
to get some idea of the current plans and progress? I get asked a lot about
when Structured Streaming will be a viable replacement for Spark Streaming,
and I like to be able to give accurate advice.

Fred

On Tue, Oct 4, 2016 at 3:02 PM, Michael Armbrust 
wrote:

> I don't quite understand why exposing it indirectly through a typed
>> interface should be delayed before finalizing the API.
>>
>
> Spark has a long history of maintaining
> binary compatibility in its public APIs.  I strongly believe this is one of
> the things that has made the project successful.  Exposing internals that
> we know are going to change in the primary user facing API for creating
> Streaming DataFrames seems directly counter to this goal.  I think the
> argument that "you can do it anyway" fails to capture the expectations of
> users who probably aren't closely following this discussion.
>
> If advanced users want to dig through the code and experiment, great.  I
> hope they report back on what's good and what can be improved.  However, if
> you add the function suggested in the PR to DataStreamReader, you are
> giving them a bad experience by leaking internals that don't even show up
> in the published documentation.
>


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-04 Thread Michael Armbrust
>
> I don't quite understand why exposing it indirectly through a typed
> interface should be delayed before finalizing the API.
>

Spark has a long history of maintaining
binary compatibility in its public APIs.  I strongly believe this is one of
the things that has made the project successful.  Exposing internals that
we know are going to change in the primary user facing API for creating
Streaming DataFrames seems directly counter to this goal.  I think the
argument that "you can do it anyway" fails to capture the expectations of
users who probably aren't closely following this discussion.

If advanced users want to dig through the code and experiment, great.  I
hope they report back on what's good and what can be improved.  However, if
you add the function suggested in the PR to DataStreamReader, you are
giving them a bad experience by leaking internals that don't even show up
in the published documentation.


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-04 Thread Jakob Odersky
Hi everyone,

is there any ongoing discussion/documentation on the redesign of sinks?
I think it could be a good thing to abstract away the underlying
streaming model; however, that isn't directly related to Holden's first
point. The way I understand it, the proposal is to slightly change the
DataStreamWriter API (the thing that's returned when you call
"df.writeStream") to allow passing in a custom sink provider instead
of only accepting strings. This would allow users to write their own
providers and sinks, and give them a strongly typed, possibly generic
way to do so. The sink API is currently available to users indirectly
(you can create your own sink provider and load it with the built-in
DataSource reflection functionality), therefore I don't quite
understand why exposing it indirectly through a typed interface should
be delayed before finalizing the API.
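
Roughly, the difference in user code would look like this (the sink(...)
method name and the MySinkProvider class are hypothetical stand-ins for
whatever SPARK-16407 ends up proposing):

    // Today: provider class resolved by name via reflection, with all
    // configuration passed as strings.
    df.writeStream
      .format("com.example.MySinkProvider")
      .option("path", "/tmp/out")
      .start()

    // Proposed: pass a configured instance directly, so the provider can
    // carry typed state such as a formatting lambda.
    df.writeStream
      .sink(new MySinkProvider(formatRow = row => row.mkString(",")))
      .start()
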
On a side note, I saw that sources have a similar limitation in that
they are currently only available through a stringly-typed interface.
Could a similar solution be applied to sources? Maybe the writer and
reader APIs could even be unified to a certain degree.

Shivaram, I like your ideas on the proposed redesign! Can we discuss
this further?

cheers,
--Jakob


On Mon, Sep 26, 2016 at 5:12 PM, Shivaram Venkataraman
 wrote:
> Disclaimer - I am not very closely involved with Structured Streaming
> design / development, so this is just my two cents from looking at the
> discussion in the linked JIRAs and PRs.
>
> It seems to me there are a couple of issues being conflated here: (a)
> the question of how to specify or add more functionality to the
> Sink API, such as the ability to get model updates back to the driver [a
> design issue IMHO], and (b) the question of how to pass parameters to
> DataFrameWriter, esp. strings vs. typed objects, and whether the API is
> stable vs. experimental.
>
> TLDR is that I think we should first focus on refactoring the Sink and
> add new functionality after that. Detailed comments below.
>
> Sink design / functionality: Looking at SPARK-10815, a JIRA linked
> from SPARK-16407, it looks like the existing Sink API is limited
> because it is tied to the RDD/Dataframe definitions. It also has
> surprising limitations like not being able to run operators on `data`
> and only using `collect/foreach`.  Given these limitations, I think it
> makes sense to redesign the Sink API first *before* adding new
> functionality to the existing Sink. I understand that we have not
> marked this experimental in 2.0.0 -- but since Structured Streaming
> is new as a whole, we can probably break the
> Sink API in an upcoming 2.1.0 release.
>
> As a part of the redesign, I think we need to do two things: (i) come
> up with a new data handle that separates the RDD from what is passed to
> the Sink, and (ii) have some way to specify code that can run on the
> driver. This might not be an issue if the data handle already has a
> clean abstraction for this.
>
> Micro-batching: Ideally it would be good to not expose the micro-batch
> processing model in the Sink API as this might change going forward.
> Given the consistency model we are presenting I think there will be
> some notion of batch / time-range identifier in the API. But I think
> if we can avoid having hard constraints on where functions will get
> run (i.e. on the driver vs. as a part of a job etc.) and when
> functions will get run (i.e. strictly after every micro-batch) it
> might give us more freedom in improving performance going forward [1].
>
> Parameter passing: I think your point that typed is better than
> untyped is pretty good, and supporting both APIs isn't necessarily bad
> either. My understanding of the discussion around this is that we should
> do this after the Sink is refactored, to avoid exposing the old APIs?
>
> Thanks
> Shivaram
>
> [1] FWIW this is something I am looking at and
> https://spark-summit.org/2016/events/low-latency-execution-for-apache-spark/
> has some details about this.
>
>
> On Mon, Sep 26, 2016 at 1:38 PM, Holden Karau  wrote:
>> Hi Spark Developers,
>>
>>
>> After some discussion on SPARK-16407 (and on the PR) we’ve decided to jump
>> back to the developer list (SPARK-16407 itself comes from our early work on
>> SPARK-16424 to enable ML with the new Structured Streaming API). SPARK-16407
>> is proposing to extend the current DataStreamWriter API to allow users to
>> specify a specific instance of a StreamSinkProvider - this makes it easier
>> for users to create sinks that are configured with things besides strings
>> (for example things like lambdas). An example of something like this already
>> inside Spark is the ForeachSink.
>>
>>
>> We have been working on adding support for online learning in Structured
>> Streaming, similar to what Spark Streaming and MLlib provide today. Details
>> are available in SPARK-16424. Along the way, we noticed that there is
>> currently no way for code running in the driver to access the 

Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-09-26 Thread Shivaram Venkataraman
Disclaimer - I am not very closely involved with Structured Streaming
design / development, so this is just my two cents from looking at the
discussion in the linked JIRAs and PRs.

It seems to me there are a couple of issues being conflated here: (a)
the question of how to specify or add more functionality to the
Sink API, such as the ability to get model updates back to the driver [a
design issue IMHO], and (b) the question of how to pass parameters to
DataFrameWriter, esp. strings vs. typed objects, and whether the API is
stable vs. experimental.

TLDR is that I think we should first focus on refactoring the Sink and
add new functionality after that. Detailed comments below.

Sink design / functionality: Looking at SPARK-10815, a JIRA linked
from SPARK-16407, it looks like the existing Sink API is limited
because it is tied to the RDD/Dataframe definitions. It also has
surprising limitations like not being able to run operators on `data`
and only using `collect/foreach`.  Given these limitations, I think it
makes sense to redesign the Sink API first *before* adding new
functionality to the existing Sink. I understand that we have not
marked this experimental in 2.0.0 -- but since Structured Streaming
is new as a whole, we can probably break the
Sink API in an upcoming 2.1.0 release.

As a part of the redesign, I think we need to do two things: (i) come
up with a new data handle that separates the RDD from what is passed to
the Sink, and (ii) have some way to specify code that can run on the
driver. This might not be an issue if the data handle already has a
clean abstraction for this.
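
To make (i) and (ii) concrete, one purely hypothetical shape for such an
API (every name below is invented for discussion, not a real Spark
interface):

    import org.apache.spark.sql.Row

    // (i) an invented data handle that hides the RDD/DataFrame underneath
    trait BatchHandle {
      def foreachPartition(f: Iterator[Row] => Unit): Unit
    }

    // (ii) an explicit driver-side hook, e.g. for pulling model updates back
    trait RedesignedSink {
      def addBatch(batchId: Long, data: BatchHandle): Unit
      def onBatchCompleted(batchId: Long): Unit  // runs on the driver
    }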

Micro-batching: Ideally it would be good to not expose the micro-batch
processing model in the Sink API as this might change going forward.
Given the consistency model we are presenting I think there will be
some notion of batch / time-range identifier in the API. But I think
if we can avoid having hard constraints on where functions will get
run (i.e. on the driver vs. as a part of a job etc.) and when
functions will get run (i.e. strictly after every micro-batch) it
might give us more freedom in improving performance going forward [1].

Parameter passing: I think your point that typed is better than
untyped is pretty good, and supporting both APIs isn't necessarily bad
either. My understanding of the discussion around this is that we should
do this after the Sink is refactored, to avoid exposing the old APIs?

Thanks
Shivaram

[1] FWIW this is something I am looking at and
https://spark-summit.org/2016/events/low-latency-execution-for-apache-spark/
has some details about this.


On Mon, Sep 26, 2016 at 1:38 PM, Holden Karau  wrote:
> Hi Spark Developers,
>
>
> After some discussion on SPARK-16407 (and on the PR) we’ve decided to jump
> back to the developer list (SPARK-16407 itself comes from our early work on
> SPARK-16424 to enable ML with the new Structured Streaming API). SPARK-16407
> is proposing to extend the current DataStreamWriter API to allow users to
> specify a specific instance of a StreamSinkProvider - this makes it easier
> for users to create sinks that are configured with things besides strings
> (for example things like lambdas). An example of something like this already
> inside Spark is the ForeachSink.
>
>
> We have been working on adding support for online learning in Structured
> Streaming, similar to what Spark Streaming and MLlib provide today. Details
> are available in SPARK-16424. Along the way, we noticed that there is
> currently no way for code running in the driver to access the streaming
> output of a Structured Streaming query (in our case ideally as a Dataset or
> RDD - but regardless of the underlying data structure). In our specific
> case, we wanted to update a model in the driver using aggregates computed by
> a Structured Streaming query.
>
>
> A lot of other applications are going to have similar requirements. For
> example, there is no way (outside of using private Spark internals)* to
> implement a console sink with a user-supplied formatting function, to
> configure a templated or generic sink at runtime, to trigger a custom Python
> call-back, or even to implement the ForeachSink outside of Spark. For work
> inside of Spark to enable Structured Streaming with ML we clearly don’t need
> SPARK-16407, as we can directly access the internals (although it would be
> cleaner not to have to), but if we want to empower people working outside of
> the Spark codebase itself with Structured Streaming, I think we need to
> provide some mechanism for this, and it would be great to see what
> options/ideas the community can come up with.
>
>
> One of the arguments against SPARK-16407 seems to be mostly that it exposes
> the Sink API, which is implemented using micro-batching, but the
> counter-argument to this is that the Sink API is already exposed (instead of
> passing in an instance, the user needs to pass in a class name which is then
> created through reflection and has configuration parameters 

StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-09-26 Thread Holden Karau
Hi Spark Developers,

After some discussion on SPARK-16407 (and on the PR) we’ve decided to jump
back to the developer list (SPARK-16407 itself comes from our early work on
SPARK-16424 to enable ML with the new Structured Streaming API). SPARK-16407
is proposing to extend the current DataStreamWriter API to allow users to
specify a specific instance of a StreamSinkProvider - this makes it easier
for users to create sinks that are configured with things besides strings
(for example things like lambdas). An example of something like this already
inside Spark is the ForeachSink.
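
For context, this is roughly what the existing extension point looks like as
of Spark 2.0: a minimal sink wired up through StreamSinkProvider and
registered only by class name and string options:

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.execution.streaming.Sink
    import org.apache.spark.sql.sources.StreamSinkProvider
    import org.apache.spark.sql.streaming.OutputMode

    // Note the Sink contract only hands us a DataFrame per micro-batch;
    // per SPARK-10815 it is effectively limited to collect/foreach on it.
    class PrintlnSink extends Sink {
      override def addBatch(batchId: Long, data: DataFrame): Unit =
        data.collect().foreach(println)
    }

    class PrintlnSinkProvider extends StreamSinkProvider {
      override def createSink(
          sqlContext: SQLContext,
          parameters: Map[String, String],
          partitionColumns: Seq[String],
          outputMode: OutputMode): Sink = new PrintlnSink
    }

    // Registered by class name and string options only:
    // df.writeStream.format("com.example.PrintlnSinkProvider").start()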

We have been working on adding support for online learning in Structured
Streaming, similar to what Spark Streaming and MLlib provide today. Details
are available in SPARK-16424. Along the way, we noticed that there is
currently no way for code running in the driver to access the streaming
output of a Structured Streaming query (in our case ideally as a Dataset or
RDD - but regardless of the underlying data structure). In our specific
case, we wanted to update a model in the driver using aggregates computed by
a Structured Streaming query.
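
One partial workaround that does exist today is the built-in "memory" sink:
route the aggregates into a named in-memory table and poll it from the
driver. A sketch (query and table names invented; the memory sink collects
results to the driver, so it only suits small aggregates):

    // aggregates: a streaming Dataset/DataFrame of model statistics
    val query = aggregates.writeStream
      .outputMode("complete")
      .format("memory")
      .queryName("model_updates")
      .start()

    // Driver loop: periodically snapshot the table and fold it into the model.
    val snapshot = spark.sql("SELECT * FROM model_updates").collect()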

A lot of other applications are going to have similar requirements. For
example, there is no way (outside of using private Spark internals)* to
implement a console sink with a user-supplied formatting function, to
configure a templated or generic sink at runtime, to trigger a custom Python
call-back, or even to implement the ForeachSink outside of Spark. For work
inside of Spark to enable Structured Streaming with ML we clearly don’t need
SPARK-16407, as we can directly access the internals (although it would be
cleaner not to have to), but if we want to empower people working outside of
the Spark codebase itself with Structured Streaming, I think we need to
provide some mechanism for this, and it would be great to see what
options/ideas the community can come up with.

One of the arguments against SPARK-16407 seems to be mostly that it exposes
the Sink API, which is implemented using micro-batching, but the
counter-argument to this is that the Sink API is already exposed (instead of
passing in an instance, the user needs to pass in a class name which is then
created through reflection and has configuration parameters passed in as a
map of strings).

Personally I think we should expose a more nicely typed API instead of
depending on strings for all configuration, and if at some point the
Sink API itself needs to change, if/when Spark Streaming moves away from
micro-batching, we would still likely want to allow users to provide the
typed interface as well, to give Sink creators more flexibility with
configuration.

Now obviously this is based on my understanding of the lay of the land,
which could be a little off since the Spark Structured Streaming design
docs and JIRAs don't seem to be actively updated - so I'd love to
know what assumptions I’ve made that don’t match the current plans for
structured streaming.

Cheers,

Holden :)

Related Links:

   - The JIRA for this proposal:
     https://issues.apache.org/jira/browse/SPARK-16407
   - The Structured Streaming ML JIRA:
     https://issues.apache.org/jira/browse/SPARK-16424
   - https://docs.google.com/document/d/1snh7x7b0dQIlTsJNHLr-IxIFgP43RfRV271YK2qGiFQ/edit?usp=sharing
   - https://github.com/apache/spark/pull/14691
   - https://github.com/holdenk/spark-structured-streaming-ml


*Strictly speaking one _could_ pass in a string of Java code and then
compile it inside the Sink with Janino - but that clearly isn’t reasonable.

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau