Re: [Pyspark, SQL] Very slow IN operator

2017-04-06 Thread Fred Reiss
If you want to emulate pushing down a join, you can wrap the IN-list
query in a JDBCRelation directly:

scala> val r_df = spark.read.format("jdbc").option("url",
> "jdbc:h2:/tmp/testdb").option("dbtable", "R").load()
> r_df: org.apache.spark.sql.DataFrame = [A: int]
> scala> r_df.show
> +---+
> |  A|
> +---+
> | 42|
> |-42|
> +---+
>
> scala> val querystr = s"select * from R where a in (${(1 to
> 10).mkString(",")})"
> querystr: String = select * from R where a in
> (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,21...
> scala> val filtered_df = spark.read.format("jdbc").option("url",
> "jdbc:h2:/tmp/testdb").option("dbtable", s"($querystr)").load()
> filtered_df: org.apache.spark.sql.DataFrame = [A: int]
> scala> filtered_df.show
> +---+
> |  A|
> +---+
> | 42|
> +---+
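
The same trick generalizes in application code to any driver-side collection
of keys. Here's a rough sketch (the table, column, and H2 URL just mirror the
toy example above; the alias on the derived table is for databases that
require one):

import org.apache.spark.sql.SparkSession

object InPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("in-pushdown-sketch").master("local[*]").getOrCreate()

    // Driver-side keys -- in the original question this would be ~10K values.
    val keys: Seq[Int] = 1 to 10000

    // Wrap the IN query in a derived table so the filtering happens inside
    // the database rather than in Spark.
    val query = s"(select * from R where a in (${keys.mkString(",")})) as r_filtered"

    val filtered = spark.read.format("jdbc")
      .option("url", "jdbc:h2:/tmp/testdb") // same toy H2 database as above
      .option("dbtable", query)
      .load()

    filtered.show()
    spark.stop()
  }
}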


Fred


On Thu, Apr 6, 2017 at 1:51 AM Maciej Bryński  wrote:

> 2017-04-06 4:00 GMT+02:00 Michael Segel :
> > Just out of curiosity, what would happen if you put your 10K values into
> > a temp table and then did a join against it?
>
> The answer is predicate pushdown.
> In my case I'm using this kind of query on a JDBC table, and the IN predicate
> is executed on the DB in less than 1s.
>
>
> Regards,
> --
> Maciek Bryński
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Mini-Proposal: Make it easier to contribute to the contributing to Spark Guide

2016-10-20 Thread Fred Reiss
Great idea! If the developer docs are in github, then new contributors who
find errors or omissions can update the docs as an introduction to the PR
process.

Fred

On Wed, Oct 19, 2016 at 5:46 PM, Reynold Xin  wrote:

> For the contributing guide I think it makes more sense to put it in
> apache/spark github, since that's where contributors start. I'd also link
> to it from the website ...
>
>
> On Tue, Oct 18, 2016 at 10:03 AM, Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>> +1 - Given that our website is now on github
>> (https://github.com/apache/spark-website), I think we can move most of
>> our wiki into the main website. That way we'll only have two sources
>> of documentation to maintain: a release-specific one in the main repo
>> and the website, which is more long-lived.
>>
>> Thanks
>> Shivaram
>>
>> On Tue, Oct 18, 2016 at 9:59 AM, Matei Zaharia 
>> wrote:
>> > Is there any way to tie wiki accounts with JIRA accounts? I found it
>> weird
>> > that they're not tied at the ASF.
>> >
>> > Otherwise, moving this into the docs might make sense.
>> >
>> > Matei
>> >
>> > On Oct 18, 2016, at 6:19 AM, Cody Koeninger  wrote:
>> >
>> > +1 to putting docs in one clear place.
>> >
>> >
>> > On Oct 18, 2016 6:40 AM, "Sean Owen"  wrote:
>> >>
>> >> I'm OK with that. The upside to the wiki is that it can be edited
>> directly
>> >> outside of a release cycle. However, in practice I find that the wiki
>> is
>> >> rarely changed. To me it also serves as a place for information that
>> isn't
>> >> exactly project documentation like "powered by" listings.
>> >>
>> >> In a way I'd like to get rid of the wiki: it's one more place for docs,
>> >> one that doesn't have the same accessibility (I don't know who can give
>> >> edit access), and it doesn't have a review process.
>> >>
>> >> For now I'd settle for bringing over a few key docs like the one you
>> >> mention. I spent a little time a while ago removing some duplication
>> >> across the wiki and project docs and think there's a bit more that could
>> >> be done.
>> >>
>> >>
>> >> On Tue, Oct 18, 2016 at 12:25 PM Holden Karau 
>> >> wrote:
>> >>>
>> >>> Right now the wiki isn't particularly accessible to updates by external
>> >>> contributors. We've already got a contributing to Spark page which just
>> >>> links to the wiki - how about if we just move the wiki contents over? This
>> >>> way contributors can contribute to our documentation about how to
>> >>> contribute, probably helping clear up points of confusion for new
>> >>> contributors that the rest of us may be blind to.
>> >>>
>> >>> If we do this we would probably want to update the wiki page to point to
>> >>> the documentation generated from markdown. It would also mean that the
>> >>> results of any update to the contributing guide take a full release cycle
>> >>> to be visible. Another alternative would be opening up the wiki to a
>> >>> broader set of people.
>> >>>
>> >>> I know a lot of people are probably getting ready for Spark Summit EU
>> >>> (and I hope to catch up with some of y'all there) but I figured this is a
>> >>> relatively minor proposal.
>> >>> --
>> >>> Cell : 425-233-8271
>> >>> Twitter: https://twitter.com/holdenkarau
>> >
>> >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-14 Thread Fred Reiss
I think the way I phrased things earlier may be leading to some confusion
here. When I said "don't bring down my application", I was referring to the
application not meeting its end-to-end SLA, not to the app server crashing.

The groups I've talked to already isolate their front-end systems from
back-end systems with multiple solutions like message queues and key-value
stores. But some applications require a complex automated decision based on
information that is not available until the moment of the decision. Some
good examples are credit decisions, decisions about whether to blacklist a
hostile IP address, and product recommendations based on the user's
location and the link they just clicked. In all these cases, there is an
action that needs to happen in the real world, and that action has a
deadline, and you need to score a model to meet that deadline.

In a typical enterprise IT environment, the analytics tier is a much more
convenient place to run compute- and memory-intensive scoring tasks. The
hardware, software, and toolchain are tuned for the workload, and the data
science department has much more administrative control. So the question
naturally comes up: "Can I score my [credit/security/recommender/...]
models on the same infrastructure that I use to build them?"

Fred

On Thu, Oct 13, 2016 at 9:59 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

> This is a thing I often have people ask me about, and then I do my best to
> dissuade them from using Spark in the "hot path", and it's normally
> something which most people eventually accept. Fred might have more
> information for people for whom this is a hard requirement though.


> On Thursday, October 13, 2016, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I've always been confused as to why it would ever be a good idea to
>> put any streaming query system on the critical path for synchronous  <
>> 100msec requests.  It seems to make a lot more sense to have a
>> streaming system do asynch updates of a store that has better latency
>> and quality of service characteristics for multiple users.  Then your
>> only latency concerns are event to update, not request to response.
>>
>> On Thu, Oct 13, 2016 at 10:39 AM, Fred Reiss <freiss@gmail.com>
>> wrote:
>> > On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman
>> > <shiva...@eecs.berkeley.edu> wrote:
>> >>
>> >> >
>> >> Could you expand a little bit more on stability ? Is it just bursty
>> >> workloads in terms of peak vs. average throughput ? Also what level of
>> >> latencies do you find users care about ? Is it on the order of 2-3
>> >> seconds vs. 1 second vs. 100s of milliseconds ?
>> >> >
>> >
>> >
>> > Regarding stability, I've seen two levels of concrete requirements.
>> >
>> > The first is "don't bring down my Spark cluster". That is to say,
>> regardless
>> > of the input data rate, Spark shouldn't thrash or crash outright.
>> Processing
>> > may lag behind the data arrival rate, but the cluster should stay up and
>> > remain fully functional.
>> >
>> > The second level is "don't bring down my application". A common use for
>> > streaming systems is to handle heavyweight computations that are part
>> of a
>> > larger application, like a web application, a mobile app, or a plant
>> control
>> > system. For example, an online application for car insurance might need
>> to
>> > do some pretty involved machine learning to produce an accurate quote
>> and
>> > suggest good upsells to the customer. If the heavyweight portion times
>> out,
>> > the whole application times out, and you lose a customer.
>> >
>> > In terms of bursty vs. non-bursty, the "don't bring down my Spark
>> cluster
>> > case" is more about handling bursts, while the "don't bring down my
>> > application" case is more about delivering acceptable end-to-end
>> response
>> > times under typical load.
>> >
>> > Regarding latency: One group I talked to mentioned requirements in the
>> > 100-200 msec range, driven by the need to display a web page on a
>> browser or
>> > mobile device. Another group in the Internet of Things space mentioned
>> times
>> > ranging from 5 seconds to 30 seconds throughout the conversation. But
>> most
>> > people I've talked to have been pretty vague about specific numbers.
>> >
>> > My impression is that these groups are not motivated by anxiety about
>> > meeting a particular laten

Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Fred Reiss
On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:
>
> >
> Could you expand a little bit more on stability ? Is it just bursty
> workloads in terms of peak vs. average throughput ? Also what level of
> latencies do you find users care about ? Is it on the order of 2-3
> seconds vs. 1 second vs. 100s of milliseconds ?
> >
>

Regarding stability, I've seen two levels of concrete requirements.

The first is "don't bring down my Spark cluster". That is to say,
regardless of the input data rate, Spark shouldn't thrash or crash
outright. Processing may lag behind the data arrival rate, but the cluster
should stay up and remain fully functional.

The second level is "don't bring down my application". A common use for
streaming systems is to handle heavyweight computations that are part of a
larger application, like a web application, a mobile app, or a plant
control system. For example, an online application for car insurance might
need to do some pretty involved machine learning to produce an accurate
quote and suggest good upsells to the customer. If the heavyweight portion
times out, the whole application times out, and you lose a customer.

In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster
case" is more about handling bursts, while the "don't bring down my
application" case is more about delivering acceptable end-to-end response
times under typical load.

Regarding latency: One group I talked to mentioned requirements in the
100-200 msec range, driven by the need to display a web page on a browser
or mobile device. Another group in the Internet of Things space mentioned
times ranging from 5 seconds to 30 seconds throughout the conversation. But
most people I've talked to have been pretty vague about specific numbers.

My impression is that these groups are not motivated by anxiety about
meeting a particular latency target for a particular application. Rather,
they want to make low latency the norm so that they can stop having to
think about latency. Today, low latency is a special requirement of special
applications. But that policy imposes a lot of hidden costs. IT architects
have to spend time estimating the latency requirements of every application
and lobbying for special treatment when those requirements are strict.
Managers have to spend time engineering business processes around latency.
Data scientists have to spend time packaging up models and negotiating how
those models will be shipped over to the low-latency serving tier. And
customers who are accustomed to Google and smartphones end up with an
experience that is functional but unsatisfying.

It's best to think of latency as a sliding scale. A given level of latency
imposes a given level of cost enterprise-wide. Someone who is making a
decision on middleware policy will balance this cost against other costs:
How much does it cost to deploy the middleware? How much does it cost to
train developers to use the system? The winner will be the system that
minimizes the overall cost.

Fred


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-12 Thread Fred Reiss
On Tue, Oct 11, 2016 at 10:57 AM, Reynold Xin  wrote:

>
> On Tue, Oct 11, 2016 at 10:55 AM, Michael Armbrust  > wrote:
>
>> *Complex event processing and state management:* Several groups I've
>>> talked to want to run a large number (tens or hundreds of thousands now,
>>> millions in the near future) of state machines over low-rate partitions of
>>> a high-rate stream. Covering these use cases translates roughly into
>>> three sub-requirements: maintaining lots of persistent state efficiently,
>>> feeding tuples to each state machine in the right order, and exposing
>>> convenient programmer APIs for complex event detection and signal
>>> processing tasks.
>>>
>>
>> I've heard this one too, but don't know of anyone actively working on
>> it.  Would be awesome to open a JIRA and start discussing what the APIs
>> would look like.
>>
>
> There is an existing ticket for CEP:
> https://issues.apache.org/jira/browse/SPARK-14745
>
>
>
Yeah, Mario and Sachin opened up that CEP ticket a while back, and they had
an early prototype (https://github.com/apache/spark/pull/12518) on the old
DStream APIs. The project stalled out due to uncertainty about how state
management and streaming query languages would work on Structured
Streaming. The people who were working on it are now focusing on other
issues.

Getting CEP to work efficiently is a whole-stack affair. You need optimizer
support for things like pulling out common subexpressions from event specs
and deciding between eager vs. lazy evaluation for predicates. You need
good fine-grained state management in the engine, including support for
efficiently handling out-of-order event arrival. And CEP workloads with a
large number of interdependent, stateful tasks will put stress on the
scheduling layer.

Fred


StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Fred Reiss
On Thu, Oct 6, 2016 at 12:37 PM, Michael Armbrust > wrote:
>
> [snip!]
> Relatedly, I'm curious to hear more about the types of questions you are
> getting.  I think the dev list is a good place to discuss applications and
> if/how structured streaming can handle them.
>

Details are difficult to share, but I can give the general gist without
revealing anything proprietary.

I find myself having the same conversation about twice a month. The other
party to the conversation is an IBM product group or an IBM client who
is using Spark for batch and interactive analytics. Their overall
application has or will soon have a real-time component. They want
information from the IBM Spark Technology Center on the relative merits
of different streaming systems for that part of the application or product.
Usually, the options on the table are Spark Streaming/Structured Streaming
and another more "pure" streaming system like Apache Flink or IBM Streams.

Right now, the best recommendation I can give is: "Spark Streaming has
known shortcomings; here's a list. If you are certain that your application
can work within these constraints, then we recommend you give Spark
Streaming a try. Otherwise, check back 12-18 months from now, when
Structured Streaming will hopefully provide a usable platform for your
application."

The specific unmet requirements that are most relevant to these
conversations are: latency, throughput, stability under bursty loads,
complex event processing support, state management, job graph scheduling,
and access to the newer Dataset-based Spark APIs.

Again, apologies for not being able to name names, but here's a redacted
description of why these requirements are relevant.

*Delivering low latency, high throughput, and stability simultaneously:* Right
now, our own tests indicate you can get at most two of these
characteristics out of Spark Streaming at the same time. I know of two
parties that have abandoned Spark Streaming because "pick any two" is not
an acceptable answer to the latency/throughput/stability question for them.

*Complex event processing and state management:* Several groups I've talked
to want to run a large number (tens or hundreds of thousands now, millions
in the near future) of state machines over low-rate partitions of a
high-rate stream. Covering these use cases translates roughly into three
sub-requirements: maintaining lots of persistent state efficiently, feeding
tuples to each state machine in the right order, and exposing convenient
programmer APIs for complex event detection and signal processing tasks.
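
As a toy illustration of the shape of these workloads (plain Scala, nothing
Spark-specific; the event type, the "door" state machine, and the alert
strings are all made up):

import scala.collection.mutable

case class Event(key: String, ts: Long, kind: String)

class DoorStateMachine {
  private var state = "closed"
  // Returns an alert when an interesting transition is detected.
  def advance(e: Event): Option[String] = (state, e.kind) match {
    case ("closed", "open") => state = "open"; None
    case ("open", "open")   => Some(s"${e.key}: opened twice, second at ts=${e.ts}")
    case ("open", "close")  => state = "closed"; None
    case _                  => None
  }
}

object CepWorkloadSketch {
  // One state machine per key, "tens or hundreds of thousands" of these.
  private val machines = mutable.Map.empty[String, DoorStateMachine]
  // Per-key buffers so out-of-order events can be delivered in timestamp order.
  private val pending = mutable.Map.empty[String, mutable.PriorityQueue[Event]]

  def onEvent(e: Event, watermark: Long): Seq[String] = {
    val buf = pending.getOrElseUpdate(
      e.key, mutable.PriorityQueue.empty[Event](Ordering.by((ev: Event) => -ev.ts)))
    buf.enqueue(e)
    val machine = machines.getOrElseUpdate(e.key, new DoorStateMachine)
    val alerts = mutable.ListBuffer.empty[String]
    // Deliver, oldest first, everything at or below the watermark.
    while (buf.nonEmpty && buf.head.ts <= watermark) {
      machine.advance(buf.dequeue()).foreach(alerts += _)
    }
    alerts.toList
  }

  def main(args: Array[String]): Unit = {
    println(onEvent(Event("door-1", ts = 2L, kind = "open"), watermark = 0L)) // buffered
    println(onEvent(Event("door-1", ts = 1L, kind = "open"), watermark = 5L)) // both delivered, in order
  }
}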

*Job graph scheduling and access to Dataset APIs:* These requirements come
up in the context of groups who want to do streaming ETL. The general
application profile that I've seen involves updating a large number of
materialized views based on a smaller number of streams, using a mixture of
SQL and nontraditional processing. The extended SQL that the Dataset APIs
provide is useful in these applications. As for scheduling needs, it's
common for multiple output tables to share intermediate computations. Users
need an easy way to ensure that this shared computation happens only once,
while controlling the peak memory utilization during each batch.
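
For illustration, the batch-Spark idiom people reach for today looks roughly
like the sketch below, with persist() as the only real knob; the ask is for an
equivalent that works per micro-batch in a streaming job, with better control
over peak memory. Paths, column names, and view names here are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object SharedEtlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("shared-etl-sketch").master("local[*]").getOrCreate()

    // Expensive shared computation over the incoming data (hypothetical inputs).
    val events = spark.read.parquet("/tmp/events")
    val enriched = events
      .join(spark.read.parquet("/tmp/dimensions"), "key")
      .filter("quality_score > 0.5")

    // Materialize once so both outputs reuse it; the storage level is the only
    // way to bound memory here.
    enriched.persist(StorageLevel.MEMORY_AND_DISK)

    enriched.groupBy("region").count()
      .write.mode("overwrite").parquet("/tmp/view_by_region")
    enriched.groupBy("product").count()
      .write.mode("overwrite").parquet("/tmp/view_by_product")

    enriched.unpersist()
    spark.stop()
  }
}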

Hope this helps.

Fred


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-05 Thread Fred Reiss
Thanks for the thoughtful comments, Michael and Shivaram. From what I’ve
seen in this thread and on JIRA, it looks like the current plan with regard
to application-facing APIs for sinks is roughly:
1. Rewrite incremental query compilation for Structured Streaming.
2. Redesign Structured Streaming's source and sink APIs so that they do not
depend on RDDs.
3. Allow the new APIs to stabilize.
4. Open these APIs to use by application code.

Is there a way for those of us who aren’t involved in the first two steps
to get some idea of the current plans and progress? I get asked a lot about
when Structured Streaming will be a viable replacement for Spark Streaming,
and I like to be able to give accurate advice.

Fred

On Tue, Oct 4, 2016 at 3:02 PM, Michael Armbrust 
wrote:

> I don't quite understand why exposing it indirectly through a typed
>> interface should be delayed before finalizing the API.
>>
>
> Spark has a long history of maintaining binary compatibility in its public
> APIs.  I strongly believe this is one of
> the things that has made the project successful.  Exposing internals that
> we know are going to change in the primary user facing API for creating
> Streaming DataFrames seems directly counter to this goal.  I think the
> argument that "you can do it anyway" fails to capture the expectations of
> users who probably aren't closely following this discussion.
>
> If advanced users want to dig through the code and experiment, great.  I
> hope they report back on what's good and what can be improved.  However, if
> you add the function suggested in the PR to DataStreamReader, you are
> giving them a bad experience by leaking internals that don't even show up
> in the published documentation.
>


Re: welcoming Xiao Li as a committer

2016-10-05 Thread Fred Reiss
Congratulations, Xiao!

Fred

On Tuesday, October 4, 2016, Joseph Bradley  wrote:

> Congrats!
>
> On Tue, Oct 4, 2016 at 4:09 PM, Kousuke Saruta  > wrote:
>
>> Congratulations Xiao!
>>
>> - Kousuke
>> On 2016/10/05 7:44, Bryan Cutler wrote:
>>
>> Congrats Xiao!
>>
>> On Tue, Oct 4, 2016 at 11:14 AM, Holden Karau > > wrote:
>>
>>> Congratulations :D :) Yay!
>>>
>>> On Tue, Oct 4, 2016 at 11:14 AM, Suresh Thalamati <
>>> suresh.thalam...@gmail.com
>>> > wrote:
>>>
 Congratulations, Xiao!



 > On Oct 3, 2016, at 10:46 PM, Reynold Xin > wrote:
 >
 > Hi all,
 >
 > Xiao Li, aka gatorsmile, has recently been elected as an Apache Spark
 committer. Xiao has been a super active contributor to Spark SQL. Congrats
 and welcome, Xiao!
 >
 > - Reynold
 >


 -
 To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
 


>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>


Re: Test fails when compiling spark with tests

2016-09-14 Thread Fred Reiss
Also try doing a fresh clone of the git repository. I've seen some of those
rare failure modes corrupt parts of my local copy in the past.

FWIW the main branch as of yesterday afternoon is building fine in my
environment.

Fred

On Tue, Sep 13, 2016 at 6:29 PM, Jakob Odersky  wrote:

> There are some flaky tests that occasionally fail; my first
> recommendation would be to re-run the test suite. Another thing to
> check is whether there are any applications listening on Spark's default
> ports.
> Btw, what is your environment like? In case it is Windows, I don't
> think tests are regularly run against that platform and therefore
> could very well be broken.
>
> On Sun, Sep 11, 2016 at 10:49 PM, assaf.mendelson
>  wrote:
> > Hi,
> >
> > I am trying to set up a Spark development environment. I forked the Spark
> > git project and cloned the fork. I then checked out the branch-2.0 tag
> > (which I assume is the released source code).
> >
> > I then compiled spark twice.
> >
> > The first using:
> >
> > mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package
> >
> > This compiled successfully.
> >
> > The second using mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 clean
> > package
> >
> > This got a failure in Spark Project Core with the following test failing:
> >
> > - caching in memory and disk, replicated
> >
> > - caching in memory and disk, serialized, replicated *** FAILED ***
> >
> >   java.util.concurrent.TimeoutException: Can't find 2 executors before
> 3
> > milliseconds elapsed
> >
> >   at org.apache.spark.ui.jobs.JobProgressListener.waitUntilExecutorsUp(JobProgressListener.scala:573)
> >
> >   at org.apache.spark.DistributedSuite.org$apache$spark$DistributedSuite$$testCaching(DistributedSuite.scala:154)
> >
> >   at org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply$mcV$sp(DistributedSuite.scala:191)
> >
> >   at org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply(DistributedSuite.scala:191)
> >
> >   at org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply(DistributedSuite.scala:191)
> >
> >   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
> >
> >   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> >
> >   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> >
> >   at org.scalatest.Transformer.apply(Transformer.scala:22)
> >
> >   at org.scalatest.Transformer.apply(Transformer.scala:20)
> >
> >   ...
> >
> > - compute without caching when no partitions fit in memory
> >
> >
> >
> > I made no changes to the code whatsoever. Can anyone help me figure out
> what
> > is wrong with my environment?
> >
> > BTW I am using maven 3.3.9 and java 1.8.0_101-b13
> >
> >
> >
> > Thanks,
> >
> > Assaf
> >
> >
> > 
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Spark SQL - Applying transformation on a struct inside an array

2016-09-14 Thread Fred Reiss
+1 to this request. I talked last week with a product group within IBM that
is struggling with the same issue. It's pretty common in data cleaning
applications for data in the early stages to have nested lists or sets and
inconsistent or incomplete schema information.

Fred

On Tue, Sep 13, 2016 at 8:08 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Hi everyone,
> I'm currently trying to create a generic transformation mechanism on a
> DataFrame to modify an arbitrary column regardless of the underlying
> schema.
>
> It's "relatively" straightforward for complex types like struct<...>
> to apply an arbitrary UDF on the column and replace the data "inside" the
> struct; however, I'm struggling to make it work for complex types containing
> arrays along the way, like struct<array<struct<...>>>.
>
> Michael Armbrust seemed to allude on the mailing list/forum to a way of
> using Encoders to do that; I'd be interested in any pointers, especially
> considering that it's not possible to output any Row or
> GenericRowWithSchema from a UDF (thanks to
> https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657
> it seems).
>
> To sum up, I'd like to find a way to apply a transformation on complex
> nested datatypes (arrays and structs) on a DataFrame, updating the values
> in place.
>
> Regards,
>
> *Olivier Girardot*
>


Re: Structured Streaming with Kafka sources/sinks

2016-08-29 Thread Fred Reiss
I think that the community really needs some feedback on the progress of
this very important task. Many existing Spark Streaming applications can't
be ported to Structured Streaming without Kafka support.

Is there a design document somewhere?  Or can someone from the Databricks
team break down the existing monolithic JIRA issue into smaller steps that
reflect the current development plan?

Fred


On Sat, Aug 27, 2016 at 2:32 PM, Koert Kuipers  wrote:

> thats great
>
> is this effort happening anywhere that is publicly visible? github?
>
> On Tue, Aug 16, 2016 at 2:04 AM, Reynold Xin  wrote:
>
>> We (the team at Databricks) are working on one currently.
>>
>>
>> On Mon, Aug 15, 2016 at 7:26 PM, Cody Koeninger 
>> wrote:
>>
>>> https://issues.apache.org/jira/browse/SPARK-15406
>>>
>>> I'm not working on it (yet?), never got an answer to the question of
>>> who was planning to work on it.
>>>
>>> On Mon, Aug 15, 2016 at 9:12 PM, Guo, Chenzhao 
>>> wrote:
>>> > Hi all,
>>> >
>>> >
>>> >
>>> > I’m trying to write Structured Streaming test code and will deal with
>>> Kafka
>>> > source. Currently Spark 2.0 doesn’t support Kafka sources/sinks.
>>> >
>>> >
>>> >
>>> > I found some Databricks slides saying that Kafka sources/sinks will be
>>> > implemented in Spark 2.0, so is there anybody working on this? And
>>> when will
>>> > it be released?
>>> >
>>> >
>>> >
>>> > Thanks,
>>> >
>>> > Chenzhao Guo
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Source API requires unbounded distributed storage?

2016-08-08 Thread Fred Reiss
Created SPARK-16963 to cover this issue.

Fred

On Thu, Aug 4, 2016 at 4:52 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Yeah, this API is in the private execution package because we are planning
> to continue to iterate on it.  Today, we will only ever go back one batch,
> though that might change in the future if we do async checkpointing of
> internal state.
>
> You are totally right that we should relay this info back to the source.
> Opening a JIRA sounds like a good first step.
>
> On Thu, Aug 4, 2016 at 4:38 PM, Fred Reiss <freiss@gmail.com> wrote:
>
>> Hi,
>>
>> I've been looking over the Source API in 
>> org.apache.spark.sql.execution.streaming,
>> and I'm at a loss for how the current API can be implemented in a practical
>> way. The API defines a single getBatch() method for fetching records from
>> the source, with the following Scaladoc comments defining the semantics:
>>
>>
>> /**
>>  * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
>>  * the batch should begin with the first available record. This method must always return the
>>  * same data for a particular `start` and `end` pair.
>>  */
>> def getBatch(start: Option[Offset], end: Offset): DataFrame
>>
>> If I read the semantics described here correctly, a Source is required to
>> retain all past history for the stream that it backs. Further, a Source
>> is also required to retain this data across restarts of the process where
>> the Source is instantiated, even when the Source is restarted on a
>> different machine.
>>
>> The current implementation of FileStreamSource follows my reading of the
>> requirements above. FileStreamSource never deletes a file.
>>
>> I feel like this requirement for unbounded state retention must be a
>> mistake or misunderstanding of some kind. The scheduler is internally
>> maintaining a high water mark (StreamExecution.committedOffsets in
>> StreamExecution.scala) of data that has been successfully processed. There
>> must have been an intent to communicate that high water mark back to the
>> Source so that the Source can clean up its state. Indeed, the Databricks
>> blog post from last week
>> (https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html)
>> says that "Only a
>> few minutes’ worth of data needs to be retained; Structured Streaming will
>> maintain its own internal state after that."
>>
>> But the code checked into git and shipped with Spark 2.0 does not have an
>> API call for the scheduler to tell a Source where the boundary of "only a
>> few minutes' worth of data" lies.
>>
>> Is there a JIRA that I'm not aware of to change the Source API? If not,
>> should we maybe open one?
>>
>> Fred
>>
>
>


Source API requires unbounded distributed storage?

2016-08-04 Thread Fred Reiss
Hi,

I've been looking over the Source API in
org.apache.spark.sql.execution.streaming, and I'm at a loss for how the
current API can be implemented in a practical way. The API defines a single
getBatch() method for fetching records from the source, with the following
Scaladoc comments defining the semantics:


/**
 * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
 * the batch should begin with the first available record. This method must always return the
 * same data for a particular `start` and `end` pair.
 */
def getBatch(start: Option[Offset], end: Offset): DataFrame

If I read the semantics described here correctly, a Source is required to
retain all past history for the stream that it backs. Further, a Source is
also required to retain this data across restarts of the process where the
Source is instantiated, even when the Source is restarted on a different
machine.

The current implementation of FileStreamSource follows my reading of the
requirements above. FileStreamSource never deletes a file.
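
To make the problem concrete, here is a toy sketch of what a source that
honors this contract ends up doing (simplified Long offsets and an in-memory
buffer, not the real Source trait), along with the kind of commit callback
that appears to be missing from the API:

import scala.collection.mutable
import org.apache.spark.sql.{DataFrame, SparkSession}

// Toy stand-in for a streaming source: offsets are plain Longs and the
// "stream" is an in-memory buffer of integers.
class ToyRetainEverythingSource(spark: SparkSession) {
  import spark.implicits._

  // Every record ever seen, keyed by arrival offset. Nothing can be dropped,
  // because getBatch(start, end) may be asked again for any past range,
  // including after a restart.
  private val history = mutable.LinkedHashMap.empty[Long, Int]
  private var maxOffset = -1L

  def addRecord(value: Int): Unit = {
    maxOffset += 1
    history(maxOffset) = value
  }

  def getOffset: Option[Long] = if (maxOffset < 0) None else Some(maxOffset)

  // Mirrors the documented semantics: return the data in (start, end].
  def getBatch(start: Option[Long], end: Long): DataFrame = {
    val from = start.getOrElse(-1L)
    history.collect { case (off, v) if off > from && off <= end => v }
      .toSeq.toDF("value")
  }

  // The piece that seems to be missing: a way for the scheduler to say
  // "offsets <= committed will never be requested again", so the source can
  // finally discard old data.
  def commit(committed: Long): Unit =
    history.retain { case (off, _) => off > committed }
}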

I feel like this requirement for unbounded state retention must be a
mistake or misunderstanding of some kind. The scheduler is internally
maintaining a high water mark (StreamExecution.committedOffsets in
StreamExecution.scala) of data that has been successfully processed. There
must have been an intent to communicate that high water mark back to the
Source so that the Source can clean up its state. Indeed, the Databricks
blog post from last week (
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html)
says that "Only a few minutes’ worth of data needs to be retained;
Structured Streaming will maintain its own internal state after that."

But the code checked into git and shipped with Spark 2.0 does not have an
API call for the scheduler to tell a Source where the boundary of "only a
few minutes' worth of data" lies.

Is there a JIRA that I'm not aware of to change the Source API? If not,
should we maybe open one?

Fred