Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Tathagata Das
Hello Rachana,

Getting exactly-once semantics on files and making it scale to a very large
number of files are very hard problems to solve. While Structured Streaming
+ built-in file sink solves the exactly-once guarantee that DStreams could
not, it is definitely limited in other ways (scaling in terms of files,
combining batch and streaming writes in the same place, etc). And solving
this problem requires a holistic solution that is arguably beyond the scope
of the Spark project.

There are other projects that are trying to solve this file management
issue. For example, Delta Lake (full disclosure, I am
involved in it) was built to solve exactly this problem: get exactly-once
and ACID guarantees on files, while also scaling to millions of files.
Please consider it as part of your solution.




On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava
 wrote:

> I have written a simple Spark Structured Streaming app to move data from
> Kafka to S3. I found that, in order to support the exactly-once guarantee,
> Spark creates a _spark_metadata folder, which ends up growing too large
> because the streaming app is SUPPOSED TO run FOREVER. When the streaming app
> runs for a long time, the metadata folder grows so big that we start getting
> OOM errors. The only way to resolve the OOM is to delete the checkpoint and
> metadata folders and lose VALUABLE customer data.
>
> Spark has open JIRAs for this (SPARK-24295, SPARK-29995, and SPARK-30462).
> Spark Streaming was NOT broken like this. Is Spark Streaming a
> BETTER choice?
>


Re: how can i write spark addListener metric to kafka

2020-06-09 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis
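
As a rough illustration of what that section enables: each progress report of a streaming query exposes fields such as `numInputRows` and `processedRowsPerSecond` (in PySpark, `query.lastProgress` is a plain dict). The sketch below only shows how such a report could be turned into a key/value pair ready to hand to a Kafka producer. The helper name `progress_to_record` and the sample values are made up for illustration, and the actual send call is left to whichever Kafka client you use.

```python
import json


def progress_to_record(progress):
    """Turn one streaming progress report (a dict) into a (key, value) pair of bytes."""
    payload = {
        "queryId": progress.get("id"),
        "batchId": progress.get("batchId"),
        "timestamp": progress.get("timestamp"),
        "numInputRows": progress.get("numInputRows", 0),
        "processedRowsPerSecond": progress.get("processedRowsPerSecond"),
    }
    # Keying by query id keeps all reports of one query in the same partition.
    key = str(payload["queryId"]).encode("utf-8")
    value = json.dumps(payload, sort_keys=True).encode("utf-8")
    return key, value


# Hypothetical progress report, shaped like the dict PySpark returns.
sample_progress = {
    "id": "q-1",
    "batchId": 7,
    "timestamp": "2020-06-09T23:42:00.000Z",
    "numInputRows": 42,
    "processedRowsPerSecond": 10.5,
}
key, value = progress_to_record(sample_progress)
```

In a real job this function would be called from the listener callback (or from a loop polling `lastProgress`), and the resulting pair would be passed to the producer's send method.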

On Tue, Jun 9, 2020 at 4:42 PM a s  wrote:

> Hi guys,
>
> I am building a Structured Streaming app for Google Analytics data.
>
> I want to capture the number of rows read and processed.
>
> I am able to see it in the log. How can I send it to Kafka?
>
>
> Thanks
> Alis
>


Re: Spark, read from Kafka stream failing AnalysisException

2020-04-05 Thread Tathagata Das
Have you looked at the suggestion made by the error by searching for
"Structured Streaming + Kafka Integration Guide" in Google? It should be
the first result.
The last section in the "Structured Streaming + Kafka Integration Guide"
specifies how to add the spark-sql-kafka Maven dependency when starting
pyspark.
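
For reference, the Maven coordinate follows the pattern `org.apache.spark:spark-sql-kafka-0-10_<scala binary version>:<spark version>`. The small helper below just assembles the launch command from those two values. The function names are made up for illustration, and the Scala binary version `2.11` is an assumption that matches the default Spark 2.4.5 build; adjust it if your distribution was built against a different Scala version.

```python
def kafka_connector_coordinate(spark_version, scala_binary_version="2.11"):
    """Build the Maven coordinate of the Structured Streaming Kafka connector."""
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(
        scala_binary_version, spark_version
    )


def pyspark_launch_command(spark_version, scala_binary_version="2.11"):
    """Return the argument list that starts pyspark with the connector on the classpath."""
    return [
        "pyspark",
        "--packages",
        kafka_connector_coordinate(spark_version, scala_binary_version),
    ]


command = pyspark_launch_command("2.4.5")
print(" ".join(command))
# prints: pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
```

The same `--packages` argument works with `spark-submit`.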


On Sun, Apr 5, 2020 at 11:11 AM Sumit Agrawal  wrote:

> Hello,
>
> I am using Spark 2.4.5, Kafka 2.3.1 on my local machine.
>
> I am able to produce and consume messages on Kafka with bootstrap server
> config "localhost:9092".
>
> While trying to set up a reader with the Spark streaming API, I am getting
> the following error:
>
> Exception Message:
> Py4JJavaError: An error occurred while calling o166.load.
> : org.apache.spark.sql.AnalysisException: Failed to find data source:
> kafka. Please deploy the application as per the deployment section of
> "Structured Streaming + Kafka Integration Guide".;
>
> Spark Code I am trying to execute:
> df1 = spark.readStream.format("kafka")\
>   .option("kafka.bootstrap.servers", "localhost:9092")\
>   .option("subscribe", "topic1")\
>   .load()
>
> Any guidelines would help.
>
> Regards,
> Sumit
>


Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Tathagata Das
Make sure that you are continuously feeding data into the query to trigger
the batches. Only then are timeouts processed.
See the timeout behavior details here -
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
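
To make that behavior concrete, here is a toy model in plain Python. It is not Spark's implementation, only a sketch under the assumption that expired keys are checked exclusively while a batch is running; the class and method names are invented for illustration.

```python
class ProcessingTimeTimeoutModel:
    """Toy model: timeouts are only evaluated while a micro-batch is running."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self.deadlines = {}  # key -> processing time (ms) after which the key may time out

    def run_batch(self, now_ms, keys_with_data):
        """Simulate one triggered batch and return the keys whose timeout fired."""
        keys_with_data = set(keys_with_data)
        # A key times out only if it received no data in this batch and its deadline passed.
        timed_out = sorted(
            key
            for key, deadline in self.deadlines.items()
            if key not in keys_with_data and deadline <= now_ms
        )
        for key in timed_out:
            del self.deadlines[key]
        # Keys that received data get a fresh deadline (like setTimeoutDuration).
        for key in keys_with_data:
            self.deadlines[key] = now_ms + self.timeout_ms
        return timed_out


model = ProcessingTimeTimeoutModel(timeout_ms=2 * 60 * 1000)
first = model.run_batch(now_ms=0, keys_with_data=["a"])
# Five minutes pass with no input: no batch runs, so nothing can time out yet.
second = model.run_batch(now_ms=5 * 60 * 1000, keys_with_data=["b"])
```

Key "a" was due after two minutes, but its timeout is only observed at the five-minute mark, when new data for "b" finally triggers a batch.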

On Wed, Mar 4, 2020 at 2:51 PM Something Something 
wrote:

> I've set the timeout duration to "2 minutes" as follows:
>
> def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: 
> Iterator[R00tJsonObject],
>   oldState: GroupState[MyState]): OutputRow = {
>
> println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + 
> tuple3._2 + ", " + tuple3._3)
> var state: MyState = if (oldState.exists) oldState.get else 
> MyState(tuple3._1, tuple3._2, tuple3._3)
>
> if (oldState.hasTimedOut) {
>   println("@ oldState has timed out ")
>   // Logic to Write OutputRow
>   OutputRow("some values here...")
> } else {
>   for (input <- inputs) {
> state = updateWithEvent(state, input)
> oldState.update(state)
> *oldState.setTimeoutDuration("2 minutes")*
>   }
>   OutputRow(null, null, null)
> }
>
>   }
>
> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as 
> follows...
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
>
> But 'hasTimedOut' is never true so I don't get any output! What am I doing 
> wrong?
>
>
>
>


Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
Sounds like something to do with the serialization/deserialization, and not
related to mapGroupsWithState.

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala

The docs say that
1. this is deprecated and therefore should not be used
2. you have to use the annotation `SQLUserDefinedType
<https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
on the class definition. You don't seem to have done it; maybe that's the
reason?

I would debug by printing the values in the serialize/deserialize methods,
and then passing it through the groupBy that is known to fail.

TD

On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey 
wrote:

> Tathagata,
>
> The difference is more than hours off. In this instance it's different by
> 4 years. In other instances it's different by tens of years (and other
> smaller durations).
>
> We've considered moving to storage as longs, but this makes code much less
> readable and harder to maintain. The udt serialization bug also causes
> issues outside of stateful streaming, as when executing a simple group by.
>
> Regards,
>
> Bryan Jeffrey
>
> --
> *From:* Tathagata Das 
> *Sent:* Friday, February 28, 2020 4:56:07 PM
> *To:* Bryan Jeffrey 
> *Cc:* user 
> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT serialization
> does not work
>
> You are deserializing by explicitly specifying UTC timezone, but when
> serializing you are not specifying it. Maybe that is the reason?
>
> Also, if you can encode it using just long, then I recommend just saving
> the value as long and eliminating some of the serialization overheads.
> Spark will probably better optimize stuff if it sees it as a long rather
> than an opaque UDT.
>
> TD
>
> On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey 
> wrote:
>
> Hello.
>
> I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
> mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
> time in a number of data structures, and so we've generated a custom
> serializer for Joda.  This works well in most dataset/dataframe structured
> streaming operations. However, when running mapGroupsWithState we observed
> that incorrect dates were being returned from a state.
>
> I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
> an effort to assist tracking of related information.
>
> Simple example:
> 1. Input A has a date D
> 2. Input A updates state in mapGroupsWithState. Date present in state is D
> 3. Input A is added again.  Input A has correct date D, but existing state
> now has invalid date
>
> Here is a simple repro:
>
> Joda Time UDT:
>
> private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
>   override def sqlType: DataType = LongType
>   override def serialize(obj: DateTime): Long = obj.getMillis
>   def deserialize(datum: Any): DateTime = datum match {
>     case value: Long => new DateTime(value, DateTimeZone.UTC)
>   }
>   override def userClass: Class[DateTime] = classOf[DateTime]
>   private[spark] override def asNullable: JodaTimeUDT = this
> }
>
> object JodaTimeUDTRegister {
>   def register: Unit = {
>     UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
>   }
> }
>
>
> Test Leveraging Joda UDT:
>
> case class FooWithDate(date: DateTime, s: String, i: Int)
>
> @RunWith(classOf[JUnitRunner])
> class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {
>   val application = this.getClass.getName
>   var session: SparkSession = _
>
>   override def beforeAll(): Unit = {
>     System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
>     val sparkConf = new SparkConf()
>       .set("spark.driver.allowMultipleContexts", "true")
>       .set("spark.testing", "true")
>       .set("spark.memory.fraction", "1")
>       .set("spark.ui.enabled", "false")
>       .set("spark.streaming.gracefulStopTimeout", "1000")
>       .setAppName(application).setMaster("local[*]")
>
>     session = SparkSession.builder().config(sparkConf).getOrCreate()
>     session.sparkContext.setCheckpointDir("/")
>     JodaTimeUDTRegister.register
>   }
>
>   override def afterAll(): Unit = {
>     session.stop()
>   }
>
>   it should "work correctly for a streaming input with stateful 
> transformation" in {
> val date = new DateTime(2020, 1, 2, 

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
You are deserializing by explicitly specifying UTC timezone, but when
serializing you are not specifying it. Maybe that is the reason?

Also, if you can encode it using just long, then I recommend just saving
the value as long and eliminating some of the serialization overheads.
Spark will probably better optimize stuff if it sees it as a long rather
than an opaque UDT.
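
A small sketch of the "just store a long" idea, written in plain Python rather than Joda/Scala so it can be run anywhere. It assumes the value is kept as epoch milliseconds and always decoded as UTC; the function names are made up for illustration and nothing here reproduces the reported bug.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt):
    """Encode a timezone-aware datetime as integer milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_epoch_millis(millis):
    """Decode epoch milliseconds into a UTC datetime."""
    return EPOCH + timedelta(milliseconds=millis)


utc_value = datetime(2020, 1, 2, 3, 4, 5, 6000, tzinfo=timezone.utc)
# The same instant expressed in another zone encodes to the same long.
same_instant_elsewhere = utc_value.astimezone(timezone(timedelta(hours=5, minutes=30)))
stored = to_epoch_millis(same_instant_elsewhere)
restored = from_epoch_millis(stored)
```

The stored long identifies the instant regardless of the zone it was created in, and decoding always yields UTC, so both directions of the round trip use one convention.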

TD

On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey 
wrote:

> Hello.
>
> I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
> mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
> time in a number of data structures, and so we've generated a custom
> serializer for Joda.  This works well in most dataset/dataframe structured
> streaming operations. However, when running mapGroupsWithState we observed
> that incorrect dates were being returned from a state.
>
> I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
> an effort to assist tracking of related information.
>
> Simple example:
> 1. Input A has a date D
> 2. Input A updates state in mapGroupsWithState. Date present in state is D
> 3. Input A is added again.  Input A has correct date D, but existing state
> now has invalid date
>
> Here is a simple repro:
>
> Joda Time UDT:
>
> private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
>   override def sqlType: DataType = LongType
>   override def serialize(obj: DateTime): Long = obj.getMillis
>   def deserialize(datum: Any): DateTime = datum match {
>     case value: Long => new DateTime(value, DateTimeZone.UTC)
>   }
>   override def userClass: Class[DateTime] = classOf[DateTime]
>   private[spark] override def asNullable: JodaTimeUDT = this
> }
>
> object JodaTimeUDTRegister {
>   def register: Unit = {
>     UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
>   }
> }
>
>
> Test Leveraging Joda UDT:
>
> case class FooWithDate(date: DateTime, s: String, i: Int)
>
> @RunWith(classOf[JUnitRunner])
> class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {
>   val application = this.getClass.getName
>   var session: SparkSession = _
>
>   override def beforeAll(): Unit = {
>     System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
>     val sparkConf = new SparkConf()
>       .set("spark.driver.allowMultipleContexts", "true")
>       .set("spark.testing", "true")
>       .set("spark.memory.fraction", "1")
>       .set("spark.ui.enabled", "false")
>       .set("spark.streaming.gracefulStopTimeout", "1000")
>       .setAppName(application).setMaster("local[*]")
>
>     session = SparkSession.builder().config(sparkConf).getOrCreate()
>     session.sparkContext.setCheckpointDir("/")
>     JodaTimeUDTRegister.register
>   }
>
>   override def afterAll(): Unit = {
>     session.stop()
>   }
>
>   it should "work correctly for a streaming input with stateful transformation" in {
>     val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
>     val sqlContext = session.sqlContext
>     import sqlContext.implicits._
>
>     val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), FooWithDate(date, "Foo", 3))
>     val streamInput: MemoryStream[FooWithDate] = new MemoryStream[FooWithDate](42, session.sqlContext)
>     streamInput.addData(input)
>     val ds: Dataset[FooWithDate] = streamInput.toDS()
>
>     val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], GroupState[FooWithDate]) => FooWithDate =
>       TestJodaTimeUdt.updateFooState
>     val result: Dataset[FooWithDate] = ds
>       .groupByKey(x => x.i)
>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
>     val writeTo = s"random_table_name"
>
>     result.writeStream
>       .outputMode(OutputMode.Update)
>       .format("memory")
>       .queryName(writeTo)
>       .trigger(Trigger.Once())
>       .start()
>       .awaitTermination()
>     val combinedResults: Array[FooWithDate] =
>       session.sql(sqlText = s"select * from $writeTo").as[FooWithDate].collect()
>     val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, "FooFoo", 6))
>     combinedResults should contain theSameElementsAs(expected)
>   }
> }
>
> object TestJodaTimeUdt {
>   def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: GroupState[FooWithDate]): FooWithDate = {
>     if (state.hasTimedOut) {
>       state.remove()
>       state.getOption.get
>     } else {
>       val inputsSeq: Seq[FooWithDate] = inputs.toSeq
>       val startingState = state.getOption.getOrElse(inputsSeq.head)
>       val toProcess = if (state.getOption.isDefined) inputsSeq else inputsSeq.tail
>       val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)
>
>       state.update(updatedFoo)
>       state.setTimeoutDuration("1 minute")
>       updatedFoo
>     }
>   }
>
>   def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate =
>     FooWithDate(b.date, a.s + b.s, a.i + b.i)
> }
>
>
> The test output shows the 

Re: dropDuplicates and watermark in structured streaming

2020-02-28 Thread Tathagata Das
Why do you have two watermarks? Once you apply the watermark to a column
(i.e., "time"), it can be used in all later operations as long as the
column is preserved. So the above code should be equivalent to

df.withWatermark("time", "window size").dropDuplicates("id").groupBy(window("time", "window size", "window size")).agg(count("id"))

The right way to think about the watermark threshold is "how late and out
of order my data can be". The answer may be completely different from the
window size. You may want to calculate 10-minute windows, but your data may
arrive up to 5 hours late. In that case you should define the watermark with
5 hours, not 10 minutes.

Btw, on a side note, just so you know, you can use "approx_count_distinct"
if you are okay with some approximation.
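
To illustrate why the watermark threshold should follow data lateness rather than window size, here is a deliberately simplified model in plain Python. It assumes the watermark is "latest event time seen minus the threshold" and that anything older than the watermark is dropped; Spark's real bookkeeping (per-batch updates, window boundaries) is more involved, and the function name is invented for illustration.

```python
from datetime import datetime, timedelta


def is_dropped_as_late(event_time, max_event_time_seen, delay_threshold):
    """Simplified rule: an event older than the current watermark is discarded."""
    watermark = max_event_time_seen - delay_threshold
    return event_time < watermark


latest = datetime(2020, 2, 28, 12, 0)
# An event that arrives almost five hours behind the newest data.
late_event = latest - timedelta(hours=4, minutes=59)

# Threshold set to the 10-minute window size: the late event is lost.
dropped_with_window_size = is_dropped_as_late(late_event, latest, timedelta(minutes=10))
# Threshold set to the real lateness bound of 5 hours: the event is still counted.
dropped_with_real_lateness = is_dropped_as_late(late_event, latest, timedelta(hours=5))
```

The window size plays no role in the decision above; only the lateness bound does.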

On Thu, Feb 27, 2020 at 9:11 PM lec ssmi  wrote:

>   Such as:
> df.withWatermark("time", "window size").dropDuplicates("id").withWatermark("time", "real watermark").groupBy(window("time", "window size", "window size")).agg(count("id"))
> Can this make the distinct count succeed?
>
> On Fri, Feb 28, 2020 at 10:25 AM, Tathagata Das wrote:
>
>> 1. Yes. All times are in event time, not processing time. So you may get
>> 10AM event-time data at 11AM processing time, but it will still be compared
>> against all data within the 9-10AM event times.
>>
>> 2. Show us your code.
>>
>> On Thu, Feb 27, 2020 at 2:30 AM lec ssmi  wrote:
>>
>>> Hi:
>>> I'm new to structured streaming. Because the built-in API cannot
>>> perform the Count Distinct operation of Window, I want to use
>>> dropDuplicates first, and then perform the window count.
>>>But in the process of using, there are two problems:
>>>1. Because it is streaming computing, in the process of
>>> deduplication, the state needs to be cleared in time, which requires the
>>> cooperation of watermark. Assuming my event time field is consistently
>>>   increasing, and I set the watermark to 1 hour, does it
>>> mean that the data at 10 o'clock will only be compared in these data from 9
>>> o'clock to 10 o'clock, and the data before 9 o'clock will be cleared ?
>>>2. Because it is window deduplication, I set the watermark
>>> before deduplication to the window size.But after deduplication, I need to
>>> call withWatermark () again to set the watermark to the real
>>>watermark. Will setting the watermark again take effect?
>>>
>>>  Thanks a lot !
>>>
>>


Re: dropDuplicates and watermark in structured streaming

2020-02-27 Thread Tathagata Das
1. Yes. All times are in event time, not processing time. So you may get
10AM event-time data at 11AM processing time, but it will still be compared
against all data within the 9-10AM event times.

2. Show us your code.

On Thu, Feb 27, 2020 at 2:30 AM lec ssmi  wrote:

> Hi:
> I'm new to structured streaming. Because the built-in API cannot
> perform the Count Distinct operation of Window, I want to use
> dropDuplicates first, and then perform the window count.
>But in the process of using, there are two problems:
>1. Because it is streaming computing, in the process of
> deduplication, the state needs to be cleared in time, which requires the
> cooperation of watermark. Assuming my event time field is consistently
>   increasing, and I set the watermark to 1 hour, does it mean
> that the data at 10 o'clock will only be compared in these data from 9
> o'clock to 10 o'clock, and the data before 9 o'clock will be cleared ?
>2. Because it is window deduplication, I set the watermark
> before deduplication to the window size.But after deduplication, I need to
> call withWatermark () again to set the watermark to the real
>watermark. Will setting the watermark again take effect?
>
>  Thanks a lot !
>


Re: Spark Streaming: Aggregating values across batches

2020-02-27 Thread Tathagata Das
Use Structured Streaming. Its aggregation, by definition, is across batches.
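
Conceptually, the engine keeps the aggregation state between micro-batches and folds each new batch into it. The toy class below mimics that in plain Python; it is only a sketch of the idea, not how Spark stores state, and the class name is made up for illustration.

```python
from collections import Counter


class RunningCounts:
    """Toy stand-in for a streaming aggregation whose state survives across batches."""

    def __init__(self):
        self.state = Counter()

    def process_batch(self, keys):
        """Fold one micro-batch into the running totals and return the updated result."""
        self.state.update(keys)
        return dict(self.state)


agg = RunningCounts()
after_first = agg.process_batch(["a", "b", "a"])
after_second = agg.process_batch(["b", "c"])
```

The second result includes the counts from the first batch, which is what a streaming `groupBy(...).count()` gives you without accumulators.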

On Thu, Feb 27, 2020 at 3:17 PM Something Something <
mailinglist...@gmail.com> wrote:

> We have a Spark Streaming job that calculates some values in each batch.
> What we need to do now is aggregate values across ALL batches. What is the
> best strategy to do this in Spark Streaming? Should we use 'Spark
> Accumulators' for this?
>


Re: how can I dynamic parse json in kafka when using Structured Streaming

2019-09-17 Thread Tathagata Das
You can use the built-in SQL function *from_json* to parse JSON.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-

On Mon, Sep 16, 2019 at 7:39 PM lk_spark  wrote:

> Hi all,
> I'm using Structured Streaming to read from Kafka. The data type is a JSON
> string. I want to parse it and convert it to a DataFrame, but my code doesn't
> compile and I don't know how to fix it:
>
>
> val lines = messages.selectExpr("CAST(value AS STRING) as value").as[
> String]
>
> val words = lines.map(line => {
>   var json: JValue = null
>   try {
> json = parse(line)
>   } catch {
> case ex: Exception => { println(ex.getMessage + " " + line) }
>   }
>   //var result: scala.collection.mutable.Map[String,String] =
> scala.collection.mutable.Map()
>   val jsonObj = json.values.asInstanceOf[Map[String, _]]
>   val valuse = jsonObj.values.toArray
>   val schema = StructType(List())
>   for ((k, v) <- jsonObj){
> //result += (k -> jsonObj.get(k).toString())
>
> if(v.isInstanceOf[String]){
>   schema.add(k,StringType)
> }else if (v.isInstanceOf[Int]){
>   schema.add(k,IntegerType)
> }/*else if (v.isInstanceOf[Array[String]]){
>   schema.add(k,ArrayType(StringType))
> }else if (v.isInstanceOf[Map[String,String]]){
>   schema.add(k,MapType(StringType,StringType))
> }*/
>   }
>   val row =  new GenericRowWithSchema(valuse,schema)
>   row
> })
>
>
> Error:(45, 26) Unable to find encoder for type
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit
> Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is
> needed to store
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in
> a Dataset. Primitive types (Int, String, etc) and Product types (case
> classes) are supported by importing spark.implicits._  Support for
> serializing other types will be added in future releases.
> val words = lines.map(line => {
>
> Error:(45, 26) not enough arguments for method map: (implicit evidence$6:
> org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema].
> Unspecified value parameter evidence$6.
> val words = lines.map(line => {
>
>
>
> 2019-09-17
> --
> lk_spark
>


Re: Structured Streaming: How to add a listener for when a batch is complete

2019-09-03 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis

On Tue, Sep 3, 2019, 3:26 PM Natalie Ruiz
 wrote:

> Hello all,
>
>
>
> I’m a beginner, new to Spark and wanted to know if there was an equivalent
> to Spark Streaming’s StreamingListenerBatchCompleted in Structured
> Streaming? I want to add a listener for when a batch is complete but the
> documentation and examples I find are for Spark Streaming and not
> Structured Streaming and registering it to a StreamingContext, which from
> my understanding is just for Spark Streaming
>
>
>
> Thanks!
>


Re: Structured Streaming Dataframe Size

2019-08-29 Thread Tathagata Das
Responses inline.

On Wed, Aug 28, 2019 at 8:42 AM Nick Dawes  wrote:

> Thank you, TD. Couple of follow up questions please.
>
> 1) "It only keeps around the minimal intermediate state data"
>
> How do you define "minimal" here? Is there a configuration property to
> control the time or size of Streaming Dataframe?
>
That's what watermarks are for. You can tune how much late data to consider
and, accordingly, how much past information needs to be buffered as
state. More lateness tolerance = more state in memory to manage.
Shameless plug, but see my deep dive talk -
https://databricks.com/session/a-deep-dive-into-stateful-stream-processing-in-structured-streaming


>
> 2) I'm not writing anything out to any database or S3. My requirement is
> to find out a count (real-time) in a 1 hour window. I would like to get
> this count from a BI tool. So can register as a temp view and access from
> BI tool?
>
> I tried something like this In my Streaming application
>
> AggStreamingDF.createOrReplaceGlobalTempView("streaming_table")
>
> Then, In BI tool, I queried like this...
>
> select * from streaming_table
>
> Error:  Queries with streaming sources must be executed with
> writeStream.start()
>
> Any suggestions to make this work?
>
>
There are two ways of doing this.

1. Write the aggregates to an in-memory table (in the driver's memory) and
query that.

AggStreamingDF.writeStream.format("memory").outputMode("complete").queryName("myAggTable").start()

Then

select * from myAggTable

2. Write the aggregates to files using the Delta Lake <https://delta.io>
project (docs <https://docs.delta.io/latest/index.html#>).

AggStreamingDF.writeStream.format("delta").outputMode("complete").start("path/to/delta/table")

Then you can query the Delta table using Spark.

spark.read.format("delta").load("path/to/delta/table").createOrReplaceGlobalTempView("myAggTable")

Then

select * from global_temp.myAggTable

This will give awesome ACID transactional guarantees between reads and
writes. Read more on the linked website (full disclosure, I work on that
project as well).



> Thank you very much for your help!
>
>
> On Tue, Aug 27, 2019, 6:42 PM Tathagata Das 
> wrote:
>
>>
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts
>>
>> *Note that Structured Streaming does not materialize the entire table*.
>>> It reads the latest available data from the streaming data source,
>>> processes it incrementally to update the result, and then discards the
>>> source data. It only keeps around the minimal intermediate *state* data
>>> as required to update the result (e.g. intermediate counts in the earlier
>>> example).
>>>
>>
>>
>> On Tue, Aug 27, 2019 at 1:21 PM Nick Dawes  wrote:
>>
>>> I have a quick newbie question.
>>>
>>> Spark Structured Streaming creates an unbounded dataframe that keeps
>>> appending rows to it.
>>>
>>> So what's the max size of data it can hold? What if the size becomes
>>> bigger than the JVM? Will it spill to disk? I'm using S3 as storage. So
>>> will it write temp data on S3 or on local file system of the cluster?
>>>
>>> Nick
>>>
>>


Re: Structured Streaming Dataframe Size

2019-08-27 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts

*Note that Structured Streaming does not materialize the entire table*. It
> reads the latest available data from the streaming data source, processes
> it incrementally to update the result, and then discards the source data.
> It only keeps around the minimal intermediate *state* data as required to
> update the result (e.g. intermediate counts in the earlier example).
>


On Tue, Aug 27, 2019 at 1:21 PM Nick Dawes  wrote:

> I have a quick newbie question.
>
> Spark Structured Streaming creates an unbounded dataframe that keeps
> appending rows to it.
>
> So what's the max size of data it can hold? What if the size becomes
> bigger than the JVM? Will it spill to disk? I'm using S3 as storage. So
> will it write temp data on S3 or on local file system of the cluster?
>
> Nick
>


Announcing Delta Lake 0.3.0

2019-08-01 Thread Tathagata Das
Hello everyone,

We are excited to announce the availability of Delta Lake 0.3.0 which
introduces new programmatic APIs for manipulating and managing data in
Delta Lake tables.

Here are the main features:

   - Scala/Java APIs for DML commands - You can now modify data in Delta Lake
     tables using programmatic APIs for *Delete*, *Update* and *Merge*. These
     APIs mirror the syntax and semantics of their corresponding SQL commands
     and are great for many workloads, e.g., Slowly Changing Dimension (SCD)
     operations, merging change data for replication, and upserts from
     streaming queries. See the documentation for more details.

   - Scala/Java APIs for query commit history - You can now query a table’s
     commit history to see what operations modified the table. This enables
     you to audit data changes, time travel queries on specific versions,
     debug and recover data from accidental deletions, etc. See the
     documentation for more details.

   - Scala/Java APIs for vacuuming old files - Delta Lake uses MVCC to enable
     snapshot isolation and time travel. However, keeping all versions of a
     table forever can be prohibitively expensive. Stale snapshots (as well as
     other uncommitted files from aborted transactions) can be garbage
     collected by vacuuming the table. See the documentation for more details.

To try out Delta Lake 0.3.0, please follow the Delta Lake Quickstart:
https://docs.delta.io/0.3.0/quick-start.html

To view the release notes:
https://github.com/delta-io/delta/releases/tag/v0.3.0

We would like to thank all the community members for contributing to this
release.

TD


Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread Tathagata Das
@ayan guha  @Gourav Sengupta

Delta Lake OSS currently does not support defining tables in the Hive
metastore using DDL commands. We are hoping to add the necessary
compatibility fixes in Apache Spark to make Delta Lake work with tables and
DDL commands, so we will support them in a future release. In the meantime,
please read/write Delta tables using paths.

TD

On Fri, Jun 21, 2019 at 12:49 AM Gourav Sengupta 
wrote:

> Hi Ayan,
>
> I may be wrong about this, but I think that Delta files are in Parquet
> format. But I am sure that you have already checked this. Am I missing
> something?
>
> Regards,
> Gourav Sengupta
>
> On Fri, Jun 21, 2019 at 6:39 AM ayan guha  wrote:
>
>> Hi
>> We used spark.sql to create a table using DELTA. We also have a hive
>> metastore attached to the spark session. Hence, a table gets created in
>> Hive metastore. We then tried to query the table from Hive. We faced
>> the following issues:
>>
>>1. SERDE is SequenceFile, should have been Parquet
>>2. Schema fields are not passed.
>>
>> Essentially the hive DDL looks like:
>>
>> CREATE TABLE `TABLE NAME`(
>>   `col` array COMMENT 'from deserializer')
>> ROW FORMAT SERDE
>>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
>> WITH SERDEPROPERTIES (
>>   'path'='WASB PATH')
>> STORED AS INPUTFORMAT
>>   'org.apache.hadoop.mapred.SequenceFileInputFormat'
>> OUTPUTFORMAT
>>   'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
>> LOCATION
>>   'WASB PATH'
>> TBLPROPERTIES (
>>   'spark.sql.create.version'='2.4.0',
>>   'spark.sql.sources.provider'='DELTA',
>>   'spark.sql.sources.schema.numParts'='1',
>>   'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[]}',
>>   'transient_lastDdlTime'='1556544657')
>>
>> Is this expected? And will the use case be supported in future releases?
>>
>>
>> We are now experimenting
>>
>> Best
>>
>> Ayan
>>
>> On Fri, Jun 21, 2019 at 11:06 AM Liwen Sun 
>> wrote:
>>
>>> Hi James,
>>>
>>> Right now we don't have plans for having a catalog component as part of
>>> Delta Lake, but we are looking to support Hive metastore and also DDL
>>> commands in the near future.
>>>
>>> Thanks,
>>> Liwen
>>>
>>> On Thu, Jun 20, 2019 at 4:46 AM James Cotrotsios <
>>> jamescotrots...@gmail.com> wrote:
>>>
>>>> Is there a plan to have a business catalog component for the Data Lake?
>>>> If not how would someone make a proposal to create an open source project
>>>> related to that. I would be interested in building out an open source data
>>>> catalog that would use the Hive metadata store as a baseline for technical
>>>> metadata.
>>>>
>>>>
>>>> On Wed, Jun 19, 2019 at 3:04 PM Liwen Sun 
>>>> wrote:
>>>>
>>>>> We are delighted to announce the availability of Delta Lake 0.2.0!
>>>>>
>>>>> To try out Delta Lake 0.2.0, please follow the Delta Lake Quickstart:
>>>>> https://docs.delta.io/0.2.0/quick-start.html
>>>>>
>>>>> To view the release notes:
>>>>> https://github.com/delta-io/delta/releases/tag/v0.2.0
>>>>>
>>>>> This release introduces two main features:
>>>>>
>>>>> *Cloud storage support*
>>>>> In addition to HDFS, you can now configure Delta Lake to read and
>>>>> write data on cloud storage services such as Amazon S3 and Azure Blob
>>>>> Storage. For configuration instructions, please see:
>>>>> https://docs.delta.io/0.2.0/delta-storage.html
>>>>>
>>>>> *Improved concurrency*
>>>>> Delta Lake now allows concurrent append-only writes while still
>>>>> ensuring serializability. For concurrency control in Delta Lake, please
>>>>> see: https://docs.delta.io/0.2.0/delta-concurrency.html
>>>>>
>>>>> We have also greatly expanded the test coverage as part of this
>>>>> release.
>>>>>
>>>>> We would like to acknowledge all community members for contributing to
>>>>> this release.
>>>>>
>>>>> Best regards,
>>>>> Liwen Sun
>>>>>
>>>>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: How to execute non-timestamp-based aggregations in spark structured streaming?

2019-04-22 Thread Tathagata Das
SQL windows with the 'over' syntax do not work in Structured Streaming.
It is very hard to incrementalize them in the general case. Hence non-time
windows are not supported.

On Sat, Apr 20, 2019, 2:16 PM Stephen Boesch  wrote:

> Consider the following *intended* sql:
>
> select row_number()
>   over (partition by Origin order by OnTimeDepPct desc) OnTimeDepRank,*
>   from flights
>
> This will *not* work in *structured streaming* : The culprit is:
>
>  partition by Origin
>
> The requirement is to use a timestamp-typed field such as
>
>  partition by flightTime
>
> Tathagata Das (core committer for *spark streaming*) - replies on that in
> a nabble thread:
>
>  The traditional SQL windows with `over` is not supported in streaming.
> Only time-based windows, that is, `window("timestamp", "10 minutes")` is
> supported in streaming
>
> *What then* for my query above - which *must* be based on the *Origin*
> field?
> What is the closest equivalent to that query? Or what would be a workaround
> or different approach to achieve same results?
>


Re: Iterator of KeyValueGroupedDataset.flatMapGroupsWithState function

2018-10-31 Thread Tathagata Das
It is okay to collect the iterator. That will not break Spark. However,
collecting it requires memory in the executor, so you may cause OOMs if a
group has a LOT of new data.
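
To make the trade-off concrete, here is a rough Python analogue (not the
Scala API; `summarize_group` is a made-up name). Materializing the iterator
allows several passes over a group's values, at the cost of holding the whole
group in memory at once.

```python
def summarize_group(values):
    """Materialize one group's iterator so it can be traversed twice."""
    rows = list(values)  # the whole group now lives in memory
    if not rows:
        return None
    mean = sum(rows) / len(rows)          # first pass
    above = [v for v in rows if v > mean]  # second pass over the same data
    return mean, above


print(summarize_group(iter([1, 2, 3, 6])))
```

If a group can be very large, a single-pass formulation over the iterator
avoids that memory cost.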

On Wed, Oct 31, 2018 at 3:44 AM Antonio Murgia -
antonio.murg...@studio.unibo.it  wrote:

> Hi all,
>
> I'm currently developing a Spark Structured Streaming job and I'm
> performing flatMapGroupsWithState.
>
> I'm concerned about the laziness of the Iterator[V] that is passed to my
> custom function (func: (K, Iterator[V], GroupState[S]) => Iterator[U]).
>
> Is it ok to collect that iterator (with a toList, for example)? I have a
> logic that is practically impossible to perform on a Iterator, but I do not
> want to break Spark lazy chain, obviously.
>
>
> Thank you in advance.
>
>
> #A.M.
>


Re: [Structured Streaming] Two watermarks and StreamingQueryListener

2018-08-10 Thread Tathagata Das
Structured Streaming internally maintains one global watermark by taking the
min of the two watermarks. That's why only one gets reported. In Spark 2.4, there
will be the option of choosing max instead of min.
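
A toy Python model of that rule, just to show the arithmetic. This is not
Spark's implementation, and `global_watermark` is a hypothetical name. As far
as I know, the 2.4 choice is exposed through the
`spark.sql.streaming.multipleWatermarkPolicy` setting, but treat that as
something to verify against the docs.

```python
def global_watermark(watermarks_ms, policy="min"):
    """Combine per-operator watermarks (epoch millis) into one global value."""
    if policy not in ("min", "max"):
        raise ValueError("policy must be 'min' or 'max'")
    if not watermarks_ms:
        return 0  # assumption: no watermark defined yet
    return min(watermarks_ms) if policy == "min" else max(watermarks_ms)


# Two withWatermark() calls, one lagging behind the other.
print(global_watermark([1000, 4000]))          # the slower one wins
print(global_watermark([1000, 4000], "max"))   # the faster one wins
```

With min, no data is dropped as late just because one stream is slower; with
max, the slower stream's data may be dropped.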

Just curious: why do you have two watermarks? What's the query like?

TD

On Thu, Aug 9, 2018 at 3:15 PM, subramgr 
wrote:

> Hi,
>
> We have two *flatMapGroupsWithState* operations in our job and two
> *withWatermark* calls.
>
> We are getting the event max time, event time and watermarks from
> *QueryProgressEvent*.
>
> Right now it just returns one *watermark* value.
>
> Are two watermarks maintained by Spark, or just one?
> If just one, which one?
> If one watermark is maintained per *DataFrame*, how do I get the values for
> them?
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to read json data from kafka and store to hdfs with spark structued streaming?

2018-07-26 Thread Tathagata Das
Are you writing the output of multiple streaming queries to the same location? If
so, I can see this error occurring. Multiple streaming queries writing to
the same directory is not supported.

On Tue, Jul 24, 2018 at 3:38 PM, dddaaa  wrote:

> I'm trying to read json messages from kafka and store them in hdfs with
> spark
> structured streaming.
>
> I followed the example here:
> https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
>
> and when my code looks like this:
>
> df = spark \
>   .readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>   .option("subscribe", "topic1") \
>   .load()
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
> df.writeStream.format("json") \
>   .option("checkpointLocation", "some/hdfs/path") \
>   .start("/data")
>
> Then I get rows with binary values in hdfs.
>
> {"value":"BINARY
> DATA","topic":"test_hdfs2","partition":0,"offset":3463075,
> "timestamp":"2018-07-24T20:51:33.655Z","timestampType":0}
>
> These rows are continually written as expected, but in the binary format.
>
> I found this post:
>
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
> and I'm trying to implement this example:
>
> schema = StructType().add("a", IntegerType()).add("b", StringType())
> df.select( \
>   col("key").cast("string"),
>   from_json(col("value").cast("string"), schema))
>
> But here I get odd behaviour. I have a small file written to hdfs with
> multiple empty json rows - {}
>
> and very quickly the job fails with the following exception:
>
> 18/07/24 22:25:47 ERROR datasources.FileFormatWriter: Aborting job null.
> java.lang.IllegalStateException:
> hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting
> batch 409 (compactInterval: 10) at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$
> anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
> at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$
> anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
> at scala.Option.getOrElse(Option.scala:121) at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$
> anonfun$4.apply(CompactibleFileStreamLog.scala:173)
> at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$
> anonfun$4.apply(CompactibleFileStreamLog.scala:172)
> at
> scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(
> CompactibleFileStreamLog.scala:172)
> at
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(
> CompactibleFileStreamLog.scala:156)
> at
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.
> commitJob(ManifestFileCommitProtocol.scala:64)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(
> FileFormatWriter.scala:213)
> at
> org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(
> FileStreamSink.scala:123)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$
> org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$
> 3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(
> SQLExecution.scala:77)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$
> org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$
> 3.apply(MicroBatchExecution.scala:475)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.
> reportTimeTaken(ProgressReporter.scala:271)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(
> StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$
> apache$spark$sql$execution$streaming$MicroBatchExecution$
> $runBatch(MicroBatchExecution.scala:474)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$
> runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(
> MicroBatchExecution.scala:133)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$
> runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(
> MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$
> runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(
> MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.
> reportTimeTaken(ProgressReporter.scala:271)
> at
> 

Re: Exceptions with simplest Structured Streaming example

2018-07-26 Thread Tathagata Das
Unfortunately, your output is not visible in the email that we see. Was it
an image that somehow got removed?
It may be best to copy the output text (i.e. the error message) into the email.

On Thu, Jul 26, 2018 at 5:41 AM, Jonathan Apple 
wrote:

> Hello,
>
> There is a streaming Word Count example at the beginning of the
> Structured
> Streaming Programming Guide
>  streaming-programming-guide.html>
> .
>
> First, we execute *nc -lk * in a separate terminal.
>
> Next, following the Python code, we have this in *example.py:*
>
>
>
> We test the application by:
>
> *
> spark-submit example.py
> *
>
> The application runs and waits for data on the socket. We type single words
> several times in the terminal running netcat, each time with carriage
> returns.
>
> The application is failing every time. Here is some (snipped) output:
>
>
>
> What is strange is that the exact same example works for other members of
> the team, using the same Python version (3.7.0) and the same Spark version
> (2.3.1).
>
> Has anyone seen similar behavior?
>
> Many thanks,
>
> Jonathan
>


Re: Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-17 Thread Tathagata Das
Yes. Yes you can.

On Tue, Jul 17, 2018 at 11:42 AM, Sathi Chowdhury  wrote:

> Hi,
> My question is about the ability to integrate Spark streaming with multiple
> clusters. Is it a supported use case? An example is two topics
> owned by different groups, each with its own Kafka infrastructure.
> Can I have two dataframes as a result of spark.readStream listening to
> different Kafka clusters in the same Spark streaming job?
> Has anyone solved this use case before?
>
>
> Thanks.
> Sathi
>


Re: [Structured Streaming] Custom StateStoreProvider

2018-07-10 Thread Tathagata Das
Note that this is not a public API yet. Hence it is not well documented. So
use it at your own risk :)

On Tue, Jul 10, 2018 at 11:04 AM, subramgr 
wrote:

> Hi,
>
> This *trait* looks very daunting. Is there some blog post or article
> which explains how to implement this *trait*?
>
> Thanks
> Girish
>


Re: [Structured Streaming] Reading Checkpoint data

2018-07-09 Thread Tathagata Das
Only the stream metadata (e.g., stream id, offsets) is stored as JSON. The
stream state data is stored in an internal binary format.
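
As a rough illustration of the JSON part, here is a small Python sketch that
parses an offsets-style log entry. It assumes the entry is a version line
followed by one JSON document per line, with a bare "-" standing for a
missing offset. That layout is an assumption, not a documented contract, and
the sample content below is invented.

```python
import json


def parse_offset_entry(text):
    """Split an offsets-style entry into (version, list of parsed JSON lines)."""
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if not lines:
        raise ValueError("empty entry")
    version, body = lines[0], lines[1:]
    # Assumption: a bare "-" marks a source with no offset recorded.
    parsed = [None if ln == "-" else json.loads(ln) for ln in body]
    return version, parsed


# Invented sample, shaped like the assumed layout.
SAMPLE = "\n".join([
    "v1",
    '{"batchWatermarkMs": 0, "batchTimestampMs": 1531177620000}',
    '{"topic1": {"0": 42, "1": 17}}',
])

version, docs = parse_offset_entry(SAMPLE)
print(version, docs[1]["topic1"])
```

The files under the `state` directory are the binary part and will not parse
this way.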

On Mon, Jul 9, 2018 at 4:07 PM, subramgr 
wrote:

> Hi,
>
> I read somewhere that with Structured Streaming all the checkpoint data is
> in a more readable, JSON-like format. Is there any documentation on how to
> read the checkpoint data?
>
> If I do `hadoop fs -ls` on the `state` directory I get some encoded data.
>
> Thanks
> Girish
>


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-05 Thread Tathagata Das
Hey all,

In Spark 2.4.0, there will be a new feature called *foreachBatch* which
will expose the output rows of every micro-batch as a dataframe, on which
you apply a user-defined function. With that, you can reuse existing batch
sources for writing results as well as write results to multiple locations.

*Reuse existing batch data sources*
For many storage systems, there may not be a streaming sink available yet,
but there may already exist a data writer for batch queries. Using
foreachBatch(), you can use the batch data writers on the output of each
micro-batch. For example, writing from a stream to Cassandra using the
Spark Cassandra Connector would look like this:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.cassandraFormat(...).save(...)
}

*Write to multiple locations*
If you want to write the output of a streaming query to multiple locations,
then you can simply write the output DataFrame/Dataset multiple times.
However, each attempt to write can cause the output data to be recomputed
(including possible re-reading of the input data). To avoid recomputations,
you should cache the output DataFrame/Dataset, write it to multiple
locations, and then uncache it. Here is an outline.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.cache()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()                  // release the cached micro-batch
}

*Apply additional DataFrame operations*
Many DataFrame and Dataset operations are not supported in streaming
DataFrames because Spark does not support generating incremental plans in
those cases. Using foreachBatch() you can apply some of these operations on
each micro-batch output. However, you will have to reason about the
end-to-end semantics of doing that operation yourself.

*NOTE: *By default foreachBatch() provides only at-least-once write
guarantees. However, you can use the batchId provided to the function as a
way to deduplicate the output and get an exactly-once guarantee.
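
The deduplication idea can be sketched with a toy in-memory sink in Python.
This is not a Spark API; `IdempotentSink` and its methods are made-up names,
and a real sink would have to record the batch id atomically with the data
(for example in the same transaction).

```python
class IdempotentSink:
    """Toy sink that ignores a batch id it has already committed."""

    def __init__(self):
        self.committed = {}  # batch_id -> list of rows

    def write_batch(self, batch_id, rows):
        """Return True if the batch was written, False if it was a replay."""
        if batch_id in self.committed:
            return False  # same micro-batch delivered again after a failure
        # A real sink must store the id and the rows atomically.
        self.committed[batch_id] = list(rows)
        return True

    def rows(self):
        """All committed rows, in batch order."""
        return [r for b in sorted(self.committed) for r in self.committed[b]]


sink = IdempotentSink()
sink.write_batch(0, ["a", "b"])
sink.write_batch(0, ["a", "b"])  # replayed batch, skipped
sink.write_batch(1, ["c"])
print(sink.rows())
```

This relies on a replayed micro-batch carrying the same batchId and the same
rows as the first attempt.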

TD

On Thu, Jul 5, 2018 at 12:33 AM, Amiya Mishra <
amiya.mis...@bitwiseglobal.com> wrote:

> Hi Chandan/Jürgen,
>
> I tried a plain Spark approach with a single input data frame and
> multiple sinks, as follows.
>
> Spark provides a method called awaitAnyTermination() in
> StreamingQueryManager.scala, which provides all the required details to
> handle the queries processed by Spark. The Spark documentation makes the
> following points:
> -> Wait until any of the queries on the associated SQLContext has
> terminated since the creation of the context, or since `resetTerminated()`
> was called. If any query was terminated with an exception, then the
> exception will be thrown.
> -> If a query has terminated, then subsequent calls to
> `awaitAnyTermination()` will either return immediately (if the query was
> terminated by `query.stop()`), or throw the exception immediately (if the
> query was terminated with exception). Use `resetTerminated()` to clear past
> terminations and wait for new terminations.
> -> In the case where multiple queries have terminated since
> `resetTerminated()` was called, if any query has terminated with exception,
> then `awaitAnyTermination()` will throw any of the exceptions. For correctly
> documenting exceptions across multiple queries, users need to stop all of
> them after any of them terminates with exception, and then check the
> `query.exception()` for each query.
>
>
> val inputdf: DataFrame = sparkSession.readStream
>   .schema(schema)
>   .option("delimiter", ",")
>   .csv("src/main/streamingInput")
> val query1 = inputdf.writeStream
>   .format("csv")
>   .option("path", "first_output")
>   .option("checkpointLocation", "checkpointloc")
>   .start()
> val query2 = inputdf.writeStream
>   .format("csv")
>   .option("path", "second_output")
>   .option("checkpointLocation", "checkpoint2")
>   .start()
> sparkSession.streams.awaitAnyTermination()
>
>
> Now, both the "first_output" and "second_output" files are written
> successfully.
>
> Try it out on your side and let me know if you find any limitations. Please
> also post if you find any other way.
>
> Please correct me if I have made any grammatical mistakes.
>
> Thanks
> Amiya
>


Re: Lag and queued up batches info in Structured Streaming UI

2018-06-28 Thread Tathagata Das
The SQL plan of each micro-batch in the Spark UI (SQL tab) has links to the
actual Spark jobs that ran in the micro-batch. From that you can drill down
into the stage information. I agree that it's not there as a nice per-stream
table as with the Streaming tab, but all the information is present if you
dig for it.

On Wed, Jun 27, 2018 at 4:14 PM, swetha kasireddy  wrote:

> Thanks TD, but the sql plan does not seem to provide any information on
> which stage is taking longer time or to identify any bottlenecks about
> various stages. Spark kafka Direct used to provide information about
> various stages in a micro batch and the time taken by each stage. Is there
> a way to find out stage level information like time taken by each stage,
> shuffle read/write data etc? Do you have any documentation on how to use
> SQL tab for troubleshooting?
>
> On Wed, Jun 20, 2018 at 6:07 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Also, you can get information about the last progress made (input rates,
>> etc.) from StreamingQuery.lastProgress, StreamingQuery.recentProgress, and
>> using StreamingQueryListener.
>> It's all documented:
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>>
>> On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Structured Streaming does not maintain a queue of batch like DStream.
>>> DStreams used to cut off batches at a fixed interval and put in a queue,
>>> and a different thread processed queued batches. In contrast, Structured
>>> Streaming simply cuts off and immediately processes a batch after the
>>> previous batch finishes. So the question about queue size and lag does not
>>> apply to Structured Streaming.
>>>
>>> That said, there is no UI for Structured Streaming. You can see the sql
>>> plans for each micro-batch in the SQL tab.
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jun 20, 2018 at 12:12 PM, SRK  wrote:
>>>
>>>> hi,
>>>>
>>>> How do we get information like lag and queued up batches in Structured
>>>> streaming? Following api does not seem to give any info about  lag and
>>>> queued up batches similar to DStreams.
>>>>
>>>> https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/streaming/scheduler/BatchInfo.html
>>>>
>>>> Thanks!
>>>>
>>>
>>
>


Re: How to reduceByKeyAndWindow in Structured Streaming?

2018-06-28 Thread Tathagata Das
The fundamental conceptual difference between the windowing in DStream vs
Structured Streaming is that DStream used the arrival time of the record in
Spark (aka processing time), whereas Structured Streaming uses event time. If
you want to exactly replicate DStream's processing time windows in
Structured Streaming, then you can just add the current timestamp as an
additional column in the DataFrame and group by using that.

streamingDF
.withColumn("processing_time", current_timestamp())
.groupBy($"key", window($"processing_time", "5 minutes"))
.agg(sum($"value") as "total")
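
To see what that grouping computes, here is a plain Python sketch of the same
arithmetic, outside Spark. The function names are hypothetical; the only
assumption carried over is that 5-minute tumbling windows start at multiples
of 5 minutes since the epoch.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60


def window_start(ts_seconds, width=WINDOW_SECONDS):
    """Start of the tumbling window [start, start + width) containing ts."""
    return ts_seconds - (ts_seconds % width)


def totals_per_window(records, width=WINDOW_SECONDS):
    """records: iterable of (key, value, processing_time_seconds)."""
    totals = defaultdict(int)
    for key, value, ts in records:
        totals[(key, window_start(ts, width))] += value
    return dict(totals)


records = [("a", 1, 0), ("a", 2, 299), ("a", 5, 300), ("b", 7, 10)]
print(totals_per_window(records))
```

Each record lands in exactly one window, decided by when it was processed
rather than by any timestamp inside the record.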


On Thu, Jun 28, 2018 at 2:24 AM, Gerard Maas  wrote:

> Hi,
>
> In Structured Streaming lingo, "ReduceByKeyAndWindow" would be a window
> aggregation with a composite key.
> Something like:
> stream.groupBy($"key", window($"timestamp", "5 minutes"))
>.agg(sum($"value") as "total")
>
> The aggregate could be any supported SQL function.
> Is this what you are looking for? Otherwise, share your specific use case
> to see how it could be implemented in Structured Streaming.
>
> kr, Gerard.
>
> On Thu, Jun 28, 2018 at 10:21 AM oripwk  wrote:
>
>> In Structured Streaming, there's the notion of event-time windowing:
>>
>>
>>
>> However, this is not quite similar to DStream's windowing operations: in
>> Structured Streaming, windowing groups the data by fixed time-windows, and
>> every event in a time window is associated to its group:
>>
>>
>> And in DStreams it just outputs all the data according to a limited window
>> in time (last 10 minutes for example).
>>
>> The question was asked also  here
>> > someway-to-do-the-eqivalent-of-reducebykeyandwindow-in-spark-structured>
>> , if it makes it clearer.
>>
>> How can the latter be achieved in Structured Streaming?
>>


Re: Recommendation of using StreamSinkProvider for a custom KairosDB Sink

2018-06-25 Thread Tathagata Das
This interface is actually unstable. The v2 of the DataSource APIs is being
designed right now, and it will be public and stable in a release or two. So
unfortunately there is no stable interface right now that I can officially
recommend.

That said, you could always use the ForeachWriter interface (see
DataStreamWriter.foreach).
Also, in the next release, you will have a foreachBatch interface that
allows you to do custom operations on the output of each micro-batch
represented as a DataFrame (exactly the same as Sink.addBatch).
Both of these should be useful for you until the interfaces are stabilized.

On Mon, Jun 25, 2018 at 9:55 AM, subramgr 
wrote:

> We are using Spark 2.3 and would like to know whether it is recommended to
> create a custom KairosDB sink by implementing the StreamSinkProvider.
>
> The interface is marked experimental and unstable?
>


Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread Tathagata Das
Actually, we do not support a JDBC sink yet. The blog post was just an
example :) I agree it is misleading in hindsight.
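
Until there is one, a writer with the open/process/close lifecycle of
ForeachWriter is the usual workaround. Below is a minimal Python sketch of
that shape, using the standard-library sqlite3 module as a stand-in for a
real JDBC database. The class name, table name, and column names are all
hypothetical; the point is only that the table name lives in the writer's
own SQL rather than in the connection string.

```python
import sqlite3


class SqliteCountWriter:
    """Mimics the ForeachWriter lifecycle: open, process each row, close."""

    def __init__(self, db_path, table):
        self.db_path = db_path
        self.table = table  # the table name is chosen here, not in the URL
        self.conn = None

    def open(self, partition_id, epoch_id):
        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute(
            f"CREATE TABLE IF NOT EXISTS {self.table} "
            "(action TEXT, window_start TEXT, count INTEGER, "
            "PRIMARY KEY (action, window_start))")
        return True  # True means: go ahead and process this partition

    def process(self, row):
        # Upsert so that an updated count replaces the previous one.
        self.conn.execute(
            f"INSERT OR REPLACE INTO {self.table} VALUES (?, ?, ?)",
            (row["action"], row["window_start"], row["count"]))

    def close(self, error):
        if error is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        self.conn.close()
```

Keying the table on (action, window_start) makes a re-sent count overwrite
the earlier value instead of duplicating it.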

On Wed, Jun 20, 2018 at 6:09 PM, kant kodali  wrote:

> Hi All,
>
> Does Spark Structured Streaming have a JDBC sink or do I need to use
> ForeachWriter? I see the following code in this link, and
> I can see that the database name can be passed in the connection string;
> however, I wonder how to pass a table name?
>
> inputDF.groupBy($"action", window($"time", "1 hour")).count()
>.writeStream.format("jdbc")
>.save("jdbc:mysql//…")
>
>
> Thanks,
> Kant
>


Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
Also, you can get information about the last progress made (input rates,
etc.) from StreamingQuery.lastProgress, StreamingQuery.recentProgress, and
using StreamingQueryListener.
It's all documented:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries

On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das 
wrote:

> Structured Streaming does not maintain a queue of batch like DStream.
> DStreams used to cut off batches at a fixed interval and put in a queue,
> and a different thread processed queued batches. In contrast, Structured
> Streaming simply cuts off and immediately processes a batch after the
> previous batch finishes. So the question about queue size and lag does not
> apply to Structured Streaming.
>
> That said, there is no UI for Structured Streaming. You can see the sql
> plans for each micro-batch in the SQL tab.
>
>
>
>
>
> On Wed, Jun 20, 2018 at 12:12 PM, SRK  wrote:
>
>> hi,
>>
>> How do we get information like lag and queued up batches in Structured
>> streaming? Following api does not seem to give any info about  lag and
>> queued up batches similar to DStreams.
>>
>> https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/streaming/scheduler/BatchInfo.html
>>
>> Thanks!
>>
>


Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
Structured Streaming does not maintain a queue of batches like DStream.
DStreams used to cut off batches at a fixed interval and put them in a queue,
and a different thread processed the queued batches. In contrast, Structured
Streaming simply cuts off and immediately processes a batch after the
previous batch finishes. So the question about queue size and lag does not
apply to Structured Streaming.
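
A back-of-the-envelope Python model of the two behaviours, under simplifying
assumptions: constant processing time per DStream batch, and a fixed overhead
plus a per-record cost for each micro-batch. Both functions and their
parameters are hypothetical and model nothing beyond the scheduling rule
described above.

```python
def dstream_unfinished(batch_interval, processing_time, horizon):
    """Batches cut by `horizon` that are still running or queued."""
    free_at = 0
    unfinished = 0
    cut_time = batch_interval
    while cut_time <= horizon:
        start = max(cut_time, free_at)  # wait for the previous batch
        free_at = start + processing_time
        if free_at > horizon:
            unfinished += 1
        cut_time += batch_interval
    return unfinished


def micro_batch_sizes(rate_per_s, overhead_ms, cost_ms_per_record, n_batches):
    """Records per micro-batch when each batch starts as the previous ends."""
    sizes = []
    duration_ms = overhead_ms  # assume the first trigger pays only overhead
    for _ in range(n_batches):
        # Everything that arrived while the previous batch was running.
        size = rate_per_s * duration_ms // 1000
        sizes.append(size)
        duration_ms = overhead_ms + cost_ms_per_record * size
    return sizes


print(dstream_unfinished(batch_interval=10, processing_time=15, horizon=60))
print(micro_batch_sizes(100, 1000, 5, 4))
```

In the first model a backlog of batches builds up when processing is slower
than the interval; in the second there is never a queue, and a slow batch
simply makes the next batch bigger.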

That said, there is no UI for Structured Streaming. You can see the sql
plans for each micro-batch in the SQL tab.





On Wed, Jun 20, 2018 at 12:12 PM, SRK  wrote:

> hi,
>
> How do we get information like lag and queued up batches in Structured
> streaming? Following api does not seem to give any info about  lag and
> queued up batches similar to DStreams.
>
> https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/streaming/scheduler/BatchInfo.html
>
> Thanks!
>


Re: [structured-streaming][parquet] readStream files order in Parquet

2018-06-15 Thread Tathagata Das
The files are processed in order of their last-modified timestamps. The
path and partitioning scheme are not used for ordering.
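
A toy Python model of that rule, not the file source's actual code. The
function names and the timestamps are invented, and keeping input order for
files with equal timestamps is an assumption of the sketch.

```python
def processing_order(files):
    """Order (path, modified_ms) pairs by timestamp only; the path is ignored."""
    return [path for path, _ in sorted(files, key=lambda f: f[1])]


def plan_triggers(files, max_files_per_trigger):
    """Chunk the ordered files the way a maxFilesPerTrigger-style cap would."""
    ordered = processing_order(files)
    return [ordered[i:i + max_files_per_trigger]
            for i in range(0, len(ordered), max_files_per_trigger)]


files = [
    ("env=testing/date=2018-03-04/part1.parquet", 300),  # written last
    ("env=testing/date=2018-03-06/part1.parquet", 100),  # written first
    ("env=testing/date=2018-03-05/part1.parquet", 200),
]
print(plan_triggers(files, 2))
```

So if the date=2018-03-06 files were written first, they are read first, no
matter what the partition directory says.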


On Thu, Jun 14, 2018 at 6:59 AM, karthikjay  wrote:

> My parquet files are first partitioned by environment and then by date
> like:
>
> env=testing/
>date=2018-03-04/
>   part1.parquet
>   part2.parquet
>   part3.parquet
>date=2018-03-05/
>   part1.parquet
>   part2.parquet
>   part3.parquet
>date=2018-03-06/
>   part1.parquet
>   part2.parquet
>   part3.parquet
> In our read stream, I do the following:
>
> val tunerParquetDF = spark
>   .readStream
>   .schema(...)
>   .format("parquet")
>   .option("basePath", basePath)
>   .option("path", basePath+"/env*")
>   .option("maxFilesPerTrigger", 5)
>   .load()
>
> The expected behavior is that readstream will read files in order of the
> dates but the observed behavior is that files are shuffled in random order.
> How do I force the date order of read in Parquet files ?
>


Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Glad that it worked out! It's unfortunate that there exist such pitfalls.
And there is no easy way to get around it.
If you can, let us know how your experience with mapGroupsWithState has
been.

TD

On Fri, Jun 8, 2018 at 1:49 PM, frankdede 
wrote:

> You are exactly right! A few hours ago, I tried many things and finally got
> the example working by defining event timestamp column before groupByKey,
> just like what you suggested, but I wasn't able to figure out the reasoning
> behind my fix.
>
> val sessionUpdates = events
>   .withWatermark("timestamp", "10 seconds")
>   .groupByKey(event => event.sessionId)
>   .mapGroupsWithState[SessionInfo,
> SessionUpdate](GroupStateTimeout.EventTimeTimeout())
>
> It turns out that it's just impossible for the planner to figure out the
> source of the watermark column after flatMap has been applied.
>
> Thanks Tathagata!
>


Re: Reset the offsets, Kafka 0.10 and Spark

2018-06-08 Thread Tathagata Das
Structured Streaming really makes this easy. You can simply specify the
option of whether to start the query from the earliest or latest offsets.
Check out
- https://www.slideshare.net/databricks/a-deep-dive-into-structured-streaming
- https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

On Thu, Jun 7, 2018 at 1:27 PM, Guillermo Ortiz Fernández <
guillermo.ortiz.f...@gmail.com> wrote:

> I'm consuming data from Kafka with createDirectStream and storing the
> offsets in Kafka
> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself)
>
> val stream = KafkaUtils.createDirectStream[String, String](
>   streamingContext,
>   PreferConsistent,
>   Subscribe[String, String](topics, kafkaParams))
>
>
>
> My Spark version is 2.0.2 and my Kafka version is 0.10. This solution works
> well, and when I restart the Spark process it starts from the last offset
> that Spark consumed, but sometimes I need to reprocess the whole topic from
> the beginning.
>
> I have seen that I could reset the offsets with a Kafka script, but it's not
> enabled in Kafka 0.10...
>
> kafka-consumer-groups --bootstrap-server kafka-host:9092 --group
> my-group --reset-offsets --to-earliest --all-topics --execute
>
>
> Another possibility is to pass another Kafka parameter to
> createDirectStream, a map with the offsets, but how could I get the first
> offset of each partition? I have checked the API of the new consumer
> and I don't see any method to get these offsets.
>
> Any other way? I could start with another groupId as well, but it doesn't
> seem a very clean option for production.
>


Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Try to define the watermark on the right column immediately before calling
`groupByKey(...).mapGroupsWithState(...)`. You are applying the watermark
and then doing a bunch of opaque transformations (a user-defined flatMap that
the planner has no visibility into). This prevents the planner from
propagating the watermark tag through such operations.

Specifically, you are applying a flatMap that takes a timestamp and
splits it into multiple records with timestamp columns. The SQL
analyzer/planner cannot possibly reason from the opaque user-defined code
whether the generated timestamp is the same as or different from the input
timestamp column, hence it cannot propagate the watermark information down to
mapGroupsWithState.


Hope this helps.

On Fri, Jun 8, 2018 at 7:50 AM, frankdede 
wrote:

> I was trying to find a way to resessionize features in different events
> based on the event timestamps using Spark, and I found a code example that
> uses mapGroupsWithState to resessionize events using processing timestamps
> in their repo.
>
> https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
>
> To quickly test if this sessionization thing works with event timestamps, I
> added withWatermark("timestamp", "10 seconds") (treating processing time as
> the event timestamp) and changed ProcessingTimeTimeout to EventTimeTimeout.
>
>   val lines = spark.readStream
>   .format("socket")
>   .option("host", host)
>   .option("port", port)
>   .option("includeTimestamp", value = true)
>   .load()
>
>  // Split the lines into words, treat words as sessionId of events
>  val events = lines
>   .withWatermark("timestamp", "10 seconds") // added
>   .as[(String, Timestamp)]
>   .flatMap { case (line, timestamp) =>
> line.split(" ").map(word => Event(sessionId = word, timestamp))
>   }
>
>  val sessionUpdates = events
>   .groupByKey(event => event.sessionId)
>   .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
>...
>   }
>
>   // Start running the query that prints the session updates to the console
>  val query = sessionUpdates
>   .writeStream
>   .outputMode("update")
>   .format("console")
>   .start()
>
>  query.awaitTermination()
>
> However, when I ran it, Spark threw org.apache.spark.sql.AnalysisException
> and said that Watermark must be specified in the query using
> '[Dataset/DataFrame].withWatermark()' for using event-time timeout in a
> [map|flatMap]GroupsWithState. Event-time timeout not supported without
> watermark, which is untrue and confusing, because that 'timestamp' column
> is clearly in the physical plan following that exception message:
> ...
> +- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
>+- StreamingRelation
> DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),...,
> [value#2, timestamp#3]
> Did I miss something or did something wrong?
>
> Thanks in advance!
>


Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Tathagata Das
Just to be clear, these screenshots are about the memory consumption of the
driver. So this has nothing to do with streaming aggregation state, which is
kept in the memory of the executors, not the driver.

On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim  wrote:

> 1. Could you share your Spark version?
> 2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it
> helps? This configuration is available in 2.3.0, and default value is 1000.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Tue, May 22, 2018 at 4:29 PM, weand wrote:
>
>> You can see it even better on this screenshot:
>>
>> [Screenshot: TOP Entries Collapsed #2, file/t8542/27_001.png]
>>
>> Sorry for the spam, attached a not so perfect screen in the mail before.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: can we use mapGroupsWithState in raw sql?

2018-04-16 Thread Tathagata Das
Unfortunately no. Honestly it does not make sense as for type-aware
operations like map, mapGroups, etc., you have to provide an actual JVM
function. That does not fit in with the SQL language structure.

On Mon, Apr 16, 2018 at 7:34 PM, kant kodali  wrote:

> Hi All,
>
> can we use mapGroupsWithState in raw SQL? or is it in the roadmap?
>
> Thanks!
>
>
>


Re: Structured Streaming on Kubernetes

2018-04-13 Thread Tathagata Das
Structured Streaming is stable in production! At Databricks, we and our
customers collectively process hundreds of billions of records per day
using SS. However, we are not using Kubernetes :)

Though I don't think it will matter too much as long as kubes are correctly
provisioned+configured and you are checkpointing to HDFS (for
fault-tolerance guarantees).

TD

On Fri, Apr 13, 2018, 12:28 AM Krishna Kalyan 
wrote:

> Hello All,
> We were evaluating Spark Structured Streaming on Kubernetes (Running on
> GCP). It would be awesome if the spark community could share their
> experience around this. I would like to know more about your production
> experience and the monitoring tools you are using.
>
> Since spark on kubernetes is a relatively new addition to spark, I was
> wondering if structured streaming is stable in production. We were also
> evaluating Apache Beam with Flink.
>
> Regards,
> Krishna
>
>
>


Re: Does partition by and order by works only in stateful case?

2018-04-12 Thread Tathagata Das
Traditional SQL windows with `over` are not supported in streaming. Only
time-based windows, that is, `window("timestamp", "10 minutes")`, are
supported in streaming.
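
For reference, here is a minimal PySpark sketch of a time-based window
aggregation. The column names `id` and `timestamp` are taken from the question
below; the function names and the 10-minute watermark are illustrative
assumptions. `window_bounds` is plain Python, not Spark code: it only shows how
a tumbling window (aligned to the Unix epoch, which I believe is the default)
assigns a timestamp to a bucket.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_bounds(ts, minutes=10):
    """Return (start, end) of the tumbling window that contains `ts`.

    `ts` must be timezone-aware. The window is [start, end).
    """
    size = timedelta(minutes=minutes)
    # Number of whole windows elapsed since the epoch.
    index = (ts - EPOCH) // size
    start = EPOCH + index * size
    return start, start + size


def count_per_window(events):
    """Count rows per `id` and 10-minute event-time window.

    `events` is assumed to be a streaming DataFrame with `id` and
    `timestamp` columns.
    """
    # Imported lazily so this module loads without Spark installed.
    from pyspark.sql import functions as F

    return (
        events
        .withWatermark("timestamp", "10 minutes")
        .groupBy(F.window(F.col("timestamp"), "10 minutes"), F.col("id"))
        .count()
    )
```

This is the `groupBy(window(...))` form asked about below: rows are grouped by
the time bucket they fall into rather than by a row-based `over (...)` frame.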

On Thu, Apr 12, 2018 at 7:34 PM, kant kodali  wrote:

> Hi All,
>
> Does partition by and order by works only in stateful case?
>
> For example:
>
> select row_number() over (partition by id order by timestamp) from table
>
> gives me
>
> *SEVERE: Exception occured while submitting the query:
> java.lang.RuntimeException: org.apache.spark.sql.AnalysisException:
> Non-time-based windows are not supported on streaming DataFrames/Datasets;;*
>
> I wonder what time based window means? is it not the window from over()
> clause or does it mean group by(window('timestamp'), '10 minutes') like the
> stateful case?
>
> Thanks
>


Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
Have you read through the documentation of Structured Streaming?
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

One of the basic mistakes you are making is defining the dataset with
`spark.read()`. A streaming Dataset is defined with `spark.readStream()`.
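
A rough sketch of what the streaming version could look like, written in
PySpark for brevity (the Java API has the same shape). The function names, the
schema string, and the paths are hypothetical; the `appname` partition column
comes from the code quoted below. It assumes the Kafka source package is on the
classpath and that the SQL `from_json` function accepts a DDL schema string.

```python
def read_json_stream(spark, servers, topic, schema_ddl):
    """Build a streaming DataFrame of parsed JSON records from a Kafka topic."""
    raw = (
        spark.readStream  # readStream, not read: this defines a streaming source
        .format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)
        .load()
    )
    # Kafka delivers `value` as bytes: cast it to a string, then parse it
    # with an explicit schema (streaming sources do not infer one).
    return (
        raw.selectExpr("CAST(value AS STRING) AS json")
        .selectExpr("from_json(json, '{}') AS data".format(schema_ddl))
        .select("data.*")
    )


def write_parquet_stream(parsed, path, checkpoint_dir):
    """Start a query that appends the records to Parquet, partitioned by appname."""
    return (
        parsed.writeStream
        .format("parquet")
        .partitionBy("appname")
        .option("path", path)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

Usage would be along the lines of
`write_parquet_stream(read_json_stream(spark, "host1:9092", "topic1", "appname STRING, msg STRING"), "/data/out", "/data/ckpt")`,
where every argument is a placeholder.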

On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen <darou...@gmail.com> wrote:

> Hi, Tathagata
>
> I have tried structured streaming, but in line
>
>> Dataset rowDataset = spark.read().json(jsondataset);
>
>
> Always throw
>
>> Queries with streaming sources must be executed with writeStream.start()
>
>
> But what i need to do in this step is only transforming json string data
> to Dataset . How to fix it?
>
> Thanks!
>
>
> Regard,
> Junfeng Chen
>
> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> It's not very surprising that doing this sort of RDD to DF conversion
>> inside DStream.foreachRDD has weird corner cases like this. In fact, you
>> are going to have additional problems with partial parquet files (when
>> there are failures) in this approach. I strongly suggest that you use
>> Structured Streaming, which is designed to do this sort of processing. It
>> will take care of tracking the written parquet files correctly.
>>
>> TD
>>
>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com> wrote:
>>
>>> I write a program to read some json data from kafka and purpose to save
>>> them to parquet file on hdfs.
>>> Here is my code:
>>>
>>>> JavaInputDStream stream = ...
>>>> JavaDStream rdd = stream.map...
>>>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>>>>     Dataset<Row> df = spark.read().json(stringjavardd); // convert json to df
>>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...  // add some new fields
>>>>     StructType type = df.schema()...; // construct new type for the added fields
>>>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
>>>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>>         .partitionBy("appname").parquet(savepath); // save to parquet
>>>> });
>>>
>>>
>>>
>>> However, if I remove the repartition method of newdf in writing parquet
>>> stage, the program always throw nullpointerexception error in json convert
>>> line:
>>>
>>> Java.lang.NullPointerException
>>>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.
>>>> scala:1783)
>>>> ...
>>>
>>>
>>> While it looks make no sense, writing parquet operation should be in
>>> different stage with json transforming operation.
>>> So how to solve it? Thanks!
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>
>>
>


Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
It's not very surprising that doing this sort of RDD to DF conversion
inside DStream.foreachRDD has weird corner cases like this. In fact, you
are going to have additional problems with partial parquet files (when
there are failures) in this approach. I strongly suggest that you use
Structured Streaming, which is designed to do this sort of processing. It
will take care of tracking the written parquet files correctly.

TD

On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen  wrote:

> I write a program to read some json data from kafka and purpose to save
> them to parquet file on hdfs.
> Here is my code:
>
>> JavaInputDStream stream = ...
>> JavaDStream rdd = stream.map...
>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>>     Dataset<Row> df = spark.read().json(stringjavardd); // convert json to df
>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...  // add some new fields
>>     StructType type = df.schema()...; // construct new type for the added fields
>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>         .partitionBy("appname").parquet(savepath); // save to parquet
>> });
>
>
>
> However, if I remove the repartition method of newdf in writing parquet
> stage, the program always throw nullpointerexception error in json convert
> line:
>
> Java.lang.NullPointerException
>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.
>> scala:1783)
>> ...
>
>
> While it looks make no sense, writing parquet operation should be in
> different stage with json transforming operation.
> So how to solve it? Thanks!
>
> Regard,
> Junfeng Chen
>


Re: Does structured streaming support Spark Kafka Direct?

2018-04-12 Thread Tathagata Das
The parallelism is the same for Structured Streaming. In fact, the Kafka
Structured Streaming source is based on the same principle as DStream's
Kafka Direct, hence it has very similar behavior.


On Tue, Apr 10, 2018 at 11:03 PM, SRK  wrote:

> hi,
>
> We have code based on Spark Kafka Direct in production and we want to port
> this code to Structured Streaming. Does structured streaming support spark
> kafka direct? What are the configs for parallelism and scalability in
> structured streaming? In Spark Kafka Direct, the number of kafka partitions
> take care of parallelism when doing the consumption. Is it the same case
> with Structured Streaming?
>
> Thanks for the help in Advance!
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread Tathagata Das
Structured Streaming AUTOMATICALLY saves the offsets in a checkpoint
directory that you provide. And when you start the query again with the
same directory it will just pick up where it left off.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
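
As a sketch of how an application could lean on this, the loop below restarts
a failed query so that it resumes from its checkpoint. `run_with_restarts` and
`start_query` are hypothetical names; `start_query` is assumed to build the
query with the same `checkpointLocation` every time and to return the started
StreamingQuery. It relies on `awaitTermination()` raising when the query
terminates with an error.

```python
def run_with_restarts(start_query, max_restarts=3):
    """Run a streaming query, restarting it from its checkpoint on failure.

    Returns the number of restarts that were needed.
    """
    restarts = 0
    while True:
        # Each start reuses the same checkpoint directory, so the query
        # picks up at the offsets recorded before the failure.
        query = start_query()
        try:
            query.awaitTermination()
            return restarts
        except Exception:
            restarts += 1
            if restarts > max_restarts:
                # Give up and surface the last error to the caller.
                raise
```

A production version would probably catch only the streaming query exception
and back off between attempts; both are left out to keep the sketch short.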

On Thu, Mar 22, 2018 at 8:06 PM, M Singh 
wrote:

> Hi:
>
> I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR and for the
> last few days, after running the application for 30-60 minutes get
> exception from Kafka Consumer included below.
>
> The structured streaming application is processing 1 minute worth of data
> from kafka topic. So I've tried increasing request.timeout.ms from 4
> seconds default to 45000 seconds and receive.buffer.bytes to 1mb but still
> get the same exception.
>
> Is there any spark/kafka configuration that can save the offset and retry
> it next time rather than throwing an exception and killing the application.
>
> I've tried googling but have not found substantial
> solution/recommendation.  If anyone has any suggestions or a different
> version etc, please let me know.
>
> Thanks
>
> Here is the exception stack trace.
>
> java.util.concurrent.TimeoutException: Cannot fetch record for offset
>  in 12 milliseconds
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$
> apache$spark$sql$kafka010$CachedKafkaConsumer$$
> fetchData(CachedKafkaConsumer.scala:219)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(
> CachedKafkaConsumer.scala:117)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(
> CachedKafkaConsumer.scala:106)
> at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(
> UninterruptibleThread.scala:85)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.
> runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(
> CachedKafkaConsumer.scala:106)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.
> getNext(KafkaSourceRDD.scala:157)
> at
>


Re: [Structured Streaming] Application Updates in Production

2018-03-22 Thread Tathagata Das
Yes indeed, we don't directly support schema migration of state as of now.
However, depending on what stateful operator you are using, you can work
around it. For example, if you are using mapGroupsWithState /
flatMapGroupsWithState, you can explicitly convert your state to
Avro-encoded bytes and save the bytes as state. You will be responsible for
encoding the state in Avro such that you can migrate the schema yourself
(much like kafka + avro + schema registry).
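
To illustrate the idea only: the sketch below keeps state as opaque bytes and
lets the application handle schema evolution when decoding. It is plain Python
and uses JSON with a version field instead of Avro, purely to stay within the
standard library; with Avro, the writer/reader schema resolution would play the
role of the version check. `SessionState` and its fields are hypothetical.

```python
import json
from dataclasses import dataclass


@dataclass
class SessionState:
    num_events: int
    last_seen_ms: int  # field added in version 2 of the state schema


def encode_state(state):
    """Serialize the current state layout to bytes, tagged with its version."""
    payload = {
        "version": 2,
        "num_events": state.num_events,
        "last_seen_ms": state.last_seen_ms,
    }
    return json.dumps(payload).encode("utf-8")


def decode_state(raw):
    """Deserialize state bytes written by this or an older application version."""
    payload = json.loads(raw.decode("utf-8"))
    version = payload.get("version", 1)  # version 1 had no version field
    if version == 1:
        # Old layout: fill the new field with a default value.
        return SessionState(payload["num_events"], last_seen_ms=0)
    if version == 2:
        return SessionState(payload["num_events"], payload["last_seen_ms"])
    raise ValueError("unknown state version: {}".format(version))
```

The stateful operator would then declare its state type as a byte array and
call `decode_state` / `encode_state` at the start and end of each update, so
the engine never sees the state schema change.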

On Wed, Mar 21, 2018 at 5:45 PM, Priyank Shrivastava <priy...@us.ibm.com>
wrote:

> TD,
>
> But what if the state schema does change?  My understanding is that if in
> the new code I change the state schema the application will not be able to
> use the old checkpoints.  Is that not correct?
>
> Applications running in parallel is to ensure there is no downtime in
> production i.e because the new app will not pick up from the old
> checkpoints, one would need to keep the old app and the new app running
> until new app catches up on data processing with the old app.
>
>
> ----- Original message -
> From: Tathagata Das <tathagata.das1...@gmail.com>
> To: Priyank Shrivastava <priy...@asperasoft.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: [Structured Streaming] Application Updates in Production
> Date: Wed, Mar 21, 2018 5:28 PM
>
> Why do you want to start the new code in parallel to the old one? Why not
> stop the old one, and then start the new one? Structured Streaming ensures
> that all checkpoint information (offsets and state) are future-compatible
> (as long as state schema is unchanged), hence new code should be able to
> pick exactly where the old code left off.
>
> TD
>
> On Wed, Mar 21, 2018 at 11:56 AM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
> I am using Structured Streaming with Spark 2.2.  We are using Kafka as our
> source and are using checkpoints for failure recovery and e2e exactly once
> guarantees.  I would like to get some more information on how to handle
> updates to the application when there is a change in stateful operations
> and/or output schema.
>
> As some of the sources suggest I can start the updated application
> parallelly with the old application until it catches up with the old
> application in terms of data, and then kill the old one.  But then the new
> application will have to re-read/re-process all the data in kafka which
> could take a long time.
>
> I want to AVOID this re-processing of the data in the newly deployed
> updated application.
>
> One way I can think of is for the application to keep writing the offsets
> into something in addition to the checkpoint directory, for example in
> zookeeper/hdfs.  And then, on an update of the application, I command Kafka
> readstream() to start reading from the offsets stored in this new location
> (zookeeper/hdfs) - since the updated application can't read from the
> checkpoint directory which is now deemed incompatible.
>
> So a couple of questions:
> 1.  Is the above-stated solution a valid solution?
> 2.  If yes, How can I automate the detection of whether the application is
> being restarted because of a failure/maintenance or because of code changes
> to stateful operations and/or output schema?
>
> Any guidance, example or information source is appreciated.
>
> Thanks,
> Priyank
>
>
>
>


Re: [Structured Streaming] Application Updates in Production

2018-03-21 Thread Tathagata Das
Why do you want to start the new code in parallel to the old one? Why not
stop the old one, and then start the new one? Structured Streaming ensures
that all checkpoint information (offsets and state) is forward-compatible
(as long as the state schema is unchanged), hence new code should be able to
pick up exactly where the old code left off.

TD

On Wed, Mar 21, 2018 at 11:56 AM, Priyank Shrivastava <
priy...@asperasoft.com> wrote:

> I am using Structured Streaming with Spark 2.2.  We are using Kafka as our
> source and are using checkpoints for failure recovery and e2e exactly once
> guarantees.  I would like to get some more information on how to handle
> updates to the application when there is a change in stateful operations
> and/or output schema.
>
> As some of the sources suggest I can start the updated application
> parallelly with the old application until it catches up with the old
> application in terms of data, and then kill the old one.  But then the new
> application will have to re-read/re-process all the data in kafka which
> could take a long time.
>
> I want to AVOID this re-processing of the data in the newly deployed
> updated application.
>
> One way I can think of is for the application to keep writing the offsets
> into something in addition to the checkpoint directory, for example in
> zookeeper/hdfs.  And then, on an update of the application, I command Kafka
> readstream() to start reading from the offsets stored in this new location
> (zookeeper/hdfs) - since the updated application can't read from the
> checkpoint directory which is now deemed incompatible.
>
> So a couple of questions:
> 1.  Is the above-stated solution a valid solution?
> 2.  If yes, How can I automate the detection of whether the application is
> being restarted because of a failure/maintenance or because of code changes
> to stateful operations and/or output schema?
>
> Any guidance, example or information source is appreciated.
>
> Thanks,
> Priyank
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Tathagata Das
Chris identified the problem correctly. You need to parse out the json text
from Kafka into separate columns before you can join them up.
I walk through an example of this in my slides -
https://www.slideshare.net/databricks/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-with-tathagata-das
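
A minimal PySpark sketch of that parsing step, assuming the Kafka messages are
JSON objects. The `Id`, `first_name` and `last_name` fields are taken from the
thread; the second topic's `city` field and both function names are made up for
illustration. It also assumes the SQL `from_json` function accepts a DDL schema
string.

```python
def parse_kafka_json(kafka_df, schema_ddl):
    """Turn the binary Kafka `value` column into typed top-level columns."""
    return (
        kafka_df.selectExpr("CAST(value AS STRING) AS json")
        .selectExpr("from_json(json, '{}') AS data".format(schema_ddl))
        .select("data.*")
    )


def join_on_id(table1_stream, table2_stream):
    """Parse both Kafka streams, then join them on the extracted `Id` column."""
    left = parse_kafka_json(
        table1_stream, "Id STRING, first_name STRING, last_name STRING"
    )
    # Hypothetical schema for the second topic.
    right = parse_kafka_json(table2_stream, "Id STRING, city STRING")
    return left.join(right, "Id")
```

After parsing, `Id` exists on both sides, so the "cannot be resolved on the
left side of the join" error quoted below should no longer apply.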


On Thu, Mar 15, 2018 at 8:37 AM, Bowden, Chris <chris.bow...@microfocus.com>
wrote:

> You need to tell Spark about the structure of the data, it doesn't know
> ahead of time if you put avro, json, protobuf, etc. in kafka for the
> message format. If the messages are in json, Spark provides from_json out
> of the box. For a very simple POC you can happily cast the value to a
> string, etc. if you are prototyping and pushing messages by hand with a
> console producer on the kafka side.
>
> 
> From: Aakash Basu <aakash.spark@gmail.com>
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>     # ssc = StreamingContext(spark, 20)
>
>     table1_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test1").load())
>
>     table2_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test2").load())
>
>     joined_Stream = table1_stream.join(table2_stream, "Id")
>     #
>     # joined_Stream.show()
>
>     # query = table1_stream.writeStream.format("console").start().awaitTermination()
>     # .queryName("table_A").format("memory")
>     # spark.sql("select * from table_A").show()
>     time.sleep(10)  # sleep 10 seconds
>     # query.stop()
>     # query
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
> # --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
> # Stream_Stream_Join.py
>
>
>
>
> I get the below error (in Spark 2.3.0) -
>
> Traceback (most recent call last):
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/
> Kafka_Spark/Stream_Stream_Join.py", line 4, in 
> class test:
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/
> Kafka_Spark/Stream_Stream_Join.py", line 19, in test
> joined_Stream = table1_stream.join(table2_stream, "Id")
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
> pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be
> resolved on the left side of the join. The left-side columns: [key, value,
> topic, partition, offset, timestamp, timestampType];'
>
> It seems, as per the documentation, the key and value are deserialized as
> byte arrays.
>
> I am badly stuck at this step, not many materials online, with steps to
> proceed on this, too.
>
> Any help, guys?
>
> Thanks,
> Aakash.
>
>
> On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark@gmail.com<
> mailto:aakash.spark@gmail.com>> wrote:
> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark@gmail.com<
> mailto:aakash.spark@gmail.com>> wrote:
> Hi,
>
> I progressed a bit in the above mentioned topic -
>
> 1) I am feeding a CSV file into the Kafka topic.
> 2) Feeding the Kafka topic as readStream as TD's article suggests.
> 3) Then, simply trying to do a show on the streaming dataframe, using
> queryName('XYZ') in the writeStream and writing a sql query on top of it,
> but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a
> stream-stream join.
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name
>

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Tathagata Das
Relevant:
https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html


This is a true stream-stream join, which will automatically buffer delayed
data and join it appropriately with SQL join semantics. Please check it
out :)
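
A hedged PySpark sketch of such a join for the "second topic arrives about 15
minutes later" case from the original question. All column names (`left_id`,
`left_time`, `right_id`, `right_time`), both function names, and the 30-minute
watermarks are assumptions for illustration; both inputs are assumed to be
already-parsed streaming DataFrames with event-time columns.

```python
def time_bounded_condition(left_key, right_key, left_time, right_time, max_delay):
    """Build a SQL join condition: equal keys, and the right-side event
    happens no later than `max_delay` after the left-side event."""
    return (
        "{lk} = {rk} AND {rt} >= {lt} AND {rt} <= {lt} + interval {d}"
    ).format(lk=left_key, rk=right_key, lt=left_time, rt=right_time, d=max_delay)


def join_streams(left, right):
    """Inner-join two streams, buffering each side within its watermark."""
    # Imported lazily so this module loads without Spark installed.
    from pyspark.sql import functions as F

    condition = time_bounded_condition(
        "left_id", "right_id", "left_time", "right_time", "15 minutes"
    )
    return (
        left.withWatermark("left_time", "30 minutes")
        .join(right.withWatermark("right_time", "30 minutes"), F.expr(condition))
    )
```

The watermarks and the time-range condition together tell the engine how long
each side has to be buffered before old rows can be dropped from state.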

TD



On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes  wrote:

> I misread it, and thought that you question was if pyspark supports kafka
> lol. Sorry!
>
> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu 
> wrote:
>
>> Hey Dylan,
>>
>> Great!
>>
>> Can you revert back to my initial and also the latest mail?
>>
>> Thanks,
>> Aakash.
>>
>> On 15-Mar-2018 12:27 AM, "Dylan Guedes"  wrote:
>>
>>> Hi,
>>>
>>> I've been using the Kafka with pyspark since 2.1.
>>>
>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm yet to.
>>>>
>>>> Just want to know, when does Spark 2.3 with the 0.10 Kafka Spark Package
>>>> allow Python? I read somewhere, as of now Scala and Java are the languages
>>>> to be used.
>>>>
>>>> Please correct me if I am wrong.
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" wrote:
>>>>
>>>>> Did you try spark 2.3 with structured streaming? There watermarking
>>>>> and plain sql might be really interesting for you.
>>>>> Aakash Basu wrote on Wed, Mar 14, 2018 at 14:57:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> *Info (Using): Spark Streaming Kafka 0.8 package*
>>>>>>
>>>>>> *Spark 2.2.1*
>>>>>> *Kafka 1.0.1*
>>>>>>
>>>>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>>>>> Spark, which is acting as a receiver is printing the flattened words,
>>>>>> which is a complete RDD operation.
>>>>>>
>>>>>> *My motive is to read two tables continuously (being updated) as two
>>>>>> distinct Kafka topics being read as two Spark Dataframes and join them
>>>>>> based on a key and produce the output. *(I am from Spark-SQL
>>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>>
>>>>>> *It may happen, the first topic is receiving new data 15 mins prior
>>>>>> to the second topic, in that scenario, how to proceed? I should not lose
>>>>>> any data.*
>>>>>>
>>>>>> As of now, I want to simply pass paragraphs, read them as RDD,
>>>>>> convert to DF and then join to get the common keys as the output. (Just
>>>>>> for R).
>>>>>>
>>>>>> Started using Spark Streaming and Kafka today itself.
>>>>>>
>>>>>> Please help!
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>
>>>
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-12 Thread Tathagata Das
You have understood the problem right. However, note that your
interpretation of the output *(K, leftValue, null)*, *(K, leftValue,
rightValue1)*, *(K, leftValue, rightValue2)* depends on knowing the
semantics of the join. That is, if you are processing the output rows
*manually*, you are aware that the operator is a join, so you can apply
the interpretation *"null replaced by the first match, then all further
matches are just additional rows"*. This is, however, not a general
solution for any sink and any operator. We need to find a way to expose
these semantics through the APIs such that a sink can use them without
knowing exactly what operator is in the query writing to the sink.
Therefore we still need some work before we can do joins in update mode
cleanly.

Hope that makes it clear. :)

On Sat, Mar 10, 2018 at 12:14 AM, kant kodali <kanth...@gmail.com> wrote:

> I will give an attempt to answer this.
>
> since rightValue1 and rightValue2 have the same key "K" (two matches), why
> would it ever be the case of *rightValue2* replacing *rightValue1* replacing
> *null*? Moreover, why does the user need to care?
>
> The result in this case (after getting 2 matches) should be
>
> *(K, leftValue, rightValue1)*
> *(K, leftValue, rightValue2)*
>
> This basically means only one of them replaced the earlier null. which one
> of two? Depends on whichever arrived first. Other words, "null's" will be
> replaced by first matching row and subsequently, if there is a new matching
> row it will just be another row with the same key in the result table or if
> there a new unmatched row then the result table should have null's for the
> unmatched fields.
>
> From a user perspective, I believe just spitting out nulls for every
> trigger until there is a match and when there is match spitting out the
> joined rows should suffice isn't it?
>
> Sorry if my thoughts are too naive!
>
>
>
>
>
>
>
>
>
>
> On Thu, Mar 8, 2018 at 6:14 PM, Tathagata Das <tathagata.das1...@gmail.com
> > wrote:
>
>> This doc is unrelated to the stream-stream join we added in Structured
>> Streaming. :)
>>
>> That said we added append mode first because it easier to reason about
>> the semantics of append mode especially in the context of outer joins. You
>> output a row only when you know it wont be changed ever. The semantics of
>> update mode in outer joins is trickier to reason about and expose through
>> the APIs. Consider a left outer join. As soon as we get a left-side record
>> with a key K that does not have a match, do we output *(K, leftValue,
>> null)*? And if we do so, then later get 2 matches from the right side,
>> we have to output *(K, leftValue, rightValue1) and (K, leftValue,
>> rightValue2)*. But how do we convey that *rightValue1* and *rightValue2 
>> *together
>> replace the earlier *null*, rather than *rightValue2* replacing
>> *rightValue1* replacing *null?*
>>
>> We will figure these out in future releases. For now, we have released
>> append mode, which allow quite a large range of use cases, including
>> multiple cascading joins.
>>
>> TD
>>
>>
>>
>> On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> super interesting.
>>>
>>> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> It looks to me that the StateStore described in this doc
>>>> <https://docs.google.com/document/d/1i528WI7KFica0Dg1LTQfdQMsW8ai3WDvHmUvkH1BKg4/edit>
>>>>  Actually
>>>> has full outer join and every other join is a filter of that. Also the doc
>>>> talks about update mode but looks like Spark 2.3 ended up with append mode?
>>>> Anyways the moment it is in master I am ready to test so JIRA tickets on
>>>> this would help to keep track. please let me know.
>>>>
>>>> Thanks!
>>>>
>>>> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Sorry I meant Spark 2.4 in my previous email
>>>>>
>>>>> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi TD,
>>>>>>
>>>>>> I agree I think we are better off either with a full fix or no fix. I
>>>>>> am ok with the complete fix being available in master or some branch. I
>>>>>> guess the solution for me is to just build from the source.
>>>>>

Re: Upgrades of streaming jobs

2018-03-09 Thread Tathagata Das
Yes, all checkpoints are forward compatible.

However, you do need to restart the query if you want to update the code of
the query. This downtime can be less than a second (if you just restart
the query without stopping the application/Spark driver) or 10s of seconds
(if you have to stop the application and resubmit your application to the
cluster).
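
A small sketch of the first option, restarting only the query inside a running
driver. `upgrade_query` and `start_new_query` are hypothetical names;
`start_new_query` is assumed to start the updated query with the old query's
`checkpointLocation` and return it. The measured time is only the stop-to-start
gap seen by the driver.

```python
import time


def upgrade_query(old_query, start_new_query):
    """Swap a running streaming query for an updated one in the same driver.

    Returns the new query and the observed downtime in seconds.
    """
    stopped_at = time.monotonic()
    old_query.stop()  # downtime starts here
    # The new query resumes from the checkpoint the old one left behind.
    new_query = start_new_query()
    return new_query, time.monotonic() - stopped_at
```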



On Thu, Mar 8, 2018 at 12:11 PM, Georg Heiler 
wrote:

> Hi
>
> What is the state of spark structured streaming jobs and upgrades?
>
> Can checkpoints of version 1 be read by version 2 of a job? Is downtime
> required to upgrade the job?
>
> Thanks
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-08 Thread Tathagata Das
This doc is unrelated to the stream-stream join we added in Structured
Streaming. :)

That said, we added append mode first because it is easier to reason about
the semantics of append mode, especially in the context of outer joins. You
output a row only when you know it won't ever be changed. The semantics of
update mode in outer joins are trickier to reason about and expose through
the APIs. Consider a left outer join. As soon as we get a left-side record
with a key K that does not have a match, do we output *(K, leftValue, null)*?
And if we do so, and later get 2 matches from the right side, we have to
output *(K, leftValue, rightValue1)* and *(K, leftValue, rightValue2)*. But
how do we convey that *rightValue1* and *rightValue2* together replace the
earlier *null*, rather than *rightValue2* replacing *rightValue1* replacing
*null*?
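
To make the ambiguity concrete, here is a plain-Python simulation (not Spark
code) of two sinks receiving those three rows from a hypothetical update-mode
outer join. Both sink functions are invented for illustration: one treats every
row as a generic keyed update, the other hard-codes knowledge of join
semantics.

```python
def apply_as_upsert(rows):
    """Generic sink: each row is an update keyed on K; later rows win."""
    table = {}
    for key, left, right in rows:
        table[key] = (left, right)
    return table


def apply_with_join_knowledge(rows):
    """Join-aware sink: the first match replaces the null placeholder,
    any further match is kept as an additional row."""
    table = {}
    for key, left, right in rows:
        matches = table.setdefault(key, [])
        if right is not None and matches == [(left, None)]:
            matches.clear()  # drop the (leftValue, null) placeholder
        matches.append((left, right))
    return table


rows = [
    ("K", "leftValue", None),
    ("K", "leftValue", "rightValue1"),
    ("K", "leftValue", "rightValue2"),
]
generic = apply_as_upsert(rows)
join_aware = apply_with_join_knowledge(rows)
```

Here `generic` ends up holding only the `rightValue2` row, silently dropping
the `rightValue1` match, while `join_aware` keeps both matches. The output rows
alone do not tell a sink which of the two behaviors is intended.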

We will figure these out in future releases. For now, we have released
append mode, which allows quite a large range of use cases, including
multiple cascading joins.

TD



On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> super interesting.
>
> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> It looks to me that the StateStore described in this doc
>> <https://docs.google.com/document/d/1i528WI7KFica0Dg1LTQfdQMsW8ai3WDvHmUvkH1BKg4/edit>
>>  Actually
>> has full outer join and every other join is a filter of that. Also the doc
>> talks about update mode but looks like Spark 2.3 ended up with append mode?
>> Anyways the moment it is in master I am ready to test so JIRA tickets on
>> this would help to keep track. please let me know.
>>
>> Thanks!
>>
>> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Sorry I meant Spark 2.4 in my previous email
>>>
>>> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi TD,
>>>>
>>>> I agree I think we are better off either with a full fix or no fix. I
>>>> am ok with the complete fix being available in master or some branch. I
>>>> guess the solution for me is to just build from the source.
>>>>
>>>> On a similar note, I am not finding any JIRA tickets related to full
>>>> outer joins and update mode for maybe say Spark 2.3. I wonder how hard is
>>>> it to implement both of these? It turns out the update mode and full outer
>>>> join is very useful and required in my case, therefore, I'm just asking.
>>>>
>>>> Thanks!
>>>>
>>>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> I thought about it.
>>>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>>>
>>>>> There are two parts to this bug fix to enable self-joins.
>>>>>
>>>>> 1. Enabling deduping of leaf logical nodes by extending
>>>>> MultiInstanceRelation
>>>>>   - This is safe to be backported into the 2.3 branch as it does not
>>>>> touch production code paths.
>>>>>
>>>>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>>>>> micro-batch plan is spliced into the streaming plan.
>>>>>   - This touches core production code paths and therefore may not be
>>>>> safe to backport.
>>>>>
>>>>> Part 1 enables self-joins in all but a small fraction of self-join
>>>>> queries. That small fraction can produce incorrect results, and part 2
>>>>> avoids that.
>>>>>
>>>>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>>>>> can give wrong results in some cases. I think that is strictly worse than
>>>>> no fix.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi TD,
>>>>>>
>>>>>> I pulled your commit that is listed on this ticket
>>>>>> https://issues.apache.org/jira/browse/SPARK-23406 specifically I did
>>>>>> the following steps and self joins work after I cherry-pick your commit!
>>>>>> Good Job! I was hoping it will be part of 2.3.0 but looks like it is
>>>>>> targeted for 2.3.1 :(
>>>>>>
>>>>>> git clone https://github.com/apache/spark.git
>>>>>> cd spark

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-07 Thread Tathagata Das
These issues have likely been solved in later versions. Please use the
latest release - Spark 2.3.0.

On Tue, Mar 6, 2018 at 5:11 PM, Junfeng Chen <darou...@gmail.com> wrote:

> Spark 2.1.1.
>
> Actually it is a warning rather than an exception, so there is no stack
> trace. Just many this line:
>
>> CachedKafkaConsumer: CachedKafkaConsumer is not running in
>> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
>> interrupted because of KAFKA-1894.
>
>
>
> Regard,
> Junfeng Chen
>
> On Wed, Mar 7, 2018 at 3:34 AM, Tathagata Das <tathagata.das1...@gmail.com
> > wrote:
>
>> Which version of Spark are you using? And can you give us the full stack
>> trace of the exception?
>>
>> On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen <darou...@gmail.com> wrote:
>>
>>> I am trying to read kafka and save the data as parquet file on hdfs
>>> according to this
>>> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>>>
>>>
>>> The code is similar to :
>>>
>>> val df = spark
>>>   .read
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>>   .option("subscribe", "topic1")
>>>   .load()
>>>
>>> while I am writing in Java.
>>>
>>> However, I keep throwing the following warning:
>>> CachedKafkaConsumer: CachedKafkaConsumer is not running in
>>> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
>>> interrupted because of KAFKA-1894.
>>>
>>> How to solve it? Thanks!
>>>
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>
>>
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread Tathagata Das
I thought about it.
I am not 100% sure whether this fix should go into 2.3.1.

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending
MultiInstanceRelation
  - This is safe to backport into the 2.3 branch as it does not touch
production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
plan is spliced into the streaming plan.
  - This touches core production code paths and therefore may not be safe to
backport.

Part 1 enables self-joins in all but a small fraction of self-join queries.
That small fraction can produce incorrect results, and part 2 avoids that.

So for 2.3.1, we can enable self-joins by merging only part 1, but it can
give wrong results in some cases. I think that is strictly worse than no
fix.

TD



On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi TD,
>
> I pulled your commit that is listed on this ticket
> https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did the
> following steps and self joins work after I cherry-pick your commit! Good
> Job! I was hoping it will be part of 2.3.0 but looks like it is targeted
> for 2.3.1 :(
>
> git clone https://github.com/apache/spark.git
> cd spark
> git fetch
> git checkout branch-2.3
> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
> ./build/mvn -DskipTests compile
> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
> -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>
>
> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey,
>>
>> Thanks for testing out stream-stream joins and reporting this issue. I am
>> going to take a look at this.
>>
>> TD
>>
>>
>>
>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> if I change it to the below code it works. However, I don't believe it
>>> is the solution I am looking for. I want to be able to do it in raw SQL and
>>> moreover, If a user gives a big chained raw spark SQL join query I am not
>>> even sure how to make copies of the dataframe to achieve the self-join. Is
>>> there any other way here?
>>>
>>>
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>> val jdf1 = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>> jdf1.createOrReplaceTempView("table1")
>>>
>>> val resultdf = spark.sql("select * from table inner join table1 on 
>>> table.offset=table1.offset")
>>>
>>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> If I change it to this
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following code
>>>>>
>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>
>>>>> val jdf = 
>>>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>>>> "localhost:9092").option("subscribe", 
>>>>> "join_test").option("startingOffsets", "earliest").load();
>>>>>
>>>>> jdf.createOrReplaceTempView("table")
>>>>>
>>>>> val resultdf = spark.sql("select * from table as x inner join table as y 
>>>>> on x.offset=y.offset")
>>>>>
>>>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>>>>  false).trigger(

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Tathagata Das
Which version of Spark are you using? And can you give us the full stack
trace of the exception?

On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen  wrote:

> I am trying to read kafka and save the data as parquet file on hdfs
> according to this
> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>
>
> The code is similar to :
>
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
>
> while I am writing in Java.
>
> However, I keep throwing the following warning:
> CachedKafkaConsumer: CachedKafkaConsumer is not running in
> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
> interrupted because of KAFKA-1894.
>
> How to solve it? Thanks!
>
>
> Regard,
> Junfeng Chen
>


Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-03-02 Thread Tathagata Das
Structured Streaming's file sink solves these problems by writing a
log/manifest of all the authoritative files written out (for any format).
So if you run batch or interactive queries on the output directory with
Spark, it will automatically read the manifest and process only the files
that are in the manifest, thus skipping any partial files, etc.
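
As a rough illustration of the point above, here is a minimal Scala sketch.
The broker address, topic name and /tmp paths are placeholders that do not
come from this thread. The streaming write goes through the built-in file
sink, and the later batch read points at the same output directory, which is
what lets Spark consult the manifest instead of listing raw files.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object FileSinkSketch {
  // Placeholder locations (assumptions); use real HDFS/S3 paths in practice.
  val outputDir = "/tmp/file-sink-sketch/output"
  val checkpointDir = "/tmp/file-sink-sketch/checkpoint"

  // Streaming side: Kafka -> parquet through the built-in file sink.
  def startParquetSink(spark: SparkSession): StreamingQuery =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "events")                       // placeholder topic
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("parquet")
      .option("path", outputDir)
      .option("checkpointLocation", checkpointDir)
      .start()

  // Batch side: reading the sink's output directory with Spark, so that
  // only files recorded in the sink's manifest are processed.
  def readCommittedOutput(spark: SparkSession): DataFrame =
    spark.read.parquet(outputDir)
}
```

Readers other than Spark do not know about the manifest, so this guarantee
should be assumed to hold only for Spark batch and interactive queries.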



On Fri, Mar 2, 2018 at 1:37 PM, Sunil Parmar <sunilosu...@gmail.com> wrote:

> Is there a way to get finer control over file writing in parquet file
> writer ?
>
> We've an streaming application using Apache Apex ( on path of migration to
> Spark ...story for a different thread). The existing streaming application
> read JSON from Kafka and writes Parquet to HDFS. We're trying to deal with
> partial files by writing .tmp files and renaming them as the last step. We
> only commit offset after rename is successful. This way we get at-least-once
> semantics and avoid the partial file write issue.
>
> Thoughts ?
>
>
> Sunil Parmar
>
> On Wed, Feb 28, 2018 at 1:59 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> There is no good way to save to parquet without causing downstream
>> consistency issues.
>> You could use foreachRDD to get each RDD, convert it to
>> DataFrame/Dataset, and write out as parquet files. But you will later run
>> into issues with partial files caused by failures, etc.
>>
>>
>> On Wed, Feb 28, 2018 at 11:09 AM, karthikus <aswin8...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have a Kafka stream data and I need to save the data in parquet format
>>> without using Structured Streaming (due to the lack of Kafka Message
>>> header
>>> support).
>>>
>>> val kafkaStream =
>>>   KafkaUtils.createDirectStream(
>>> streamingContext,
>>> LocationStrategies.PreferConsistent,
>>> ConsumerStrategies.Subscribe[String, String](
>>>   topics,
>>>   kafkaParams
>>> )
>>>   )
>>> // process the messages
>>> val messages = kafkaStream.map(record => (record.key, record.value))
>>> val lines = messages.map(_._2)
>>>
>>> Now, how do I save it as parquet ? All the examples that I have come
>>> across
>>> uses SQLContext which is deprecated. ! Any help appreciated !
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-28 Thread Tathagata Das
I made a JIRA for it - https://issues.apache.org/jira/browse/SPARK-23539
Unfortunately it is blocked by Kafka version upgrade, which has a few nasty
issues related to Kafka bugs -
https://issues.apache.org/jira/browse/SPARK-18057

On Wed, Feb 28, 2018 at 3:17 PM, karthikus  wrote:

> TD,
>
> Thanks for your response.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-02-28 Thread Tathagata Das
There is no good way to save to parquet without causing downstream
consistency issues.
You could use foreachRDD to get each RDD, convert it to DataFrame/Dataset,
and write out as parquet files. But you will later run into issues with
partial files caused by failures, etc.
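
For reference, the foreachRDD approach could look roughly like the Scala
sketch below. The helper name saveAsParquet, the "value" column name and the
batch_time directory layout are illustrative choices, not something from
this thread. It uses SparkSession rather than the deprecated SQLContext.
Writing each batch to its own directory with overwrite makes a retried batch
replace its earlier attempt, but it does not remove the partial-file problem
described above: a reader can still observe a half-written batch.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object DStreamToParquetSketch {
  // Hypothetical helper: writes every non-empty batch as parquet files.
  def saveAsParquet(lines: DStream[String], outputDir: String): Unit = {
    lines.foreachRDD { (rdd: RDD[String], batchTime: Time) =>
      if (!rdd.isEmpty()) {
        // Reuse (or lazily create) a session from the streaming context's conf.
        val spark = SparkSession.builder()
          .config(rdd.sparkContext.getConf)
          .getOrCreate()
        import spark.implicits._

        rdd.toDF("value")
          .write
          .mode(SaveMode.Overwrite) // a retried batch overwrites its own directory
          .parquet(s"$outputDir/batch_time=${batchTime.milliseconds}")
      }
    }
  }
}
```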


On Wed, Feb 28, 2018 at 11:09 AM, karthikus  wrote:

> Hi all,
>
> I have a Kafka stream data and I need to save the data in parquet format
> without using Structured Streaming (due to the lack of Kafka Message header
> support).
>
> val kafkaStream =
>   KafkaUtils.createDirectStream(
> streamingContext,
> LocationStrategies.PreferConsistent,
> ConsumerStrategies.Subscribe[String, String](
>   topics,
>   kafkaParams
> )
>   )
> // process the messages
> val messages = kafkaStream.map(record => (record.key, record.value))
> val lines = messages.map(_._2)
>
> Now, how do I save it as parquet ? All the examples that I have come across
> uses SQLContext which is deprecated. ! Any help appreciated !
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread Tathagata Das
Let me answer the original question directly, that is, how do we determine
that an event is late. We simply track the maximum event time the engine
has seen in the data it has processed till now. And any data that has event
time less than the max is basically "late" (as it is out-of-order). Now, in
a distributed setting, it is very hard to define whether each record is
late or not, because it is hard to have a consistent definition of
max-event-time-seen. Fortunately, we don't have to do this precisely because
we don't really care whether a record is "late"; we only care whether a
record is "too late", that is, older than the watermark (=
max-event-time-seen - watermark-delay). As the programming guide says, if
data is "late" but not "too late" we process it in the same way as non-late
data. Only when the data is "too late" do we drop it.
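
The definitions above can be sketched in plain Scala, with no Spark
involved. This is a simplified single-threaded model, not the engine's
implementation: it updates the maximum after every record, whereas the real
engine works per micro-batch and across many tasks. The names
WatermarkSketch, classify and the three verdicts are made up for
illustration.

```scala
object WatermarkSketch {
  sealed trait Verdict
  case object OnTime extends Verdict  // not older than anything seen so far
  case object Late extends Verdict    // out of order, but within the delay
  case object TooLate extends Verdict // older than max-event-time-seen - delay

  // Classifies event times in arrival order. Times and delay share one unit.
  def classify(eventTimes: Seq[Long], delay: Long): Seq[Verdict] = {
    var maxSeen: Option[Long] = None
    eventTimes.toList.map { t =>
      val verdict: Verdict = maxSeen match {
        case Some(max) if t < max - delay => TooLate
        case Some(max) if t < max         => Late
        case _                            => OnTime
      }
      // The maximum only ever moves forward.
      maxSeen = Some(maxSeen.fold(t)(m => math.max(m, t)))
      verdict
    }
  }
}
```

Note that nothing in this model reads the wall clock, which matches the
point that lateness depends only on event-time.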

To further clarify, we do not in any way correlate processing-time with
event-time. The definition of lateness is only based on event-time and has
nothing to do with processing-time. This allows us to do event-time
processing with old data streams as well. For example, you may replay
1-week old data as a stream, and the processing will be exactly the same as
it would have been if you had processed the stream in real-time a week ago.
This is fundamentally necessary for achieving the deterministic processing
that Structured Streaming guarantees.

Regarding the picture, the "time" is actually "event-time". My apologies
for not making this clear in the picture. In hindsight, the picture can be
made much better.  :)

Hope this explanation helps!

TD

On Tue, Feb 27, 2018 at 2:26 AM, kant kodali  wrote:

> I read through the spark structured streaming documentation and I wonder
> how does spark structured streaming determine an event has arrived late?
> Does it compare the event-time with the processing time?
>
> [image: enter image description here]
> 
>
> Taking the above picture as an example Is the bold right arrow line "Time"
> represent processing time? If so
>
> 1) where does this processing time come from? since its streaming Is it
> assuming someone is likely using an upstream source that has processing
> timestamp in it or spark adds a processing timestamp field? For example,
> when reading messages from Kafka we do something like
>
> Dataset<Row> kafkadf = spark.readStream().format("kafka").load();
>
> This dataframe has timestamp column by default which I am assuming is the
> processing time. correct? If so, Does Kafka or Spark add this timestamp?
>
> 2) I can see there is a time comparison between bold right arrow line and
> time in the message. And is that how spark determines an event is late?
>


Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-27 Thread Tathagata Das
Unfortunately, exposing Kafka headers is not yet supported in Structured
Streaming. The community is more than welcome to add support for it :)

On Tue, Feb 27, 2018 at 2:51 PM, Karthik Jayaraman 
wrote:

> Hi all,
>
> I am using Spark 2.2.1 Structured Streaming to read messages from Kafka. I
> would like to know how to access the Kafka headers programmatically ? Since
> the Kafka message header support is introduced in Kafka 0.11 (
> https://issues.apache.org/jira/browse/KAFKA-4208), is it supported in
> Spark. ? If yes, can anyone point me to an example ?
>
> - Karthik
>


Re: Trigger.ProcessingTime("10 seconds") & Trigger.Continuous(10.seconds)

2018-02-25 Thread Tathagata Das
The continuous one is our new low latency continuous processing engine in
Structured Streaming (to be released in 2.3).
Here is the pre-release doc -
https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc5-docs/_site/structured-streaming-programming-guide.html#continuous-processing
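
To make the difference concrete, here is a small Scala sketch, assuming
Spark 2.3 and using the built-in "rate" source and "console" sink purely for
demonstration. With Trigger.ProcessingTime the interval is how often a
micro-batch is started; with Trigger.Continuous the interval is how often
the continuous engine checkpoints its progress, while records are processed
as they arrive.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object TriggerSketch {
  // Micro-batch execution: a new batch is kicked off every 10 seconds.
  def startMicroBatch(spark: SparkSession): StreamingQuery =
    spark.readStream.format("rate").load()
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

  // Continuous execution: "10 seconds" here is the checkpoint interval,
  // not a batch interval.
  def startContinuous(spark: SparkSession): StreamingQuery =
    spark.readStream.format("rate").load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("10 seconds"))
      .start()
}
```

Continuous mode supports only a limited set of sources, sinks and
operations, so the linked guide should be checked before switching a query
over.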

On Sun, Feb 25, 2018 at 12:26 PM, naresh Goud 
wrote:

> Hello Spark Experts,
>
> What is the difference between Trigger.Continuous(10.seconds) and
> Trigger.ProcessingTime("10 seconds") ?
>
>
>
> Thank you,
> Naresh
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread Tathagata Das
Hey,

Thanks for testing out stream-stream joins and reporting this issue. I am
going to take a look at this.

TD



On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:

> if I change it to the below code it works. However, I don't believe it is
> the solution I am looking for. I want to be able to do it in raw SQL and
> moreover, If a user gives a big chained raw spark SQL join query I am not
> even sure how to make copies of the dataframe to achieve the self-join. Is
> there any other way here?
>
>
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
>
> jdf.createOrReplaceTempView("table")
> jdf1.createOrReplaceTempView("table1")
>
> val resultdf = spark.sql("select * from table inner join table1 on 
> table.offset=table1.offset")
>
> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
>
> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>
>> If I change it to this
>>
>>
>>
>>
>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have the following code
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>>
>>> val resultdf = spark.sql("select * from table as x inner join table as y on 
>>> x.offset=y.offset")
>>>
>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>> and I get the following exception.
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
>>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
>>> x.timestamp, x.partition]; line 1 pos 50;
>>> 'Project [*]
>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>:- SubqueryAlias x
>>>:  +- SubqueryAlias table
>>>: +- StreamingRelation 
>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>> offset#32L, timestamp#33, timestampType#34]
>>>+- SubqueryAlias y
>>>   +- SubqueryAlias table
>>>  +- StreamingRelation 
>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>> offset#32L, timestamp#33, timestampType#34]
>>>
>>> any idea whats wrong here?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: [Structured Streaming] Avoiding multiple streaming queries

2018-02-14 Thread Tathagata Das
Of course, you can write to multiple Kafka topics from a single query. If
your dataframe that you want to write has a column named "topic" (along
with "key", and "value" columns), it will write the contents of a row to
the topic in that row. This automatically works. So the only thing you need
to figure out is how to generate the value of that column.

This is documented -
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
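
One possible way to generate the "topic" column, and a different key per
topic, is sketched below in Scala. This is only an illustration under
assumptions: the column names userId and region, the topic names by_user
and by_region, and the helper names are invented, and the query result is
assumed to be a streaming aggregation (hence the "update" output mode).
Each result row is fanned out into one row per target topic with explode,
so a single query feeds all topics.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct, to_json}
import org.apache.spark.sql.streaming.StreamingQuery

object MultiTopicSketch {
  // Turns each result row into one (topic, key, value) row per target topic.
  def routeToTopics(result: DataFrame): DataFrame = {
    // Serialize the whole row as JSON for the Kafka value.
    val payload = to_json(struct(result.columns.map(col): _*))
    // One (topic, key) pair per destination; names are hypothetical.
    val routes = array(
      struct(lit("by_user").as("topic"), col("userId").cast("string").as("key")),
      struct(lit("by_region").as("topic"), col("region").cast("string").as("key"))
    )
    result
      .select(payload.as("value"), explode(routes).as("route"))
      .select(col("route.topic").as("topic"), col("route.key").as("key"), col("value"))
  }

  // No "topic" option is set, so the sink uses the per-row "topic" column.
  def startKafkaSink(routed: DataFrame,
                     bootstrapServers: String,
                     checkpointDir: String): StreamingQuery =
    routed.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("checkpointLocation", checkpointDir)
      .outputMode("update") // assumption: result is an aggregation
      .start()
}
```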

Or am I misunderstanding the problem?

TD




On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan 
wrote:

> I had a similar issue and i think that’s where the structured streaming
> design lacks.
> Seems like Question#2 in your email is a viable workaround for you.
>
> In my case, I have a custom Sink backed by an efficient in-memory column
> store suited for fast ingestion.
>
> I have a Kafka stream coming from one topic, and I need to classify the
> stream based on schema.
> For example, a Kafka topic can have three different types of schema
> messages and I would like to ingest into the three different column
> tables(having different schema) using my custom Sink implementation.
>
> Right now only(?) option I have is to create three streaming queries
> reading the same topic and ingesting to respective column tables using
> their Sink implementations.
> These three streaming queries create underlying three
> IncrementalExecutions and three KafkaSources, and three queries reading the
> same data from the same Kafka topic.
> Even with CachedKafkaConsumers at partition level, this is not an
> efficient way to handle a simple streaming use case.
>
> One workaround to overcome this limitation is to have same schema for all
> the messages in a Kafka partition, unfortunately this is not in our control
> and customers cannot change it due to their dependencies on other
> subsystems.
>
> Thanks,
> http://www.snappydata.io/blog 
>
> On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> I have a structured streaming query which sinks to Kafka.  This query has
>> a complex aggregation logic.
>>
>>
>> I would like to sink the output DF of this query to multiple Kafka topics
>> each partitioned on a different ‘key’ column.  I don’t want to have
>> multiple Kafka sinks for each of the different Kafka topics because that
>> would mean running multiple streaming queries - one for each Kafka topic,
>> especially since my aggregation logic is complex.
>>
>>
>> Questions:
>>
>> 1.  Is there a way to output the results of a structured streaming query
>> to multiple Kafka topics each with a different key column but without
>> having to execute multiple streaming queries?
>>
>>
>> 2.  If not,  would it be efficient to cascade the multiple queries such
>> that the first query does the complex aggregation and writes output
>> to Kafka and then the other queries just read the output of the first query
>> and write their topics to Kafka thus avoiding doing the complex aggregation
>> again?
>>
>>
>> Thanks in advance for any help.
>>
>>
>> Priyank
>>
>>
>>
>


Re: Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Tathagata Das
1. Just loop like this.


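import org.apache.spark.sql.streaming.StreamingQuery

def startQuery(): StreamingQuery = {
   // Define the dataframes and start the query
   ???  // placeholder: return the result of writeStream...start()
}

// call this on main thread
// (notShutdown and refreshIntervalMs are placeholders defined by your app)
while (notShutdown) {
   val query = startQuery()
   // waits up to refreshIntervalMs, or returns earlier if the query terminates
   query.awaitTermination(refreshIntervalMs)
   query.stop()
   // refresh static data
}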


2. Yes, stream-stream joins are supported in 2.3.0, which is soon to be
released. RC3 is available if you want to test it right now -
https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc3-bin/.



On Wed, Feb 14, 2018 at 3:34 AM, Appu K <kut...@gmail.com> wrote:

> TD,
>
> Thanks a lot for the quick reply :)
>
>
> Did I understand it right that in the main thread, to wait for the
> termination of the context I'll not be able to use
>  outStream.awaitTermination()  -  [ since i'll be closing in inside another
> thread ]
>
> What would be a good approach to keep the main app long running if I’ve to
> restart queries?
>
> Should i just wait for 2.3 where i'll be able to join two structured
> streams ( if the release is just a few weeks away )
>
> Appreciate all the help!
>
> thanks
> App
>
>
>
> On 14 February 2018 at 4:41:52 PM, Tathagata Das (
> tathagata.das1...@gmail.com) wrote:
>
> Let me fix my mistake :)
> What I suggested in that earlier thread does not work. The streaming query
> that joins a streaming dataset with a batch view, does not correctly pick
> up when the view is updated. It works only when you restart the query. That
> is,
> - stop the query
> - recreate the dataframes,
> - start the query on the new dataframe using the same checkpoint location
> as the previous query
>
> Note that you don't need to restart the whole process/cluster/application,
> just restart the query in the same process/cluster/application. This should
> be very fast (within a few seconds). So, unless you have latency SLAs of 1
> second, you can periodically restart the query without restarting the
> process.
>
> Apologies for my misdirections in that earlier thread. Hope this helps.
>
> TD
>
> On Wed, Feb 14, 2018 at 2:57 AM, Appu K <kut...@gmail.com> wrote:
>
>> More specifically,
>>
>> Quoting TD from the previous thread
>> "Any streaming query that joins a streaming dataframe with the view will
>> automatically start using the most updated data as soon as the view is
>> updated”
>>
>> Wondering if I’m doing something wrong in  https://gist.github.com/anony
>> mous/90dac8efadca3a69571e619943ddb2f6
>>
>> My streaming dataframe is not using the updated data, even though the
>> view is updated!
>>
>> Thank you
>>
>>
>> On 14 February 2018 at 2:54:48 PM, Appu K (kut...@gmail.com) wrote:
>>
>> Hi,
>>
>> I had followed the instructions from the thread https://mail-archives.a
>> pache.org/mod_mbox/spark-user/201704.mbox/%3CD1315D33-41CD-
>> 4ba3-8b77-0879f3669...@qvantel.com%3E while trying to reload a static
>> data frame periodically that gets joined to a structured streaming query.
>>
>> However, the streaming query results does not reflect the data from the
>> refreshed static data frame.
>>
>> Code is here https://gist.github.com/anonymous/90dac8efadca3a69571e6
>> 19943ddb2f6
>>
>> I’m using spark 2.2.1 . Any pointers would be highly helpful
>>
>> Thanks a lot
>>
>> Appu
>>
>>
>


Re: Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Tathagata Das
Let me fix my mistake :)
What I suggested in that earlier thread does not work. The streaming query
that joins a streaming dataset with a batch view, does not correctly pick
up when the view is updated. It works only when you restart the query. That
is,
- stop the query
- recreate the dataframes,
- start the query on the new dataframe using the same checkpoint location
as the previous query

Note that you don't need to restart the whole process/cluster/application,
just restart the query in the same process/cluster/application. This should
be very fast (within a few seconds). So, unless you have latency SLAs of 1
second, you can periodically restart the query without restarting the
process.

Apologies for my misdirections in that earlier thread. Hope this helps.

TD

On Wed, Feb 14, 2018 at 2:57 AM, Appu K  wrote:

> More specifically,
>
> Quoting TD from the previous thread
> "Any streaming query that joins a streaming dataframe with the view will
> automatically start using the most updated data as soon as the view is
> updated”
>
> Wondering if I’m doing something wrong in  https://gist.github.com/
> anonymous/90dac8efadca3a69571e619943ddb2f6
>
> My streaming dataframe is not using the updated data, even though the view
> is updated!
>
> Thank you
>
>
> On 14 February 2018 at 2:54:48 PM, Appu K (kut...@gmail.com) wrote:
>
> Hi,
>
> I had followed the instructions from the thread https://mail-archives.
> apache.org/mod_mbox/spark-user/201704.mbox/%3CD1315D33-
> 41cd-4ba3-8b77-0879f3669...@qvantel.com%3E while trying to reload a
> static data frame periodically that gets joined to a structured streaming
> query.
>
> However, the streaming query results does not reflect the data from the
> refreshed static data frame.
>
> Code is here https://gist.github.com/anonymous/
> 90dac8efadca3a69571e619943ddb2f6
>
> I’m using spark 2.2.1 . Any pointers would be highly helpful
>
> Thanks a lot
>
> Appu
>
>


Re: Spark Streaming withWatermark

2018-02-06 Thread Tathagata Das
That may very well be possible. The watermark delay guarantees that any
record newer than or equal to watermark (that is, max event time seen - 20
seconds), will be considered and never be ignored.  It does not guarantee
the other way, that is, it does NOT guarantee that records older than the
watermark will definitely get ignored. In a distributed setting, it is
super hard to get strict guarantees, so we choose to err on the side of
being more inclusive (that is, include some old data), rather than the side
of dropping any not-old data.
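
A toy model in plain Scala may help show why an old record can still be
counted. It assumes, as suggested earlier in this thread, that the watermark
used for a micro-batch is the one computed at the end of the previous batch.
It filters raw records, which is a simplification: the real engine drops
data based on window state, and the names here are invented. Times are in
seconds relative to 11:59:00, so the five messages from the thread become
60, 61, 62, 63 and 0.

```scala
object BatchWatermarkSketch {
  // Returns, per batch, the records kept under the watermark that was
  // computed at the end of the previous batch.
  def keptPerBatch(batches: Seq[Seq[Long]], delay: Long): Seq[Seq[Long]] = {
    var watermark: Option[Long] = None
    var maxSeen: Option[Long] = None
    batches.toList.map { batch =>
      // Records at or above the current watermark are always kept.
      val kept = batch.filter(t => watermark.forall(t >= _))
      if (batch.nonEmpty) {
        // The watermark only advances after the batch has been processed.
        val newMax = math.max(batch.max, maxSeen.getOrElse(Long.MinValue))
        maxSeen = Some(newMax)
        watermark = Some(newMax - delay)
      }
      kept
    }
  }
}
```

With a 20 second delay, all five messages arriving in one batch are kept,
while the same old message arriving in a later batch is dropped.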

I will update the programming guide to make this more clear.

On Feb 6, 2018 5:01 PM, "Vishnu Viswanath" 
wrote:

> Could it be that these messages were processed in the same micro batch? In
> that case, watermark will be updated only after the batch finishes which
> did not have any effect of the late data in the current batch.
>
> On Tue, Feb 6, 2018 at 4:18 PM Jiewen Shao  wrote:
>
>> Ok, Thanks for confirmation.
>>
>> So based on my code, I have messages with following timestamps (converted
>> to more readable format) in the following order:
>>
>> 2018-02-06 12:00:00
>> 2018-02-06 12:00:01
>> 2018-02-06 12:00:02
>> 2018-02-06 12:00:03
>> 2018-02-06 11:59:00  <-- this message should not be counted, right?
>> however in my test, this one is still counted
>>
>>
>>
>> On Tue, Feb 6, 2018 at 2:05 PM, Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com> wrote:
>>
>>> Yes, that is correct.
>>>
>>> On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao 
>>> wrote:
>>>
 Vishnu, thanks for the reply
 so "event time" and "window end time" have nothing to do with current
 system timestamp, watermark moves with the higher value of "timestamp"
 field of the input and never moves down, is that correct understanding?


 On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <
 vishnu.viswanat...@gmail.com> wrote:

> Hi
>
> 20 second corresponds to when the window state should be cleared. For
> the late message to be dropped, it should come in after you receive a
> message with event time >= window end time + 20 seconds.
>
> I wrote a post on this recently: http://vishnuviswanath.com/spark_
> structured_streaming.html#watermark
>
> Thanks,
> Vishnu
>
> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao 
> wrote:
>
>> sample code:
>>
>> Let's say Xyz is POJO with a field called timestamp,
>>
>> regarding code withWatermark("timestamp", "20 seconds")
>>
>> I expect the msg with timestamp 20 seconds or older will be dropped,
>> what does 20 seconds compare to? based on my test nothing was dropped no
>> matter how old the timestamp is, what did i miss?
>>
>> Dataset<Xyz> xyz = lines
>> .as(Encoders.STRING())
>> .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, 
>> Xyz.class), Encoders.bean(Xyz.class));
>>
>> Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
>> .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), 
>> xyz.col("x") //tumbling window of size 5 seconds (timestamp)
>> ).count();
>>
>> Thanks
>>
>>

>>>
>>


Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Tathagata Das
The code uses the format "socket" which is only for text sent over a simple
socket, which is completely different from how Twitter APIs work. So this
won't work at all.
Fundamentally, for Structured Streaming, we have focused only on those
streaming sources that have the capabilities of record-level offset tracking
(e.g. Kafka offsets) and replayability in order to give strong exactly-once
fault-tolerance guarantees. Hence we have focused on files, Kafka, Kinesis
(socket is just for testing as is documented). Twitter APIs as a source
do not provide those, hence we have not focused on building one. In
general, for such sources (ones that are not perfectly replayable), there
are two possible solutions.

1. Build your own source: A quick google search shows that others in the
community have attempted to build structured-streaming sources for Twitter.
It won't provide the same fault-tolerance guarantees as Kafka, etc. However,
I don't recommend this now because the DataSource APIs to build streaming
sources are not public yet, and are in flux.

2. Use Kafka/Kinesis as an intermediate system: Write something simple that
uses Twitter APIs directly to read tweets and write them into
Kafka/Kinesis. And then just read from Kafka/Kinesis.
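
For option 2, the Structured Streaming side could look roughly like the
Scala sketch below. It assumes some separate program is already publishing
tweets as JSON strings to a Kafka topic; the topic name "tweets", the broker
address and the helper name are placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TweetsFromKafkaSketch {
  // Reads tweets that another process has already written into Kafka.
  def readTweets(spark: SparkSession): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "tweets")                       // placeholder topic
      .load()
      // Kafka values arrive as bytes; keep Kafka's own timestamp alongside.
      .selectExpr("CAST(value AS STRING) AS tweet_json", "timestamp")
}
```

Because Kafka keeps offsets and can replay them, the query gets the
fault-tolerance guarantees that reading the Twitter API directly could not
give.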

Hope this helps.

TD

On Wed, Jan 31, 2018 at 7:18 PM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

> Hi ,
> I see ,Does that means Spark structured streaming doesn't work with
> Twitter streams ?
> I could see people used kafka or other streaming tools and used spark to
> process the data in structured streaming .
>
> The below doesn't work directly with Twitter Stream until I set up Kafka  ?
>
>> import org.apache.spark.sql.SparkSession
>> val spark = SparkSession
>>   .builder()
>>   .appName("Spark SQL basic example")
>>   .config("spark.some.config.option", "some-value")
>>   .getOrCreate()
>> // For implicit conversions like converting RDDs to DataFrames
>> import spark.implicits._
>>
>> // Read text from socket
>> val socketDF = spark
>>   .readStream
>>   .format("socket")
>>   .option("host", "localhost")
>>   .option("port", )
>>   .load()
>>
>> socketDF.isStreaming    // Returns True for DataFrames that have streaming sources
>>
>> socketDF.printSchema
>>
>>
>>
>
>
> Thanks,
> Divya
>
> On 1 February 2018 at 10:30, Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> Hello Divya,
>>
>> To add further clarification, Apache Bahir does not have any
>> Structured Streaming support for Twitter. It only has support for Twitter +
>> DStreams.
>>
>> TD
>>
>>
>>
>> On Wed, Jan 31, 2018 at 2:44 AM, vermanurag <anurag.ve...@fnmathlogic.com
>> > wrote:
>>
>>> Twitter functionality is not part of Core Spark. We have successfully
>>> used
>>> the following packages from maven central in past
>>>
>>> org.apache.bahir:spark-streaming-twitter_2.11:2.2.0
>>>
>>> Earlier there used to be a twitter package under spark, but I find that
>>> it
>>> has not been updated beyond Spark 1.6
>>> org.apache.spark:spark-streaming-twitter_2.11:1.6.0
>>>
>>> Anurag
>>> www.fnmathlogic.com
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Tathagata Das
Hello Divya,

To add further clarification, Apache Bahir does not have any Structured
Streaming support for Twitter. It only has support for Twitter + DStreams.

TD



On Wed, Jan 31, 2018 at 2:44 AM, vermanurag 
wrote:

> Twitter functionality is not part of Core Spark. We have successfully used
> the following packages from maven central in past
>
> org.apache.bahir:spark-streaming-twitter_2.11:2.2.0
>
> Earlier there used to be a twitter package under spark, but I find that it
> has not been updated beyond Spark 1.6
> org.apache.spark:spark-streaming-twitter_2.11:1.6.0
>
> Anurag
> www.fnmathlogic.com
>
>
>
>
>
>


Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread Tathagata Das
Could you give the full stack trace of the exception?

Also, can you do `dataframe2.explain(true)` and show us the plan output?



On Wed, Jan 31, 2018 at 3:35 PM, M Singh 
wrote:

> Hi Folks:
>
> I have to add a column to a structured *streaming* dataframe but when I
> do that (using select or withColumn) I get an exception.  I can add a
> column in structured *non-streaming* structured dataframe. I could not
> find any documentation on how to do this in the following doc  [
> https://spark.apache.org/docs/latest/
> *structured-streaming-programming-guide*.html]
>
> I am using spark 2.4.0-SNAPSHOT
>
> Please let me know what I could be missing.
>
> Thanks for your help.
>
> (I am also attaching the source code for the structured streaming,
> structured non-streaming classes and input file with this email)
>
> 
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call
> to dataType on unresolved object, tree: 'cts
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(
> unresolved.scala:105)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
> 
>
> Here is the input file (in the ./data directory) - note tokens are
> separated by '\t'
>
> 1 v1
> 2 v1
> 2 v2
> 3 v3
> 3 v1
>
> Here is the code with dataframe (*non-streaming*) which works:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
>
> object StructuredTest {
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession
>       .builder
>       .appName("StreamingTest")
>       .master("local[4]")
>
>     val spark = sparkBuilder.getOrCreate()
>
>     val schema = StructType(
>       Array(
>         StructField("id", StringType, false),
>         StructField("visit", StringType, false)
>       ))
>     var dataframe = spark.read.option("sep", "\t").schema(schema).csv("./data/")
>     var dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts"))
>     dataframe2.show(false)
>     spark.stop()
>   }
> }
>
> Output of the above code is:
>
> +---+-----+-----------------------+
> |id |visit|cts                    |
> +---+-----+-----------------------+
> |1  |v1   |2018-01-31 15:07:00.758|
> |2  |v1   |2018-01-31 15:07:00.758|
> |2  |v2   |2018-01-31 15:07:00.758|
> |3  |v3   |2018-01-31 15:07:00.758|
> |3  |v1   |2018-01-31 15:07:00.758|
> +---+-----+-----------------------+
>
>
> Here is the code with *structured streaming* which throws the exception:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession
>       .builder
>       .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
>       .appName("StreamingTest")
>       .master("local[4]")
>
>     val spark = sparkBuilder.getOrCreate()
>
>     val schema = StructType(
>       Array(
>         StructField("id", StringType, false),
>         StructField("visit", StringType, false)
>       ))
>     var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
>     var dataframe2 = dataframeInput.select("*")
>     dataframe2 = dataframe2.withColumn("cts", current_timestamp())
>     val query = dataframe2.writeStream.option("truncate", "false").format("console").start
>     query.awaitTermination()
>   }
> }
>
> Here is the exception:
>
> 18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id =
> 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 
> 2394a402-dd52-49b4-854e-cb46684bf4d8]
> terminated with error
> *org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call
> to dataType on unresolved object, tree: 'cts*
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(
> unresolved.scala:105)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
>
> I've also used snippets (shown in bold below) from (
> https://docs.databricks.com/spark/latest/structured-
> streaming/examples.html)
> but still get the same exception:
>
> Here is the code:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args:Array[String]) : Unit = {
> val sparkBuilder = SparkSession
>   .builder.
>   config("spark.sql.streaming.checkpointLocation", "./checkpointes").
>   appName("StreamingTest").master("local[4]")
>
> val 

Re: mapGroupsWithState in Python

2018-01-31 Thread Tathagata Das
Hello Ayan,

From what I understand, mapGroupsWithState (probably the more general
flatMapGroupsWithState) is the best way forward (not available in Python).
However, you need to figure out your desired semantics of when you want to
output the deduplicated data from the streaming query. For example, if
there is the following sequence of events

(id, last_update_timestamp, attribute)
1, 12:00, A  < do you want to output this immediately or wait for
sometime to see if there are new data?
1, 11:59, B  < ignored as duplicate
1, 12:01, C < do you want to output this?
1, 12:02, D

If you want to output something every time there is a newer
last_update_timestamp,
then that's not really a strict "deduplication". It's more like aggregation
with keeping the latest. In that case, you can try using UDAFs as well.
However, with UDAFs you won't get any state cleanup. So
flatMapGroupsWithState is the best solution, as you can do whatever tracking
you want, output whenever you want, and get state cleanup using timeouts.

FYI: I have elaborated on flatMapGroupsWithState and timeouts in my talk -
https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming
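
A rough Scala sketch of the "keep the latest per id, clean up with a
timeout" approach follows. The Event case class, the KeepLatest object and
the choice to emit on every newer timestamp are assumptions made for
illustration, not something taken from the thread; the 24 hours come from
the question.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical record type matching (id, last_update_timestamp, attribute).
case class Event(id: String, lastUpdated: Timestamp, attribute: String)

object KeepLatest {
  // Pure helper: the newest event among the stored one and the new arrivals.
  // On a tie the stored event wins because it comes first.
  def latest(stored: Option[Event], incoming: Seq[Event]): Event =
    (stored.toSeq ++ incoming).maxBy(_.lastUpdated.getTime)

  def update(id: String, events: Iterator[Event], state: GroupState[Event]): Iterator[Event] = {
    if (state.hasTimedOut) {
      // The watermark passed this id's timeout timestamp: drop its state.
      state.remove()
      Iterator.empty
    } else {
      val previous = state.getOption
      val newest = latest(previous, events.toList)
      state.update(newest)
      // Time out 24 hours (in event time) after the newest record for this id.
      state.setTimeoutTimestamp(newest.lastUpdated.getTime, "24 hours")
      // Emit only when the stored value actually changed.
      if (previous.contains(newest)) Iterator.empty else Iterator(newest)
    }
  }

  def dedup(spark: SparkSession, events: Dataset[Event]): Dataset[Event] = {
    import spark.implicits._
    events
      .withWatermark("lastUpdated", "24 hours")
      .groupByKey(_.id)
      .flatMapGroupsWithState[Event, Event](
        OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(update)
  }
}
```

With the example sequence above, this version would emit A, skip B, and
then emit C and D. Waiting before emitting would need a different output
rule inside update.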







On Tue, Jan 30, 2018 at 5:14 AM, ayan guha  wrote:

> Any help would be much appreciated :)
>
> On Mon, Jan 29, 2018 at 6:25 PM, ayan guha  wrote:
>
>> Hi
>>
>> I want to write something in Structured streaming:
>>
>> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
>> attribute
>> 2. I am receiving the data through Kinesis
>>
>> I want to deduplicate records based on last_updated. In batch, it looks
>> like:
>>
>> spark.sql("select * from (Select *, row_number() OVER(Partition by id
>> order by last_updated desc) rank  from table1) tmp where rank =1")
>>
>> But now I would like to do it in Structured Stream. I need to maintain
>> the state of id as per the highest last_updated, across the triggers, for a
>> certain period (24 hours).
>>
>> Questions:
>>
>> 1. Should I use mapGroupsWithState or is there any other (SQL?)
>> solution? Can anyone help me to write it?
>> 2. Is mapGroupsWithState supported in Python?
>>
>>  Just to ensure we cover bases, I have already tried using
>> dropDuplicates, but it is keeping the 1st record encountered for an Id, not
>> updating the state:
>>
>> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
>> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.
>> header.id').alias('id'),
>>   get_json_object(unpackedDF.jsonData,
>> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>>   unpackedDF.jsonData)
>>
>> dedupDF = 
>> dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
>> hours')
>>
>>
>> So it is not working. Any help is appreciated.
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Max number of streams supported ?

2018-01-31 Thread Tathagata Das
Just to clarify a subtle difference between DStreams and Structured
Streaming. Multiple input streams in a DStreamGraph are likely to mean they
are all being processed/computed in the same way, as there can be only one
streaming query / context active in the StreamingContext. However, in the
case of Structured Streaming, there can be any number of independent
streaming queries (i.e. different computations), and each streaming query
can have any number of separate input sources. So Michael's comment of "each
stream will have a thread on the driver" is correct when there are many
independent queries with different computations running simultaneously.
However, if all your streams need to be processed in the same way, then it's
one streaming query with many inputs, and it will require one thread.
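
The distinction between one query with many inputs and many independent
queries can be sketched as below. The built-in "rate" source (available in
recent Spark versions) is used only as a stand-in for a real source, and
the object and method names are made up for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object QueriesVsInputs {
  // Assumption: a rate source stands in for any real streaming source.
  def input(spark: SparkSession): DataFrame =
    spark.readStream.format("rate").option("rowsPerSecond", "1").load()

  // Same computation over n inputs (n >= 1): union them and start ONE query.
  def oneQuery(spark: SparkSession, n: Int): StreamingQuery =
    Seq.fill(n)(input(spark)).reduce(_ union _)
      .writeStream.format("console").start()

  // Different computations: every start() launches its own streaming query.
  def manyQueries(spark: SparkSession, n: Int): Seq[StreamingQuery] =
    (1 to n).map { i =>
      input(spark).selectExpr(s"value * $i AS scaled")
        .writeStream.format("console").start()
    }
}
```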

Hope this helps.

TD

On Wed, Jan 31, 2018 at 12:39 PM, Michael Armbrust 
wrote:

> -dev +user
>
>
>> Similarly for structured streaming, Would there be any limit on number of
>> of streaming sources I can have ?
>>
>
> There is no fundamental limit, but each stream will have a thread on the
> driver that is doing coordination of execution.  We comfortably run 20+
> streams on a single cluster in production, but I have not pushed the
> limits.  You'd want to test with your specific application.
>


Re: Apache Spark - Custom structured streaming data source

2018-01-25 Thread Tathagata Das
Hello Mans,

The streaming DataSource APIs are still evolving and are not public yet.
Hence there is no official documentation. In fact, there is a new
DataSourceV2 API (in Spark 2.3) that we are migrating towards. So at this
point of time, it's hard to make any concrete suggestion. You can take a
look at the classes DataSourceV2, DataReader, MicroBatchDataReader in the
spark source code, along with their implementations.

Hope this helps.

TD

On Jan 25, 2018 8:36 PM, "M Singh"  wrote:

Hi:

I am trying to create a custom structured streaming source and would like
to know if there is any example or documentation on the steps involved.

I've looked at the some methods available in the SparkSession but these are
internal to the sql package:

  *private**[sql]* def internalCreateDataFrame(
  catalystRows: RDD[InternalRow],
  schema: StructType,
  isStreaming: Boolean = false): DataFrame = {
// TODO: use MutableProjection when rowRDD is another DataFrame and the
applied
// schema differs from the existing schema on any field data type.
val logicalPlan = LogicalRDD(
  schema.toAttributes,
  catalystRows,
  isStreaming = isStreaming)(self)
Dataset.ofRows(self, logicalPlan)
  }

Please let me know where I can find the appropriate API or documentation.

Thanks

Mans


Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

2018-01-22 Thread Tathagata Das
For computing mapGroupsWithState, can you check the following?
- How many tasks are being launched in the reduce stage (that is, the stage
after the shuffle, which is computing mapGroupsWithState)?
- How long is each task taking?
- How many cores does the cluster have?


On Thu, Jan 18, 2018 at 11:28 PM, chris-sw 
wrote:

> Hi,
>
> I recently did some experiments with stateful structured streaming by using
> flatmapgroupswithstate. The streaming application is quit simple: It
> receives data from Kafka, feed it to the stateful operator
> (flatmapgroupswithstate) and sinks the output to console.
> During a test with small datasets (3-5 records per batch) I experienced
> long
> batch runs.
>
> Taking a look at the log I see an explosion of tasks with these log
> entries:
> -
> 2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO
> org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID
> 287)
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
> org.apache.spark.sql.execution.streaming.state.
> HDFSBackedStateStoreProvider
> - Retrieved version 1 of HDFSStateStoreProvider[id = (op=0, part=85), dir =
> /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] for update
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
> org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 0 non-empty
> blocks out of 1 blocks
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
> org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote
> fetches in 0 ms
> 2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
> org.apache.spark.sql.execution.streaming.state.
> HDFSBackedStateStoreProvider
> - Committed version 2 for
> HDFSStateStore[id=(op=0,part=85),dir=/tmp/temporary-
> 8b418cec-0378-4324-a916-6e3864500d56/state/0/85]
> to file
> /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85/2.delta
> 2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
> org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID
> 287). 2212 bytes result sent to driver
> -
>
> A batch run takes approx. 5 seconds and it seems most of the time it is
> doing tasks like above.
> I created several apps using the non-Spark SQL approach with mapWithState
> but never experienced these long batch runs.
>
> Anyone has this experience as well or can help me finding a solution for
> these long runs.
>
> Regards,
>
> Chris
>
>
>
>
>


Re: flatMapGroupsWithState not timing out (spark 2.2.1)

2018-01-12 Thread Tathagata Das
Aah okay!

How are you testing whether there is a timeout? The situation that would lead
to the *EventTimeTimeout* would be the following.
1. Send a bunch of data to group1, to set the timeout timestamp using
event-time.
2. Then send more data to group2 only, to advance the watermark (since it's
based on event time across all the groups), and see whether the timeout occurs.
Note that you have to keep sending some data to other groups so
that microbatches are triggered continuously and the watermark is recalculated.
If you send a bunch of data and then stop sending and just wait, then the
watermark will not advance (as there is no data to recalculate the watermark)
and therefore may not hit the condition watermark > timeout timestamp.

For *ProcessingTimeTimeout* the situation is different. That should
rely solely on the wall-clock time, not on any watermark.
In that case, you still have to keep sending data to continuously trigger
microbatches, as without any data, there won't be microbatches triggered and
therefore no timeouts will be processed. This is a known issue that we will
fix. It should work fine if you keep pushing data to group2; group1 should
time out.
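
To make the event-time condition concrete, here is a toy model in plain
Scala (it is not Spark code and it ignores that Spark applies a batch's
watermark to the next batch). It assumes the watermark is the maximum
event time seen so far minus the allowed delay, and that empty batches
leave it unchanged.

```scala
// A toy model (not Spark code) of when an event-time timeout can fire.
object EventTimeTimeoutModel {
  final case class Batch(eventTimesMs: Seq[Long])

  // Watermark after each batch: max event time seen so far minus the delay.
  // An empty batch leaves the watermark where it was.
  def watermarks(batches: Seq[Batch], delayMs: Long): Seq[Long] =
    batches.scanLeft(Long.MinValue) { (wm, b) =>
      if (b.eventTimesMs.isEmpty) wm
      else math.max(wm, b.eventTimesMs.max - delayMs)
    }.tail

  // Index of the first batch where watermark > timeout timestamp, if any.
  def firstTimeout(batches: Seq[Batch], delayMs: Long, timeoutTimestampMs: Long): Option[Int] = {
    val i = watermarks(batches, delayMs).indexWhere(_ > timeoutTimestampMs)
    if (i >= 0) Some(i) else None
  }
}
```

In this model, with a 10 second delay and group1's timeout timestamp at
70000 ms, sending data up to 10000 ms and then only empty batches never
reaches the condition, while later data for another group with event
times 50000 and then 100000 pushes the watermark to 90000 and the timeout
fires in that last batch.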

Did that make sense?

TD

On Fri, Jan 12, 2018 at 3:43 PM, daniel williams <daniel.willi...@gmail.com>
wrote:

> Hi Tathagata,
>
> Thanks for the response and consideration. Noted in my points in my email
> that was actually one of the tests that I did (EventTimeTimeout solely with
> watermark) for the group — and it again never timed out. The code I posted
> was a later test where I was trying to use some of the additional
> GroupState methods to force a timeout. I suppose I could create an
> additional test of when.plus(1 minute) and see what happens.
>
> Thanks and let me know if you have any more thoughts.
>
> dan
>
>
>
> On Fri, Jan 12, 2018 at 4:39 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hello Dan,
>>
>> From your code, it seems like you are setting the timeout timestamp based
>> on the current processing-time / wall-clock-time, while the watermark is
>> being calculated on the event-time ("when" column). The semantics of the
>> EventTimeTimeout is that when the last set timeout timestamp of a group
>> becomes older than the watermark (that is calculated across all groups)
>> because that group did not get any new data for a while, then there is a
>> timeout and the function is called with hasTimedOut to true. However, in
>> this case, the timeout timestamp is being from a different source of time
>> (using the wall clock time) than the watermark (using event-time), so they
>> may not correlate correctly. For example, if the event-time in the test
>> data is such that it is always one hour behind the wall clock time, the
>> watermark will be atleast 1 hour older than the set timeout timestamp, and
>> the group would have to not received data for more than an hour before it
>> times out.
>>
>> So I would verify what is the gap between the event-time in data, and the
>> wall-clock time that is being used to set to understand what is going on.
>> Or even better, just use the event-time in the data to calculate the
>> timeout timestamp and not use processing time timeout anywhere.
>>
>> Let me know how it goes.
>>
>> TD
>>
>>
>>
>> On Fri, Jan 12, 2018 at 2:36 PM, daniel williams <
>> daniel.willi...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I’m attempting to leverage flatMapGroupsWithState to handle some
>>> arbitrary aggregations and am noticing a couple of things:
>>>
>>>- *ProcessingTimeTimeout* + *setTimeoutDuration* timeout not being
>>>honored
>>>- *EventTimeTimeout* + watermark value not being honored.
>>>- *EventTimeTimeout* + *setTimeoutTimestamp* not being honored
>>>
>>> I’ve come to this conclusion due to never hitting a conditional check
>>> (with log output) for the *hasTimedOut* property. Each of these
>>> scenarios was tested in isolation from each other and all three exhibited
>>> the same behavior — failure to reach a timeout event, and Spark induced
>>> huge duration between batches.
>>>
>>> The test was 2000 messages read from a Kafka topic with two distinct
>>> groups (1000 messages / group).
>>>
>>> To give an idea of what I’m attempting to do: aggregate all events into
>>> a single bucket given some timeout expiry.
>>>
>>> Also, it should be noted, in this example I’m attempting to get the
>>> *final* value of the GroupState object as its timedout. This is why I
>>> attempt to do a second pass on the timeout — but that d

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

2018-01-12 Thread Tathagata Das
Hello Dan,

From your code, it seems like you are setting the timeout timestamp based
on the current processing-time / wall-clock-time, while the watermark is
being calculated on the event-time ("when" column). The semantics of the
EventTimeTimeout is that when the last set timeout timestamp of a group
becomes older than the watermark (that is calculated across all groups)
because that group did not get any new data for a while, then there is a
timeout and the function is called with hasTimedOut set to true. However, in
this case, the timeout timestamp is being set from a different source of time
(the wall clock time) than the watermark (event-time), so they
may not correlate correctly. For example, if the event-time in the test
data is such that it is always one hour behind the wall clock time, the
watermark will be at least 1 hour older than the set timeout timestamp, and
the group would have to receive no data for more than an hour before it
times out.

So I would verify the gap between the event-time in the data and the
wall-clock time that is being used to set the timeout, to understand what
is going on. Or even better, just use the event-time in the data to
calculate the timeout timestamp and not use processing time anywhere.
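
A sketch of that last suggestion, deriving the timeout timestamp from the
event time in the data instead of from DateTime.now. The Data and
SessionInfo case classes here are simplified, hypothetical stand-ins for
the types in the quoted code, and grouping by name alone is an assumption.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Hypothetical, simplified stand-ins for the types in the quoted code.
case class Data(name: String, when: Timestamp, value: Long)
case class SessionInfo(name: String, total: Long)

object EventTimeBasedTimeout {
  // Latest event time in the group's new data, in epoch milliseconds.
  def maxEventTimeMs(events: Seq[Data]): Long = events.map(_.when.getTime).max

  // Intended for flatMapGroupsWithState with GroupStateTimeout.EventTimeTimeout()
  // on a Dataset that has withWatermark("when", ...) applied.
  def handle(name: String, events: Iterator[Data],
             state: GroupState[SessionInfo]): Iterator[SessionInfo] = {
    if (state.hasTimedOut) {
      // Called with no data once the watermark passes the timeout timestamp.
      val finished = state.get
      state.remove()
      Iterator(finished)
    } else {
      val batch = events.toList
      val previous = state.getOption.getOrElse(SessionInfo(name, 0L))
      state.update(previous.copy(total = previous.total + batch.map(_.value).sum))
      // Derived from event time, so it is comparable to the watermark.
      state.setTimeoutTimestamp(maxEventTimeMs(batch), "1 minute")
      Iterator.empty
    }
  }
}
```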

Let me know how it goes.

TD



On Fri, Jan 12, 2018 at 2:36 PM, daniel williams 
wrote:

> Hi,
>
> I’m attempting to leverage flatMapGroupsWithState to handle some
> arbitrary aggregations and am noticing a couple of things:
>
>- *ProcessingTimeTimeout* + *setTimeoutDuration* timeout not being
>honored
>- *EventTimeTimeout* + watermark value not being honored.
>- *EventTimeTimeout* + *setTimeoutTimestamp* not being honored
>
> I’ve come to this conclusion due to never hitting a conditional check
> (with log output) for the *hasTimedOut* property. Each of these scenarios
> was tested in isolation from each other and all three exhibited the same
> behavior — failure to reach a timeout event, and Spark induced huge
> duration between batches.
>
> The test was 2000 messages read from a Kafka topic with two distinct
> groups (1000 messages / group).
>
> To give an idea of what I’m attempting to do: aggregate all events into a
> single bucket given some timeout expiry.
>
> Also, it should be noted, in this example I’m attempting to get the
> *final* value of the GroupState object as its timedout. This is why I
> attempt to do a second pass on the timeout — but that doesn’t really matter
> as I’m not even getting the timeout event.
>
> My code is here:
>
> val stream = reader
>   .load()
>   .selectExpr(
> "CAST(key AS STRING)",
> "topic",
> "CAST(value AS BINARY)",
> "timestamp"
>   )
>   .as[KafkaLoadType].map(el => 
> getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
>   .withWatermark("when", "10 seconds")
>   .groupByKey(f => (f.name, f.when))
>   .flatMapGroupsWithState[SessionInfo, Result](OutputMode.Append, 
> GroupStateTimeout.EventTimeTimeout()) {
>   case ((name, when),
>   events: Iterator[Data], state: GroupState[SessionInfo]) => {
>
> state.setTimeoutTimestamp(DateTime.now.plusMinutes(1).getMillis)
>
> info("Starting flatMapGroupsWithState func")
>
> val asList = events.toList
>
> info(s"${name} iterator size: ${asList.size}")
>
> if (state.exists) {
>   info(s"State exists: ${state.get}")
> }
>
> var session = state.getOption.getOrElse(SessionInfo.zero(when, name))
>
> asList.foreach(e => {
>   session = session.add(e.value)
> })
>
> info(s"Updating value to ${session}")
>
> state.update(session)
>
> val result = if (state.hasTimedOut && !state.get.finalized) {
>   info("State has timedout ... finalizing")
>
>   state.update(state.get.copy(finalized = true))
>
>   Iterator(Option(state.get).map(r => Result(r.when, r.name, 
> r.value)).get)
> } else if (state.hasTimedOut && state.get.finalized) {
>   info("State has timedout AND is finalized")
>
>   val r = state.get
>
>   state.remove()
>
>   Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
> } else {
>   val result = state.get
>
>   info(s"Returning ${result}")
>
>   //  state.remove()
>
>   Iterator(Option(result).map(r => Result(r.when, r.name, 
> r.value)).get)
> }
>
> info("Exiting flatMapGroupsWithState func")
>
> result
>   }
> }.writeStream.trigger(Trigger.ProcessingTime(500))
>   .format("console").option("truncate", false)
>   .outputMode(OutputMode.Append)
>   .start()
>
>
>
>
>
> Thanks for any help.
>
> dan
>


Re: Spark structured streaming time series forecasting

2018-01-09 Thread Tathagata Das
Spark-ts has been under development for a while. So I doubt there is any
integration with Structured Streaming. That said, Structured Streaming uses
DataFrames and Datasets, and a lot of existing libraries built on
Datasets/DataFrames should work directly, especially if they are map-like
functions.

On Mon, Jan 8, 2018 at 7:04 AM, Bogdan Cojocar 
wrote:

> Hello,
>
> Is there a method to do time series forecasting in spark structured
> streaming? Is there any integration going on with spark-ts or a similar
> library?
>
> Many thanks,
> Bogdan Cojocar
>


Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-03 Thread Tathagata Das
1. It is all the result data in that trigger. Note that it takes a
DataFrame which is a purely logical representation of data and has no
association with partitions, etc. which are physical representations.

2. If you want to limit the amount of data that is processed in a trigger,
then you should either control the trigger interval or use the rate limit
options on sources that support it (e.g. for Kafka, you can use the option
"maxOffsetsPerTrigger"; see the guide).
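
Both knobs can be sketched as follows. The broker address, topic name,
limit and interval are placeholder values, and the console sink is used
only to keep the example short.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object BoundedTriggers {
  // Assumption: broker, topic and limit are placeholders.
  def read(spark: SparkSession): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      // Rate limit on the number of offsets processed per trigger.
      .option("maxOffsetsPerTrigger", "10000")
      .load()

  // The trigger interval controls how often a micro-batch is started.
  def write(df: DataFrame): StreamingQuery =
    df.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
}
```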

Related note, these APIs are subject to change. In fact in the upcoming
release 2.3, we are adding a DataSource V2 API for
batch/microbatch-streaming/continuous-streaming sources and sinks.

On Wed, Jan 3, 2018 at 11:23 PM, M Singh 
wrote:

> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>* Adds a batch of data to this sink. The data for a given `batchId` is
> deterministic and if
>* this method is called more than once with the same batchId (which
> will happen in the case of
>* failures), then `data` should only be added once.
>*
>* Note 1: You cannot apply any operators on `data` except consuming it
> (e.g., `collect/foreach`).
>* Otherwise, you may get a wrong result.
>*
>* Note 2: The method is supposed to be executed synchronously, i.e.
> the method should only return
>* after data is consumed by sink successfully.
>*/
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data is each DataFrame passed as the argument to
> addBatch -
> 1. Is it all the data in a partition for each trigger or is it all the
> data in that trigger ?
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into OOM exception on the executor while calling
> collect ?
>
> Thanks
>


Re: Can we pass the Calcite streaming sql queries to spark sql?

2017-11-09 Thread Tathagata Das
I don't think so. Calcite's SQL is an extension of standard SQL (keywords
like STREAM, etc.) which we don't support; we just support regular SQL, so
queries like "SELECT STREAM " will not work.


On Thu, Nov 9, 2017 at 11:50 AM, kant kodali  wrote:

> Can we pass the Calcite streaming sql queries to spark sql?
>
> https://calcite.apache.org/docs/stream.html#references
>


Re: Writing custom Structured Streaming receiver

2017-11-01 Thread Tathagata Das
Structured Streaming source APIs are not yet public, so there isn't a guide.
However, if you are adventurous enough, you can take a look at the source
code in Spark.
Source API:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
Text socket source implementation:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala

Note that these APIs are still internal APIs and are very likely to change
in future versions of Spark.


On Wed, Nov 1, 2017 at 5:45 PM, Daniel Haviv  wrote:

> Hi,
> Is there a guide to writing a custom Structured Streaming receiver?
>
> Thank you.
> Daniel
>


Re: Structured Stream in Spark

2017-10-25 Thread Tathagata Das
Please do not confuse old Spark Streaming (DStreams) with Structured
Streaming. Structured Streaming's offset and checkpoint management is far
more robust than DStreams.
Take a look at my talk -
https://spark-summit.org/2017/speakers/tathagata-das/
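
On the question further down in the quoted thread about where the
checkpoint directory goes: in Structured Streaming it is set per query
with the "checkpointLocation" option. A minimal sketch, with both paths
as placeholders:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object CheckpointedQuery {
  // Assumption: both paths are placeholders; in practice they should live
  // on a fault-tolerant file system.
  def start(df: DataFrame): StreamingQuery =
    df.writeStream
      .format("parquet")
      .option("path", "/tmp/output/events")
      // Offsets and other progress information for this query are stored here.
      .option("checkpointLocation", "/tmp/checkpoints/events")
      .start()
}
```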

On Wed, Oct 25, 2017 at 9:29 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Thanks Subhash.
>
> Have you ever used zero data loss concept with streaming. I am bit worried
> to use streamig when it comes to data loss.
>
> https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-
> with-apache-spark-streaming/
>
>
> does structured streaming handles it internally?
>
> On Wed, Oct 25, 2017 at 3:10 PM, Subhash Sriram <subhash.sri...@gmail.com>
> wrote:
>
>> No problem! Take a look at this:
>>
>> http://spark.apache.org/docs/latest/structured-streaming-pro
>> gramming-guide.html#recovering-from-failures-with-checkpointing
>>
>> Thanks,
>> Subhash
>>
>> On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> Hi Sriram,
>>>
>>> Thanks. This is what I was looking for.
>>>
>>> one question, where do we need to specify the checkpoint directory in
>>> case of structured streaming?
>>>
>>> Thanks,
>>> Asmath
>>>
>>> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram <
>>> subhash.sri...@gmail.com> wrote:
>>>
>>>> Hi Asmath,
>>>>
>>>> Here is an example of using structured streaming to read from Kafka:
>>>>
>>>> https://github.com/apache/spark/blob/master/examples/src/mai
>>>> n/scala/org/apache/spark/examples/sql/streaming/StructuredKa
>>>> fkaWordCount.scala
>>>>
>>>> In terms of parsing the JSON, there is a from_json function that you
>>>> can use. The following might help:
>>>>
>>>> https://databricks.com/blog/2017/02/23/working-complex-data-
>>>> formats-structured-streaming-apache-spark-2-1.html
>>>>
>>>> I hope this helps.
>>>>
>>>> Thanks,
>>>> Subhash
>>>>
>>>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>>>> mdkhajaasm...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Could anyone provide suggestions on how to parse json data from kafka
>>>>> and load it back in hive.
>>>>>
>>>>> I have read about structured streaming but didn't find any examples.
>>>>> is there any best practise on how to read it and parse it with structured
>>>>> streaming for this use case?
>>>>>
>>>>> Thanks,
>>>>> Asmath
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Cases when to clear the checkpoint directories.

2017-10-09 Thread Tathagata Das
Any change in the Java code (to be specific, the generated bytecode) of
the functions you pass to Spark (i.e., map function, reduce function, as
well as their closure dependencies) counts as an "application code change",
and will break the recovery from checkpoints.
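
For context, a typical checkpointed DStream application is structured as
in the sketch below; the checkpoint directory, host and port are
hypothetical. The function passed to map is one of the functions whose
bytecode matters for recovery.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  // Hypothetical checkpoint directory.
  val checkpointDir = "/tmp/dstream-checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedApp")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    val lines = ssc.socketTextStream("localhost", 9999)
    // This function is passed to Spark; changing its body changes the
    // generated bytecode.
    lines.map(line => line.length).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recreates the context from the checkpoint if one exists,
    // otherwise calls createContext.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```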

On Sat, Oct 7, 2017 at 11:53 AM, John, Vishal (Agoda)  wrote:

>
>
> Hello TD,
>
> You had replied to one of the questions about checkpointing –
>
> This is an unfortunate design on my part when I was building DStreams :)
>
> Fortunately, we learnt from our mistakes and built Structured Streaming
> the correct way. Checkpointing in Structured Streaming stores only the
> progress information (offsets, etc.), and the user can change their
> application code (within certain constraints, of course) and still restart
> from checkpoints (unlike DStreams). If you are just building out your
> streaming applications, then I highly recommend you to try out Structured
> Streaming instead of DStreams (which is effectively in maintenance mode).
>
> Can you please elaborate on what you mean by application code change in
> DStream applications?
>
> If I add a couple of println statements in my application code will that
> become an application code change? or do you mean, changing method
> signatures or adding new methods etc.
> Could you please point to relevant source code in Spark, which does this
> type of code validation/de-serialisation in case of DStreams?
>
> We are using mapWithState in our application and it builds its state from
> checkpointed RDDs.  I would like understand the cases where we can avoid
> clearing the checkpoint directories.
>
>
> thanks in advance,
> Vishal
>
>
>


Re: Should I use Dstream or Structured Stream to transfer data from source to sink and then back from sink to source?

2017-09-14 Thread Tathagata Das
Are you sure the code is correct? A Dataset does not have a method
"trigger". Rather, I believe the correct code should be

StreamingQuery query = resultDataSet.writeStream().trigger(
Trigger.ProcessingTime(1000)).format("kafka").start();

In Structured Streaming you can do all the same things you can do with
DStreams. For example, there is foreach in Structured Streaming, e.g.
resultDataSet.writeStream().foreach(...)

When you say the mapPartitions code is not getting executed, are you sure
the query is running? Posting the actual code (not pseudocode) may help
debug this.


On Wed, Sep 13, 2017 at 11:20 AM, kant kodali  wrote:

> Hi All,
>
> I am trying to read data from kafka, insert into Mongo and read from mongo
> and insert back into Kafka. I went with structured stream approach first
> however I believe I am making some naiver error because my map operations
> are not getting invoked.
>
> The pseudo code looks like this
>
> DataSet resultDataSet = jsonDataset.mapPartitions(
> insertIntoMongo).mapPartitions(readFromMongo);
>
> StreamingQuery query = resultDataSet.trigger(ProcesingTime(1000)).format("
> kafka").start();
>
> query.awaitTermination();
>
> The mapPartitions in this code is not getting executed. Is this because I
> am not calling any action on my streaming dataset? In the Dstream case, I
> used to call forEachRDD and it worked well. so how do I do this using
> structured streaming?
>
> Thanks!
>


Re: Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread Tathagata Das
Why not set the watermark to be looser, one that works across all
partitions? The main usage of watermark is to drop state. If you loosen the
watermark threshold (e.g. from 1 hour to 10 hours), then you will keep more
state with older data, but you are guaranteed that you will not drop
important data.
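
To make the trade-off concrete, here is a small plain-Python model (not
Spark code) of a single global watermark, computed the way the question
below describes it: max event time seen so far minus the late threshold.
The function name late_events, the partition names and the hour values are
all made up for illustration, and the real engine has more moving parts,
so treat this only as a sketch.

```python
def late_events(batches, threshold):
    """Return the events that arrive with an event time older than the watermark.

    batches   -- list of micro-batches, each a list of (partition, event_time)
    threshold -- allowed lateness, in the same unit as event_time (hours here)
    """
    max_event_time = float("-inf")
    watermark = float("-inf")
    dropped = []
    for batch in batches:
        for partition, event_time in batch:
            if event_time < watermark:
                dropped.append((partition, event_time))
            max_event_time = max(max_event_time, event_time)
        # The watermark only moves forward, once the batch has been processed.
        watermark = max(watermark, max_event_time - threshold)
    return dropped


# Hypothetical data: partition "p0" is consumed promptly,
# partition "p1" lags about 5 hours behind.
batches = [
    [("p0", 10.0), ("p1", 5.0)],
    [("p0", 11.0), ("p1", 6.0)],
    [("p0", 12.0), ("p1", 7.0)],
]

print(late_events(batches, threshold=1))   # tight threshold: p1 events are dropped
print(late_events(batches, threshold=10))  # looser threshold: nothing is dropped
```

With the 1 hour threshold the lagging partition loses data; with the
10 hour threshold it does not, at the cost of keeping state around longer.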

On Wed, Aug 30, 2017 at 7:41 AM, KevinZwx  wrote:

> Hi,
>
> I'm working with Structured Streaming to process logs from kafka and use
> watermark to handle late events. Currently the watermark is computed by
> (max
> event time seen by the engine - late threshold), and the same watermark is
> used for all partitions.
>
> But in production environment it happens frequently that different
> partition
> is consumed at different speed, the consumption of some partitions may be
> left behind, so the newest event time in these partitions may be much
> smaller than than the others'. In this case using the same watermark for
> all
> partitions may cause heavy data loss.
>
> So is there any way to achieve different watermark for different kafka
> partition or any plan to work on this?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread Tathagata Das
The per-key state S is kept in memory. It has to be of a type that can
be encoded by Datasets. All you have to do is update S every time the
function is called, and the engine takes care of serializing/checkpointing
the state value, and retrieving the correct version of the value when
restarting from failures. So you don't have to explicitly "store" the state
anywhere; the engine takes care of it under the hood. Internally, there is
an interface called StateStore
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala>,
which defines the component that is actually responsible for checkpointing
the values, etc. And there is a single implementation
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala>
of the store that keeps the values in a hashmap and writes all changes to
the values to an HDFS-API-compatible fault-tolerant filesystem for
checkpointing. With this, by default, you really don't have to worry about
externalizing it and you don't have to override anything in GroupState. You
just use it as the example shows.

It's important to note that the state of all the keys is distributed
over the executors. So each executor will hold in its memory a fraction of
all the train state. Depending on the number of trains, and the amount
of data in the state, you will have to size the cluster and the workers
accordingly. If you keep a lot of state for each train, then your overall
memory requirements are going to increase. So you have to be judicious
about how much data to keep as state for each key.

Regarding aggregation vs mapGroupsWithState, it's a trade-off between
efficiency and flexibility. With aggregation, you can do sliding window of
"24 hours" sliding every "1 hour", which will give max in "last 24 hours"
updated every "1 hour". If you are okay with this approximation, then this
is easiest to implement (don't forget setting watermarks) and most
efficient. If you really want something more precise than that, then
mapGroupsWithState is the ultimate flexible tool. However, you have to do
bookkeeping of "last 24 hours" and calculate the max yourself. :)
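
As a rough idea of what that bookkeeping looks like, here is a plain-Python
sketch (not Spark code) of a per-key update function. The names
update_history_and_get_max and process, the dict standing in for the
engine-managed state, and the sample numbers are all hypothetical. It
assumes each call receives at least one event, and it measures "last 24
hours" relative to the newest event time seen for that key.

```python
DAY_SECONDS = 24 * 60 * 60


def update_history_and_get_max(history, events):
    """Update one key's history with new events and return the rolling max.

    history -- list of (activity_ts, value) pairs kept as this key's state
    events  -- non-empty list of new (activity_ts, value) pairs for the key
    """
    history.extend(events)
    newest = max(activity_ts for activity_ts, _ in history)
    # Bookkeeping: forget everything older than 24 hours before the newest event.
    history[:] = [(t, v) for t, v in history if t > newest - DAY_SECONDS]
    return max(v for _, v in history)


state = {}  # stands in for the per-key state the engine would manage


def process(train, events):
    """Look up (or create) the history for this train and update it."""
    history = state.setdefault(train, [])
    return update_history_and_get_max(history, events)


first = process("train-1", [(0, 900), (3600, 1000)])
second = process("train-1", [(DAY_SECONDS + 1800, 700)])   # 1000 is still in range
third = process("train-1", [(DAY_SECONDS + 7200, 800)])    # 1000 has aged out
print(first, second, third)
```

The history list is exactly the part that grows with the event rate, which
is why the amount of state kept per key matters for memory sizing.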

Hope this helps.

On Wed, Aug 30, 2017 at 10:58 AM, kant kodali <kanth...@gmail.com> wrote:

> I think I understand *groupByKey/**mapGroupsWithState *and I am still
> trying to wrap my head around *GroupState*. so, I believe I have a
> naive questions to ask on *GroupState*.
>
> If I were to represent a state that has history of events (say 24 hours)
> and say the number of events can be big for a given 24 hour period. where
> do I store the state S? An external store like Kafka or a Database or a
> Distributed File system ? I wonder if I can represent the state S using a
> DataSet that represents the history of events? GroupState also has
> .exists() and  .get() and if I am not wrong I should override these methods
> right so comparisons and retrieval from external store can work?
>
> Thanks!
>
>
>
> On Wed, Aug 30, 2017 at 1:39 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi TD,
>>
>> Thanks for the explanation and for the clear pseudo code and an example!
>>
>> mapGroupsWithState is cool and looks very flexible however I have few
>> concerns and questions. For example
>>
>> Say I store TrainHistory as max heap from the Java Collections library
>> and I keep adding to to this heap for 24 hours and at some point I will run
>> out of Java heap space right? Do I need to store TrainHistory as a
>> DataSet or DataFrame instead of in memory max heap object from Java
>> Collections library?
>>
>> I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState*
>> which approach is more efficient to solve this particular problem ?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Aah, I might have misinterpreted. The groupBy + window solution would
>>> give the max time for each train over 24 hours (non-overlapping window) of
>>> event data (timestamped by activity_timestamp). So the output would be
>>> like.
>>>
>>> Train  Dest  Window(activity_timestamp)    max(Time)
>>> 1      HK    Aug28-00:00 to Aug29-00:00    10:00  <- updating currently through aug29
>>> 1      HK    Aug27-00:00 to Aug28-00:00    09:00  <- not updating as no new updates coming in with activity_timestamp in this range.
>>>
>>> The drawback of this approach is that as soon as Aug28 starts, 

Re: Python UDF to convert timestamps (performance question)

2017-08-30 Thread Tathagata Das
1. Generally, adding columns, etc. will not affect performance because
Spark's optimizer will automatically figure out which columns are not needed
and eliminate them in the optimization step. So that should never be a concern.
2. Again, this is generally not a concern as the optimizer will take care
of moving such expressions around.
3. However, using a Python UDF is bad for perf. In your case, if the
problem is that the timestamp is a float, you can cast the float to
timestamp type, and it should automatically convert it correctly.
Something like this: col("ts").cast("timestamp")
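
For intuition, here is a plain-Python sketch (no Spark needed) of what such
a cast amounts to: the number is read as seconds since the Unix epoch. The
second half is a side observation that was not raised in this thread: a
32-bit float, which is what FloatType stores, cannot hold a value of this
size exactly, so a DoubleType column may be the safer choice for the 'ts'
field. That suggestion is an assumption to verify against your own data.

```python
import struct
from datetime import datetime, timezone

ts = 1379288667.631940  # sample value quoted in the question below

# A numeric-to-timestamp cast treats the number as seconds since the epoch.
as_datetime = datetime.fromtimestamp(ts, tz=timezone.utc)
print(as_datetime.isoformat())

# Round-trip the same value through a 32-bit float, as a FloatType column would.
as_float32 = struct.unpack("f", struct.pack("f", ts))[0]
print(as_float32, as_float32 - ts)  # the second number is the rounding error in seconds
```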

On Wed, Aug 30, 2017 at 11:45 AM, Brian Wylie 
wrote:

> Hi All,
>
> I'm using structured streaming in Spark 2.2.
>
> I'm using PySpark and I have data (from a Kafka publisher) where the
> timestamp is a float that looks like this:  1379288667.631940
>
> So here's my code (which is working fine)
>
> # SUBSCRIBE: Setup connection to Kafka Stream
> raw_data = spark.readStream.format('kafka') \
>   .option('kafka.bootstrap.servers', 'localhost:9092') \
>   .option('subscribe', 'dns') \
>   .option('startingOffsets', 'latest') \
>   .load()
>
> # ETL: Hardcoded Schema for DNS records (do this better later)
> from pyspark.sql.types import (StructType, StringType, BooleanType,
>     IntegerType, FloatType, TimestampType)
> from pyspark.sql.functions import from_json, to_json, col, struct
>
> dns_schema = StructType() \
> .add('ts', FloatType()) \
> .add('uid', StringType()) \
> .add('id.orig_h', StringType()) \
>   
>
> # ETL: Convert raw data into parsed and proper typed data
> from pyspark.sql.functions import col, length, to_timestamp
>
> parsed_data = raw_data \
>   .select(from_json(col("value").cast("string"),
> dns_schema).alias('data')) \
>   .select('data.*')
>
> # Convert Bro IDS time to an actual TimeStamp type
> from pyspark.sql.functions import udf
> import datetime
> my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)),
> TimestampType())
> parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
>
> # Then a writestream later...
>
> Okay so all this code works fine (the 'dt' field has exactly what I
> want)... but I'll be streaming in a lot of data so here's the questions:
>
> - Will the creation of a new dataframe withColumn basically kill
> performance?
> - Should I move my UDF into the parsed_data.select(...)  part?
> - Can my UDF be done by spark.sql directly? (I tried to_timestamp but
> without luck)
>
> Any suggestions/pointers are greatly appreciated.
>
> -Brian Wylie
>
>
>


Re: Why do checkpoints work the way they do?

2017-08-29 Thread Tathagata Das
Hello,

This is an unfortunate design on my part when I was building DStreams :)

Fortunately, we learnt from our mistakes and built Structured Streaming the
correct way. Checkpointing in Structured Streaming stores only the progress
information (offsets, etc.), and the user can change their application code
(within certain constraints, of course) and still restart from checkpoints
(unlike DStreams). If you are just building out your streaming
applications, then I highly recommend that you try out Structured Streaming
instead of DStreams (which is effectively in maintenance mode).


On Fri, Aug 25, 2017 at 7:41 PM, Hugo Reinwald 
wrote:

> Hello,
>
> I am implementing a spark streaming solution with Kafka and read that
> checkpoints cannot be used across application code changes - here
> 
>
> I tested changes in application code and got the error message below:
>
> 17/08/25 15:10:47 WARN CheckpointReader: Error reading checkpoint from
> file file:/tmp/checkpoint/checkpoint-150364116.bk
> java.io.InvalidClassException: scala.collection.mutable.ArrayBuffer;
> local class incompatible: stream classdesc serialVersionUID =
> -2927962711774871866, local class serialVersionUID = 1529165946227428979
>
> While I understand that this is as per design, can I know why does
> checkpointing work the way that it does verifying the class signatures?
> Would it not be easier to let the developer decide if he/she wants to use
> the old checkpoints depending on what is the change in application logic
> e.g. changes in code unrelated to spark/kafka - Logging / conf changes etc
>
> This is first post in the group. Apologies if I am asking the question
> again, I did a nabble search and it didnt throw up the answer.
>
> Thanks for the help.
> Hugo
>


Re: Time window on Processing Time

2017-08-29 Thread Tathagata Das
Yes, it can be! There is a sql function called current_timestamp() which is
self-explanatory. So I believe you should be able to do something like

import org.apache.spark.sql.functions._

ds.withColumn("processingTime", current_timestamp())
  .groupBy(window(col("processingTime"), "1 minute"))
  .count()
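
To show what this computes, here is a plain-Python model (not Spark code)
of counting records in 1 minute tumbling windows keyed by the time they are
processed rather than by a time carried in the data. The helper names
window_start and count_by_processing_time, and the injected fake clock, are
hypothetical. The sketch stamps every record individually for simplicity;
the engine may well assign one timestamp per micro-batch instead.

```python
from collections import Counter
from datetime import datetime, timezone


def window_start(ts, minutes=1):
    """Floor a timestamp to the start of its tumbling window."""
    size = minutes * 60
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % size, tz=timezone.utc)


def count_by_processing_time(records, clock):
    """Count records per window, stamping each with clock() when it is seen."""
    counts = Counter()
    for _ in records:
        counts[window_start(clock())] += 1
    return counts


# A fake clock so the example is deterministic; a real job would use "now".
fake_times = iter([
    datetime(2017, 8, 29, 12, 0, 5, tzinfo=timezone.utc),
    datetime(2017, 8, 29, 12, 0, 50, tzinfo=timezone.utc),
    datetime(2017, 8, 29, 12, 1, 10, tzinfo=timezone.utc),
])
counts = count_by_processing_time(["a", "b", "c"], clock=lambda: next(fake_times))
for start, n in sorted(counts.items()):
    print(start.isoformat(), n)
```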


On Mon, Aug 28, 2017 at 5:46 AM, madhu phatak  wrote:

> Hi,
> As I am playing with structured streaming, I observed that window function
> always requires a time column in input data.So that means it's event time.
>
> Is it possible to old spark streaming style window function based on
> processing time. I don't see any documentation on the same.
>
> --
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Aah, I might have misinterpreted. The groupBy + window solution would give
the max time for each train over 24 hours (non-overlapping window) of event
data (timestamped by activity_timestamp). So the output would be like.

Train  Dest  Window(activity_timestamp)    max(Time)
1      HK    Aug28-00:00 to Aug29-00:00    10:00  <- updating currently through aug29
1      HK    Aug27-00:00 to Aug28-00:00    09:00  <- not updating as no new updates coming in with activity_timestamp in this range.

The drawback of this approach is that as soon as Aug28 starts, you have to
wait for a new event about a train to get a new max(time). You may rather
want a rolling 24 hour period, that is, the max time known over events in
the last 24 hours.
Then maintaining your own custom state using
mapGroupsWithState/flatMapGroupsWithState() is the best and most flexible
option. It is available in Spark 2.2 in Scala and Java.

Here is an example that tracks sessions based on events.
Scala -
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

You will have to create a custom per-train state which keeps track of the
last 24 hours of the train's history, and use that state to calculate the
max time for each train.


def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
    state: GroupState[TrainHistory]): Long = {
  // for every event, update history (i.e. last 24 hours of events) and
  // return the max time from the history
}

trainTimesDataset // Dataset[TrainEvents]
  .groupByKey(_.train)
  .mapGroupsWithState(updateHistoryAndGetMax)

Hope this helps.


On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hey TD,
>
> If I understood the question correctly, your solution wouldn't return the
> exact solution, since it also groups by on destination. I would say the
> easiest solution would be to use flatMapGroupsWithState, where you:
> .groupByKey(_.train)
>
> and keep in state the row with the maximum time.
>
> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Yes. And in that case, if you just care about only the last few days of
>> max, then you should set watermark on the timestamp column.
>>
>>  *trainTimesDataset*
>> *  .withWatermark("**activity_timestamp", "5 days")*
>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
>> "dest")*
>> *  .max("time")*
>>
>> Any counts which are more than 5 days old will be dropped from the
>> streaming state.
>>
>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the response. Since this is a streaming based query and in my
>>> case I need to hold state for 24 hours which I forgot to mention in my
>>> previous email. can I do ?
>>>
>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
>>> hours"), "train", "dest").max("time")*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>>> Int, dest: String, time: Timestamp] *
>>>>
>>>>
>>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>>
>>>>
>>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>>> train, dest"*// after calling
>>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>>
>>>>
>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am wondering what is the easiest and concise way to express the
>>>>> computation below in Spark Structured streaming given that it supports 
>>>>> both
>>>>> imperative and declarative styles?
>>>>> I am just trying to select rows that has max timestamp for each train?
>>>>> Instead of doing some sort of nested queries like we normally do in any
>>>>> relational database I am trying to see if I can leverage both imperative
>>>>> and declarative at the same time. If nested queries or join are not
>>>>> required then I would like to see how this can be possible? I am using
>>>>> spark 2.1.1.
>>>>>
>>>>> Dataset
>>>>>
>>>>> Train  Dest  Time
>>>>> 1      HK    10:00
>>>>> 1      SH    12:00
>>>>> 1      SZ    14:00
>>>>> 2      HK    13:00
>>>>> 2      SH    09:00
>>>>> 2      SZ    07:00
>>>>>
>>>>> The desired result should be:
>>>>>
>>>>> Train  Dest  Time
>>>>> 1      SZ    14:00
>>>>> 2      HK    13:00
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Yes. And in that case, if you only care about the max over the last few
days, then you should set a watermark on the timestamp column.

trainTimesDataset
  .withWatermark("activity_timestamp", "5 days")
  .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"),
    col("train"), col("dest"))
  .agg(max(col("time")))

Any aggregates which are more than 5 days old will be dropped from the
streaming state.

On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi,
>
> Thanks for the response. Since this is a streaming based query and in my
> case I need to hold state for 24 hours which I forgot to mention in my
> previous email. can I do ?
>
>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
> hours"), "train", "dest").max("time")*
>
>
> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>> Int, dest: String, time: Timestamp] *
>>
>>
>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>
>>
>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>> train, dest"*// after calling
>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>
>>
>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am wondering what is the easiest and concise way to express the
>>> computation below in Spark Structured streaming given that it supports both
>>> imperative and declarative styles?
>>> I am just trying to select rows that has max timestamp for each train?
>>> Instead of doing some sort of nested queries like we normally do in any
>>> relational database I am trying to see if I can leverage both imperative
>>> and declarative at the same time. If nested queries or join are not
>>> required then I would like to see how this can be possible? I am using
>>> spark 2.1.1.
>>>
>>> Dataset
>>>
>>> Train  Dest  Time
>>> 1      HK    10:00
>>> 1      SH    12:00
>>> 1      SZ    14:00
>>> 2      HK    13:00
>>> 2      SH    09:00
>>> 2      SZ    07:00
>>>
>>> The desired result should be:
>>>
>>> Train  Dest  Time
>>> 1      SZ    14:00
>>> 2      HK    13:00
>>>
>>>
>>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Say, trainTimesDataset is the streaming Dataset of schema [train: Int,
dest: String, time: Timestamp]


Scala: trainTimesDataset.groupBy("train", "dest").agg(max(col("time")))


SQL: "select train, dest, max(time) from trainTimesView group by train,
dest" // after calling
trainTimesDataset.createOrReplaceTempView("trainTimesView")


On Tue, Aug 29, 2017 at 12:59 PM, kant kodali  wrote:

> Hi All,
>
> I am wondering what is the easiest and concise way to express the
> computation below in Spark Structured streaming given that it supports both
> imperative and declarative styles?
> I am just trying to select rows that has max timestamp for each train?
> Instead of doing some sort of nested queries like we normally do in any
> relational database I am trying to see if I can leverage both imperative
> and declarative at the same time. If nested queries or join are not
> required then I would like to see how this can be possible? I am using
> spark 2.1.1.
>
> Dataset
>
> Train  Dest  Time
> 1      HK    10:00
> 1      SH    12:00
> 1      SZ    14:00
> 2      HK    13:00
> 2      SH    09:00
> 2      SZ    07:00
>
> The desired result should be:
>
> Train  Dest  Time
> 1      SZ    14:00
> 2      HK    13:00
>
>


Re: Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Tathagata Das
When you say "the application remained alive", do you mean the
StreamingQuery stayed alive, or the whole process stayed alive? The
StreamingQuery should be terminated immediately. And the stream execution
threads are all daemon threads, so it should not affect the termination of
the application whether the queries are active or not. Maybe something
else is keeping the application alive?



On Tue, Aug 29, 2017 at 2:09 AM, Yuval Itzchakov  wrote:

> I wasn't sure if this would be a proper bug or not.
>
> Today, the behavior of Structured Streaming is such that if a source fails
> with an exception, the `StreamExecution` class halts reading further from
> the source, but the application is remained alive. For applications where
> the sole purpose is to transform data from a non static source (such as
> Kafka), this is rather useless and might be surprising.
>
> For example, if you have a simple monitor which checks whether the
> application is alive or not, you'll still get reports that the application
> is alive and running, but actually it isn't consuming anything from the
> source and is logically dead.
>
> Should this be the behavior? I think that perhaps there should be a
> configuration that asks whether to completely shutdown the application on
> source failure.
>
> What do you guys think?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
My apologies, Chris. Somehow I did not receive the first email by the OP,
and hence mistook your answers to the OP for cryptic questions. :/
I found the full thread on nabble. I agree with your analysis of the OP's
question 1.


On Fri, Aug 25, 2017 at 12:48 AM, Chris Bowden <cbcweb...@gmail.com> wrote:

> Tathagata, thanks for filling in context for other readers on 2a and 2b, I
> summarized too much in hindsight.
>
> Regarding the OP's first question, I was hinting it is quite natural to
> chain processes via kafka. If you are already interested in writing
> processed data to kafka, why add complexity to a job by having it commit
> processed data to kafka and s3 vs. simply moving the processed data from
> kafka out to s3 as needed. Perhaps the OP's thread got lost in context
> based on how I responded.
>
> 1) We are consuming from  kafka using  structured streaming and  writing
> the processed data set to s3.
> We also want to write the processed data to kafka moving forward, is it
> possible to do it from the same streaming query ? (spark  version 2.1.1)
>
> Streaming queries are currently bound to a single sink, so multiplexing
> the write with existing sinks via the  streaming query isn't possible
> AFAIK. Arguably you can reuse the "processed data" DAG by starting multiple
> sinks against it, though you will effectively process the data twice on
> different "schedules" since each sink will effectively have its own
> instance of StreamExecution, TriggerExecutor, etc. If you *really* wanted
> to do one pass of the data and process the same exact block of data per
> micro batch you could implement it via foreach or a custom sink which
> writes to kafka and s3, but I wouldn't recommend it. As stated above, it is
> quite natural to chain processes via kafka.
>
> On Thu, Aug 24, 2017 at 11:03 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Responses inline.
>>
>> On Thu, Aug 24, 2017 at 7:16 PM, cbowden <cbcweb...@gmail.com> wrote:
>>
>>> 1. would it not be more natural to write processed to kafka and sink
>>> processed from kafka to s3?
>>>
>>
>> I am sorry i dont fully understand this question. Could you please
>> elaborate further, as in, what is more natural than what?
>>
>>
>>> 2a. addBatch is the time Sink#addBatch took as measured by
>>> StreamExecution.
>>>
>>
>> Yes. This essentially includes the time taken to compute the output and
>> finish writing the output to the sink.
>> (**to give some context for other readers, this person is referring to
>> the different time durations reported through StreamingQuery.lastProgress)
>>
>>
>>> 2b. getBatch is the time Source#getBatch took as measured by
>>> StreamExecution.
>>>
>> Yes, it is the time taken by the source prepare the DataFrame the has the
>> new data to be processed in the trigger.
>> Usually this is low, but its not guaranteed to be as some sources may
>> require complicated tracking and bookkeeping to prepare the DataFrame.
>>
>>
>>> 3. triggerExecution is effectively end-to-end processing time for the
>>> micro-batch, note all other durations sum closely to triggerExecution,
>>> there
>>> is a little slippage based on book-keeping activities in StreamExecution.
>>>
>>
>> Yes. Precisely.
>>
>>
>>
>


Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
Responses inline.

On Thu, Aug 24, 2017 at 7:16 PM, cbowden  wrote:

> 1. would it not be more natural to write processed to kafka and sink
> processed from kafka to s3?
>

I am sorry, I don't fully understand this question. Could you please
elaborate further, as in, what is more natural than what?


> 2a. addBatch is the time Sink#addBatch took as measured by StreamExecution.
>

Yes. This essentially includes the time taken to compute the output and
finish writing the output to the sink.
(To give some context for other readers, this person is referring to the
different time durations reported through StreamingQuery.lastProgress.)


> 2b. getBatch is the time Source#getBatch took as measured by
> StreamExecution.
>
Yes, it is the time taken by the source to prepare the DataFrame that has
the new data to be processed in the trigger.
Usually this is low, but it's not guaranteed to be, as some sources may
require complicated tracking and bookkeeping to prepare the DataFrame.


> 3. triggerExecution is effectively end-to-end processing time for the
> micro-batch, note all other durations sum closely to triggerExecution,
> there
> is a little slippage based on book-keeping activities in StreamExecution.
>

Yes. Precisely.


>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Structured-Streaming-multiple-
> sinks-tp29056p29105.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
Both work. The asynchronous method with the listener will have less
downtime; it's just that the first trigger/batch after the asynchronous
unpersist+persist will probably take longer as it has to reload the data.


On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
wrote:

> Thanks tathagata das actually I'm planning to something like this
>
> activeQuery.stop()
>
> //unpersist and persist cached data frame
>
> df.unpersist()
>
> //read the updated data //data size of df is around 100gb
>
> df.persist()
>
>  activeQuery = startQuery()
>
>
> the cached data frame size around 100gb ,so the question is this the right
> place to refresh this huge cached data frame ?
>
> I'm also trying to refresh cached data frame in onqueryprogress() method
> in a class which extends StreamingQuerylistner
>
> Would like to know which is the best place to refresh cached data frame
> and why
>
> Thanks again for the below response
>
> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> You can do something like this.
>>
>>
>> def startQuery(): StreamingQuery = {
>>   // create your streaming dataframes
>>   // start the query with the same checkpoint directory
>> }
>>
>> // handle to the active query
>> var activeQuery: StreamingQuery = null
>>
>> while (!stopped) {
>>   if (activeQuery == null) {            // if query not active, start query
>>     activeQuery = startQuery()
>>   } else if (shouldRestartQuery()) {    // check your condition and restart query
>>     activeQuery.stop()
>>     activeQuery = startQuery()
>>   }
>>
>>   activeQuery.awaitTermination(100)     // wait for 100 ms.
>>   // if there is any error it will throw exception and quit the loop
>>   // otherwise it will keep checking the condition every 100ms
>> }
>>
>>
>>
>>
>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Thanks Michael
>>>
>>> I guess my question is little confusing ..let me try again
>>>
>>>
>>> I would like to restart streaming query programmatically while my
>>> streaming application is running based on a condition and why I want to do
>>> this
>>>
>>> I want to refresh a cached data frame based on a condition and the best
>>> way to do this restart streaming query suggested by Tdas below for similar
>>> problem
>>>
>>> http://mail-archives.apache.org/mod_mbox/spark-user/
>>> 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+
>>> fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>
>>> I do understand that checkpoint if helps in recovery and failures but I
>>> would like to know "how to restart streaming query programmatically without
>>> stopping my streaming application"
>>>
>>> In place of query.awaittermination should I need to have an logic to
>>> restart query? Please suggest
>>>
>>>
>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com>
>>> wrote:
>>>
>>>> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>>
>>>> Though I think that this currently doesn't work with the console sink.
>>>>
>>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>>
>>>>>> I'm trying to restart a streaming query to refresh cached data frame
>>>>>>
>>>>>> Where and how should I restart streaming query
>>>>>>
>>>>>
>>>>>
>>>>> val sparkSes = SparkSession
>>>>>   .builder
>>>>>   .config("spark.master", "local")
>>>>>   .appName("StreamingCahcePoc")
>>>>>   .getOrCreate()
>>>>>
>>>>> import sparkSes.implicits._
>>>>>
>>>>> val dataDF = sparkSes.readStream
>>>>>   .schema(streamSchema)
>>>>>   .csv("testData")
>>>>>
>>>>> val query = counts.writeStream
>>>>>   .outputMode("complete")
>>>>>   .format("console")
>>>>>   .start()
>>>>>
>>>>> query.awaitTermination()
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread Tathagata Das
You can do something like this.


def startQuery(): StreamingQuery = {
   // create your streaming dataframes
   // start the query with the same checkpoint directory
}

// handle to the active query
var activeQuery: StreamingQuery = null

while (!stopped) {

   if (activeQuery == null) {           // if query not active, start query
     activeQuery = startQuery()

   } else if (shouldRestartQuery()) {   // check your condition and restart query
     activeQuery.stop()
     activeQuery = startQuery()
   }

   activeQuery.awaitTermination(100)   // wait for 100 ms.
   // if there is any error it will throw exception and quit the loop
   // otherwise it will keep checking the condition every 100ms
}
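
For PySpark users, the same loop could be sketched roughly as follows. This is only a sketch under assumptions: start_query, should_restart and is_stopped are hypothetical callables that you would supply yourself, and the only methods used on the query handle are stop() and awaitTermination(timeout). Note that in PySpark the awaitTermination timeout is given in seconds, not milliseconds. Stopping the query when the loop exits is my own addition, not part of the Scala loop above.

```python
def run_with_restarts(start_query, should_restart, is_stopped, poll_seconds=0.1):
    """Keep one streaming query running and restart it when a condition holds.

    start_query    -- hypothetical callable: builds the streaming DataFrames and
                      starts the query with the same checkpoint directory
    should_restart -- hypothetical callable: True when the query must be restarted
    is_stopped     -- hypothetical callable: True when the application should exit
    Returns the number of times a query was started.
    """
    active_query = None
    starts = 0
    while not is_stopped():
        if active_query is None:        # no active query yet, start one
            active_query = start_query()
            starts += 1
        elif should_restart():          # condition met, stop and start again
            active_query.stop()
            active_query = start_query()
            starts += 1
        # Waits up to poll_seconds; raises if the query failed, which ends the loop
        active_query.awaitTermination(poll_seconds)
    if active_query is not None:        # assumption: stop the query on exit
        active_query.stop()
    return starts
```

The driver thread stays in this loop instead of blocking forever in awaitTermination(), so the restart condition is re-checked on every poll.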




On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep 
wrote:

> Thanks Michael
>
> I guess my question was a little confusing, so let me try again.
>
> I would like to restart a streaming query programmatically, based on a
> condition, while my streaming application is running. Here is why:
>
> I want to refresh a cached data frame based on a condition, and the best
> way to do this is to restart the streaming query, as TD suggested below
> for a similar problem:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/
> 201705.mbox/%3cCA+AHuKn+vSEWkJD=bSSt6G5bDZDaS6wmN+
> fwmn4jtm1x1nd...@mail.gmail.com%3e
>
> I do understand that checkpointing helps with recovery from failures, but I
> would like to know "how to restart streaming query programmatically without
> stopping my streaming application"
>
> In place of query.awaitTermination, do I need some logic to
> restart the query? Please suggest.
>
>
> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust 
> wrote:
>
>> See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>
>> Though I think that this currently doesn't work with the console sink.
>>
>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep 
>> wrote:
>>
>>> Hi,
>>>

>>>> I'm trying to restart a streaming query to refresh cached data frame
>>>>
>>>> Where and how should I restart streaming query

>>>
>>>
>>> val sparkSes = SparkSession
>>>   .builder
>>>   .config("spark.master", "local")
>>>   .appName("StreamingCahcePoc")
>>>   .getOrCreate()
>>>
>>> import sparkSes.implicits._
>>>
>>> val dataDF = sparkSes.readStream
>>>   .schema(streamSchema)
>>>   .csv("testData")
>>>
>>> val query = counts.writeStream
>>>   .outputMode("complete")
>>>   .format("console")
>>>   .start()
>>>
>>> query.awaitTermination()
>>>
>>>
>>>



>>


Re: Spark 2.2 streaming with append mode: empty output

2017-08-14 Thread Tathagata Das
In append mode, the aggregation outputs a row only when the watermark has
been crossed and the corresponding aggregate is *final*, that is, will not
be updated any more.
See
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
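
To make the rule concrete, here is a plain-Python illustration (not Spark code) of when a windowed aggregate becomes final. It assumes a simplified model: the watermark is the maximum event time seen so far minus the delay threshold, and a window is final once its end time is at or below the watermark. The function name and the sample timestamps are made up for illustration, and Spark's exact boundary handling may differ.

```python
from datetime import datetime, timedelta

def final_windows(event_times, window, delay):
    """Return the start times of tumbling windows that are final.

    Simplified model (an assumption, not Spark internals):
    watermark = max(event time) - delay, and a window
    [start, start + window) is final once start + window <= watermark.
    """
    epoch = datetime(1970, 1, 1)
    watermark = max(event_times) - delay
    # Align each event time down to the start of its tumbling window
    starts = {epoch + ((t - epoch) // window) * window for t in event_times}
    return sorted(s for s in starts if s + window <= watermark)

day, minute = timedelta(days=1), timedelta(minutes=1)

# All events fall inside the same day: the 1-day window is still open
same_day = [datetime(2017, 8, 14, hour) for hour in (1, 9, 17)]
print(final_windows(same_day, day, minute))

# One event just after midnight pushes the watermark past the window end
next_day = same_day + [datetime(2017, 8, 15, 0, 2)]
print(final_windows(next_day, day, minute))
```

With a 1-day window and a 1-minute watermark, as in the question, append mode has nothing to emit until event time moves past the end of the day plus the watermark delay.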

On Mon, Aug 14, 2017 at 4:09 PM, Ashwin Raju  wrote:

> Hi,
>
> I am running Spark 2.2 and trying out structured streaming. I have the
> following code:
>
> from pyspark.sql import functions as F
>
> df = frame \
>     .withWatermark("timestamp", "1 minute") \
>     .groupby(F.window("timestamp", "1 day"), *groupby_cols) \
>     .agg(F.sum('bytes'))
>
> query = df.writeStream \
>     .format("console") \
>     .option("checkpointLocation", '\some\chkpoint') \
>     .outputMode("complete") \
>     .start()
>
> query.awaitTermination()
>
>
>
> It prints out a bunch of aggregated rows to console. When I run the same
> query with outputMode("append") however, the output only has the column
> names, no rows. I was originally trying to output to parquet, which only
> supports append mode. I was seeing no data in my parquet files, so I
> switched to console output to debug, then noticed this issue. Am I
> misunderstanding something about how append mode works?
>
>
> Thanks,
>
> Ashwin
>
>


Re: Reusing dataframes for streaming (spark 1.6)

2017-08-09 Thread Tathagata Das
There is a DStream.transform() that does exactly this.
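
In PySpark, transform() takes a function from RDD to RDD, so a DataFrame pipeline has to be wrapped on both sides. A minimal sketch of such a wrapper, assuming a hypothetical to_dataframe callable that you provide (for example one built on sqlContext.createDataFrame) and the dataframe_pipeline_fn from the question below:

```python
def as_rdd_transform(to_dataframe, dataframe_pipeline_fn):
    """Wrap a DataFrame -> DataFrame function as an RDD -> RDD function.

    to_dataframe          -- hypothetical callable turning an RDD into a DataFrame,
                             e.g. lambda rdd: sqlContext.createDataFrame(rdd, schema)
    dataframe_pipeline_fn -- the batch pipeline: takes a DataFrame, returns a DataFrame
    """
    def transform(rdd):
        input_df = to_dataframe(rdd)
        output_df = dataframe_pipeline_fn(input_df)
        return output_df.rdd   # DataFrame.rdd gives back an RDD of Rows
    return transform

# Intended use on a DStream (not run here; `lines`, `sqlContext` and `schema`
# are assumed to exist in your application):
# results = lines.transform(as_rdd_transform(
#     lambda rdd: sqlContext.createDataFrame(rdd, schema),
#     dataframe_pipeline_fn))
```

Passing an explicit schema to createDataFrame is probably a good idea here, since schema inference cannot work on an empty batch.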

On Tue, Aug 8, 2017 at 7:55 PM, Ashwin Raju  wrote:

> Hi,
>
> We've built a batch application on Spark 1.6.1. I'm looking into how to
> run the same code as a streaming (DStream based) application. This is using
> pyspark.
>
> In the batch application, we have a sequence of transforms that read from
> file, do dataframe operations, then write to file. I was hoping to swap out
> the read from file with textFileStream, then use the dataframe operations
> as is. This would mean that if we change the batch pipeline, so long as it
> is a sequence of dataframe operations, the streaming version can just reuse
> the code.
>
> Looking at the sql_network_wordcount
> 
> example, it looks like I'd have to do DStream.foreachRDD, convert the
> passed in RDD into a dataframe and then do my sequence of dataframe
> operations. However, that list of dataframe operations looks to be
> hardcoded into the process method, is there any way to pass in a function
> that takes a dataframe as input and returns a dataframe?
>
> what I see from the example:
>
> words.foreachRDD(process)
>
> def process(time, rdd):
>     # create dataframe from RDD
>     # hardcoded operations on the dataframe
>
> what I would like to do instead:
>
> def process(time, rdd):
>     # create dataframe from RDD - input_df
>     # output_df = dataframe_pipeline_fn(input_df)
>
> -ashwin
>
>
>
>


Re: Multiple queries on same stream

2017-08-09 Thread Tathagata Das
It's important to note that running multiple streaming queries, as of today,
would read the input data that many times. So there is a trade-off
between the two approaches.
So even though scenario 1 won't get great Catalyst optimization, it may be
more efficient overall in terms of resource usage.

There may be a hybrid solution possible. You could craft multiple rules
using the SQL DSL. For N rules, you can add N boolean columns, with each value
set based on one rule expressed through SQL functions. Finally, the
foreach would take appropriate actions. A rough example would be:

dataframe
  .withColumn("rule1", when(...).otherwise(...))
  .withColumn("rule2", when(...).otherwise(...))
  ...
  .filter(...)  // filter out data where no rules were matched
  .as[RuleMatches]
  .foreach { matches =>
    // take action for each rule matched
  }

This would evaluate the rules with Catalyst optimization, and apply the
non-optimized foreach function ONLY on rows that matched some rule (which
is hopefully << total rows).



On Tue, Aug 8, 2017 at 11:12 PM, Jörn Franke  wrote:

> This is not easy to say without testing. It depends on type of computation
> etc. it also depends on the Spark version. Generally vectorization / SIMD
> could be much faster if it is applied by Spark / the JVM in scenario 2.
>
> > On 9. Aug 2017, at 07:05, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
> >
> > I am using structured streaming to evaluate multiple rules on same
> running stream.
> > I have two options to do that. One is to use forEach and evaluate all
> the rules on the row..
> > The other option is to express rules in spark sql dsl and run multiple
> queries.
> > I was wondering if option 1 will result in better performance even
> though I can get catalyst optimization in option 2.
> >
> > Thanks
> > Raghav
>


Re: [Structured Streaming] Recommended way of joining streams

2017-08-09 Thread Tathagata Das
Writing streams into some sink (preferably a fault-tolerant, exactly-once
sink, see docs) and then joining is definitely a possible way. But you will
likely incur higher latency. If you want lower latency, then stream-stream
joins are the best approach, which we are working on right now. Spark 2.3 is
likely to have stream-stream joins (no release date). For now, the best way
would be to use mapGroupsWithState (Spark 2.2, Scala/Java). The rough idea
of how to implement an inner join is as follows.

case class Type1(...)    // fields in first stream
case class Type2(...)    // fields in second stream
case class CombinedType(first: Type1, second: Type2)   // a combined type that can hold data from both streams

val streamingDataset1 = streamingDF1.as[Type1].map { first => CombinedType(first, null) }    // first stream as common typed dataset
val streamingDataset2 = streamingDF2.as[Type2].map { second => CombinedType(null, second) }  // second stream as common typed dataset

val combinedDataset = streamingDataset1.union(streamingDataset2)

combinedDataset
  .groupByKey { x => getKey(x) }  // group by common id
  .flatMapGroupsWithState { case (key, values, state) =>
    // update state for the key using the values, and possibly output an object
  }
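
The "update state for the key" step is where the join actually happens. As a rough illustration of that logic only, here is a plain-Python simulation (not Spark code; mapGroupsWithState itself is Scala/Java in 2.2). The function name and the dict-based state layout are made up for this sketch, and it never expires old state, which a real job would need to handle.

```python
def update_join_state(state, values):
    """Per-key update for an inner join of two unioned streams.

    state  -- dict with lists 'first' and 'second' buffered so far for this key
    values -- new (first, second) pairs for this key; exactly one side is None,
              mirroring CombinedType(first, null) / CombinedType(null, second)
    Returns (new_state, joined_rows).
    """
    firsts, seconds = list(state["first"]), list(state["second"])
    joined = []
    for first, second in values:
        if first is not None:
            # New row from the first stream: pair it with buffered second-stream rows
            joined.extend((first, s) for s in seconds)
            firsts.append(first)
        else:
            # New row from the second stream: pair it with buffered first-stream rows
            joined.extend((f, second) for f in firsts)
            seconds.append(second)
    return {"first": firsts, "second": seconds}, joined

# Usage: rows for one key arriving over two micro-batches
state = {"first": [], "second": []}
state, out = update_join_state(state, [("a1", None)])
state, out = update_join_state(state, [(None, "b1"), ("a2", None)])
```

Each incoming row is matched against everything buffered from the other stream and then buffered itself, so every matching pair is emitted exactly once regardless of arrival order.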




On Wed, Aug 9, 2017 at 12:05 AM, Priyank Shrivastava  wrote:

> I have streams of data coming in from various applications through Kafka.
> These streams are converted into dataframes in Spark.  I would like to join
> these dataframes on a common ID they all contain.
>
> Since  joining streaming dataframes is currently not supported, what is
> the current recommended way to join two dataFrames  in a streaming context.
>
>
> Is it recommended to keep writing the streaming dataframes into some sink
> to convert them into static dataframes which can then be joined?  Would
> this guarantee end-to-end exactly once and fault tolerant guarantees?
>
> Priyank
>


Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
I don't think there is an easier way.

On Mon, Aug 7, 2017 at 7:32 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Thanks TD for the response. I forgot to mention that I am not using
> structured streaming.
>
> I was looking into KafkaUtils.createRDD, and it looks like I need to get the
> earliest and the latest offset for each partition to build the
> Array(offsetRange). I wanted to know if there was an easier way.
>
> One reason why we are hesitating to use structured streaming is that I
> need to persist the data in a Cassandra database, which I believe is not
> production ready.
>
>
> On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das <tathagata.das1...@gmail.com
> > wrote:
>
>> It's best to use DataFrames. You can read from Kafka either as a stream or
>> as a batch. More details here.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>
>> On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande <deshpandesh...@gmail.com
>> > wrote:
>>
>>> Hi all,
>>>
>>> What is the easiest way to read all the data from kafka in a batch
>>> program for a given topic?
>>> I have 10 kafka partitions, but the data is not much. I would like to
>>> read  from the earliest from all the partitions for a topic.
>>>
>>> I appreciate any help. Thanks
>>>
>>
>>
>


Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
It's best to use DataFrames. You can read from Kafka either as a stream or
as a batch. More details here.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
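
For reference, a batch read of a whole topic could look roughly like the sketch below in PySpark. The helper names, the broker address and the topic name are placeholders of mine; the option keys are the ones described in the Kafka integration guide linked above. To my understanding "earliest" and "latest" are already the defaults for batch queries, so they are spelled out here only for clarity.

```python
def kafka_batch_options(bootstrap_servers, topic):
    """Options for reading every partition of a topic, start to end, as a batch."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",   # beginning of every partition
        "endingOffsets": "latest",       # newest offsets at the time of the read
    }

def read_topic(spark, bootstrap_servers, topic):
    """Batch read into a DataFrame.

    `spark` is assumed to be an existing SparkSession started with the
    Kafka source package available.
    """
    return (spark.read
            .format("kafka")
            .options(**kafka_batch_options(bootstrap_servers, topic))
            .load())

# Intended use (placeholder broker and topic, not run here):
# df = read_topic(spark, "host1:9092", "mytopic")
```

This avoids building the per-partition offset ranges by hand, since the source resolves the earliest and latest offsets for each partition itself.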

On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande 
wrote:

> Hi all,
>
> What is the easiest way to read all the data from kafka in a batch program
> for a given topic?
> I have 10 kafka partitions, but the data is not much. I would like to read
>  from the earliest from all the partitions for a topic.
>
> I appreciate any help. Thanks
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-27 Thread Tathagata Das
For built-in SQL functions, it does not matter which language you use, as
the engine will use the most optimized JVM code to execute them. However, in
your case, you are asking for foreach in Python. My interpretation was that
you want to specify your own Python function that processes the rows in Python.
This is different from the built-in functions, as the engine will have to
invoke your function inside a Python VM.







On Wed, Jul 26, 2017 at 12:54 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi TD
>
> I thought structured streaming does provide similar concept of dataframes
> where it does not matter which language I use to invoke the APIs, with
> exception of udf.
>
> So, when I think of support foreach sink in python, I think it as just a
> wrapper api and data should remain in JVM only. Similar to, for example, a
> hive writer or hdfs writer in Dataframe API.
>
> Am I too simplifying? Or is it just early days in structured streaming?
> Happy to learn any mistakes in my thinking and understanding.
>
> Best
> Ayan
>
> On Thu, 27 Jul 2017 at 4:49 am, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> Thanks TD.  I am going to try the python-scala hybrid approach by using
>> scala only for custom redis sink and python for the rest of the app .  I
>> understand it might not be as efficient as purely writing the app in scala
>> but unfortunately I am constrained on scala resources.  Have you come
>> across other use cases where people have resided to such python-scala
>> hybrid approach?
>>
>> Regards,
>> Priyank
>>
>>
>>
>> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hello Priyank
>>>
>>> Writing something purely in Scala/Java would be the most efficient. Even
>>> if we expose python APIs that allow writing custom sinks in pure Python, it
>>> won't be as efficient as Scala/Java foreach as the data would have to go
>>> through the JVM / PVM boundary, which has significant overheads. So Scala/Java
>>> foreach is always going to be the best option.
>>>
>>> TD
>>>
>>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>>> priy...@asperasoft.com> wrote:
>>>
>>>> I am trying to write key-values to redis using a DataStreamWriter
>>>> object using pyspark structured streaming APIs. I am using Spark 2.2
>>>>
>>>> Since the Foreach Sink is not supported for python; here
>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach>,
>>>> I am trying to find out some alternatives.
>>>>
>>>> One alternative is to write a separate Scala module only to push data
>>>> into redis using foreach; ForeachWriter
>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter>
>>>>  is
>>>> supported in Scala. BUT this doesn't seem like an efficient approach and
>>>> adds deployment overhead because now I will have to support Scala in my 
>>>> app.
>>>>
>>>> Another approach is obviously to use Scala instead of python, which is
>>>> fine but I want to make sure that I absolutely cannot use python for this
>>>> problem before I take this path.
>>>>
>>>> Would appreciate some feedback and alternative design approaches for
>>>> this problem.
>>>>
>>>> Thanks.
>>>>
>>>>
>>>>
>>>>
>>>
>> --
> Best Regards,
> Ayan Guha
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Tathagata Das
We see that all the time. For example, in SQL, people can write their
user-defined function in Scala/Java and use it from SQL/Python/anywhere.
That is the recommended way to get the best combo of performance and
ease-of-use from non-JVM languages.

On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
priy...@asperasoft.com> wrote:

> Thanks TD.  I am going to try the python-scala hybrid approach by using
> scala only for custom redis sink and python for the rest of the app .  I
> understand it might not be as efficient as purely writing the app in scala
> but unfortunately I am constrained on scala resources.  Have you come
> across other use cases where people have resorted to such a python-scala
> hybrid approach?
>
> Regards,
> Priyank
>
>
>
> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hello Priyank
>>
>> Writing something purely in Scala/Java would be the most efficient. Even
>> if we expose python APIs that allow writing custom sinks in pure Python, it
>> won't be as efficient as Scala/Java foreach as the data would have to go
>> through the JVM / PVM boundary, which has significant overheads. So Scala/Java
>> foreach is always going to be the best option.
>>
>> TD
>>
>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> I am trying to write key-values to redis using a DataStreamWriter object
>>> using pyspark structured streaming APIs. I am using Spark 2.2
>>>
>>> Since the Foreach Sink is not supported for python; here
>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach>,
>>> I am trying to find out some alternatives.
>>>
>>> One alternative is to write a separate Scala module only to push data
>>> into redis using foreach; ForeachWriter
>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter>
>>>  is
>>> supported in Scala. BUT this doesn't seem like an efficient approach and
>>> adds deployment overhead because now I will have to support Scala in my app.
>>>
>>> Another approach is obviously to use Scala instead of python, which is
>>> fine but I want to make sure that I absolutely cannot use python for this
>>> problem before I take this path.
>>>
>>> Would appreciate some feedback and alternative design approaches for
>>> this problem.
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>
>

