Re: Source API requires unbounded distributed storage?
Created SPARK-16963 to cover this issue.

Fred

On Thu, Aug 4, 2016 at 4:52 PM, Michael Armbrust wrote:
> You are totally right that we should relay this info back to the source.
> Opening a JIRA sounds like a good first step.
Re: Source API requires unbounded distributed storage?
Yeah, this API is in the private execution package because we are planning to continue to iterate on it. Today, we will only ever go back one batch, though that might change in the future if we do async checkpointing of internal state.

You are totally right that we should relay this info back to the source. Opening a JIRA sounds like a good first step.

On Thu, Aug 4, 2016 at 4:38 PM, Fred Reiss wrote:
> The scheduler is internally maintaining a high water mark
> (StreamExecution.committedOffsets in StreamExecution.scala) of data that has
> been successfully processed. There must have been an intent to communicate
> that high water mark back to the Source so that the Source can clean up its
> state.
> [...]
> Is there a JIRA that I'm not aware of to change the Source API? If not,
> should we maybe open one?
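Michael's note that execution only ever goes back one batch suggests what relaying the committed offset back to a source could look like. The sketch below is hypothetical throughout: the `TrimmingSource` class, the `commit(end)` callback name, and the use of plain `Long` offsets and `Seq[String]` batches are illustrative assumptions, not the Spark 2.0 `Source` API. Once the scheduler reports that everything up to `end` has been processed, the source drops those records and rejects any later request that reaches back before the committed offset.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical source that trims its buffer when told which offset is committed.
// Not the Spark API: offsets are Longs and a batch is a Seq of records.
final class TrimmingSource {
  private val buffer = ArrayBuffer.empty[String] // records after `committed`
  private var committed = 0L                      // high water mark reported by the scheduler

  def append(records: String*): Unit = buffer ++= records

  // Offset of the newest record seen so far.
  def latestOffset: Long = committed + buffer.size

  // Returns records in (start, end]; None means "first record still retained".
  def getBatch(start: Option[Long], end: Long): Seq[String] = {
    val from = start.getOrElse(committed)
    require(from >= committed, s"offset $from was already discarded (committed = $committed)")
    require(from <= end && end <= latestOffset, s"invalid range ($from, $end]")
    buffer.slice((from - committed).toInt, (end - committed).toInt).toList
  }

  // Hypothetical callback: all records up to and including `end` are processed,
  // so they can be dropped from the buffer.
  def commit(end: Long): Unit = {
    require(end <= latestOffset, s"cannot commit unseen offset $end")
    if (end > committed) {
      buffer.remove(0, (end - committed).toInt)
      committed = end
    }
  }

  // Number of records currently held in memory.
  def retained: Int = buffer.size
}
```

For example, after appending "a" through "d", `getBatch(None, 2L)` returns `List(a, b)`; calling `commit(2L)` drops those two records, leaving two retained, and `getBatch(Some(2L), 4L)` returns `List(c, d)`. Retention is bounded by the distance between the committed offset and the newest record rather than by the stream's full history.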
Source API requires unbounded distributed storage?
Hi,

I've been looking over the Source API in org.apache.spark.sql.execution.streaming, and I'm at a loss for how the current API can be implemented in a practical way. The API defines a single getBatch() method for fetching records from the source, with the following Scaladoc comments defining the semantics:

    /**
     * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`
     * then the batch should begin with the first available record. This method must always
     * return the same data for a particular `start` and `end` pair.
     */
    def getBatch(start: Option[Offset], end: Offset): DataFrame

If I read the semantics described here correctly, a Source is required to retain all past history for the stream that it backs. Further, a Source is also required to retain this data across restarts of the process where the Source is instantiated, even when the Source is restarted on a different machine.

The current implementation of FileStreamSource follows my reading of the requirements above: FileStreamSource never deletes a file.

I feel like this requirement for unbounded state retention must be a mistake or misunderstanding of some kind. The scheduler is internally maintaining a high water mark (StreamExecution.committedOffsets in StreamExecution.scala) of data that has been successfully processed. There must have been an intent to communicate that high water mark back to the Source so that the Source can clean up its state. Indeed, the Databricks blog post from last week (https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html) says that "Only a few minutes’ worth of data needs to be retained; Structured Streaming will maintain its own internal state after that."

But the code checked into git and shipped with Spark 2.0 does not have an API call for the scheduler to tell a Source where the boundary of "only a few minutes' worth of data" lies.

Is there a JIRA that I'm not aware of to change the Source API? If not, should we maybe open one?

Fred
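The retention argument in Fred's message can be made concrete with a small model of the quoted getBatch contract. This is a sketch under simplifying assumptions, not Spark code: the `InMemorySource` class is hypothetical, offsets are plain `Long`s (offset n means "after the n-th record"), and a batch is a `Seq[String]` instead of a `DataFrame`. Because any earlier `start`, including `None`, may be requested again and must yield identical data, the model has no point at which it can safely discard a record.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of the getBatch contract; not the Spark API.
// Offsets are Longs: offset n means "after the n-th record".
final class InMemorySource {
  // Every record ever appended. Nothing tells us which prefix is safe to drop.
  private val log = ArrayBuffer.empty[String]

  def append(records: String*): Unit = log ++= records

  // Latest available offset, or None if no data has arrived yet.
  def getOffset: Option[Long] = if (log.isEmpty) None else Some(log.size.toLong)

  // Returns the records in the half-open range (start, end].
  // start = None means "begin with the first available record", which here
  // is always the very first record, so the whole log must be kept.
  def getBatch(start: Option[Long], end: Long): Seq[String] = {
    val from = start.getOrElse(0L)
    require(from <= end && end <= log.size, s"invalid range ($from, $end]")
    log.slice(from.toInt, end.toInt).toList
  }
}
```

For example, after appending "a", "b" and "c", `getBatch(None, 2L)` returns `List(a, b)` and `getBatch(Some(1L), 3L)` returns `List(b, c)`. Both calls must keep returning those same records for as long as the source exists, which is exactly the unbounded retention the message describes.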