Re: Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-18 Thread Sean Owen
Late to the thread, but why is counting distinct elements over a
24-hour window not possible? You can certainly do it now, and I'd
presume it's possible with Structured Streaming with a window.

countByValueAndWindow should do it, right? The keys (with non-zero
counts, I suppose) in a window are the distinct values from the stream
in that window. Your example looks right.
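
Roughly, something along these lines (a quick sketch, untested; assuming your
price DStream[Double] and the same window/slide durations as in your example):

val countsPerValue = price.countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
// every key of the windowed (value, count) pairs is a distinct value seen in that window
val distinctPerWindow = countsPerValue.filter(_._2 > 0).count()
distinctPerWindow.print()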

On Wed, May 18, 2016 at 12:17 AM, Mich Talebzadeh wrote:
>
> OK, what can be used here below?
>
> //val countDistinctByValueAndWindow = price.filter(_ > 0.0)
> //  .reduceByKey((t1, t2) => t1)
> //  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
> //countDistinctByValueAndWindow.print()
>
> On 17 May 2016 at 20:02, Michael Armbrust wrote:
>
>> In 2.0 you won't be able to do this. The long term vision would be to
>> make this possible, but a window will be required (like the 24 hours you
>> suggest).
>>
>> On Tue, May 17, 2016 at 1:36 AM, Todd wrote:
>>
>>> Hi,
>>> We have a requirement to do count(distinct) in a processing batch against
>>> all the streaming data (e.g. the last 24 hours' data); that is, when we do
>>> count(distinct) we actually want to compute the distinct count against the
>>> last 24 hours' data.
>>> Does Structured Streaming support this scenario? Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re:Re: Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-17 Thread Todd
Thank you guys for the help. I will try.






At 2016-05-18 07:17:08, "Mich Talebzadeh" wrote:

Thanks Chris,


In a nutshell, I don't think one can do that.


So let us see. Here is my program that is looking for share prices > 95.0. It
does work. It is pretty simple:


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable.ArrayBuffer
//
object CEP_AVG {
  def main(args: Array[String]) {
    // Create a local StreamingContext with two working threads and a batch interval of n seconds.
    val sparkConf = new SparkConf().
      setAppName("CEP_AVG").
      setMaster("local[2]").
      set("spark.cores.max", "2").
      set("spark.streaming.concurrentJobs", "2").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(sparkConf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers"   -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect"   -> "rhes564:2181",
      "group.id"            -> "CEP_AVG")
    val topics = Set("newtopic")

    val DStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    DStream.cache()
    val lines = DStream.map(_._2)
    // Take the third comma-separated field (the price) and convert it to Double
    val price = lines.map(_.split(',').view(2)).map(_.toDouble)

    val windowLength = 4
    val slidingInterval = 2
    val countByValueAndWindow = price.filter(_ > 95.0)
      .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    countByValueAndWindow.print()

    //
    // Now how can I get the distinct price values here?
    //
    //val countDistinctByValueAndWindow = price.filter(_ > 0.0)
    //  .reduceByKey((t1, t2) => t1)
    //  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    //countDistinctByValueAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}


OK, what can be used here below?


//val countDistinctByValueAndWindow = price.filter(_ > 0.0)
//  .reduceByKey((t1, t2) => t1)
//  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()
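
For example, would something along these lines do it? (a rough sketch, untested,
reusing the price DStream, windowLength and slidingInterval above)

// Key the stream by price so that, within each window, there is exactly one
// key per distinct price; counting the keys then gives the distinct count.
val distinctPricesInWindow = price.filter(_ > 0.0)
  .map(p => (p, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(windowLength), Seconds(slidingInterval))
val countDistinctByValueAndWindow = distinctPricesInWindow.count()
countDistinctByValueAndWindow.print()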


Let me know your thoughts.


Thanks







Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 17 May 2016 at 23:47, Chris Fregly  wrote:

you can use HyperLogLog with Spark Streaming to accomplish this.


here is an example from my fluxcapacitor GitHub repo:


https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx


here's an accompanying SlideShare presentation from one of my recent meetups 
(slides 70-83):


http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037



and a YouTube video for those that prefer video (starting at 32 mins into the 
video for your convenience):


https://youtu.be/wM9Z0PLx3cw?t=1922




On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh wrote:

OK, but how about something similar to:


val countByValueAndWindow = price.filter(_ > 95.0)
  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))





Using a new count => countDistinctByValueAndWindow?


val countDistinctByValueAndWindow = price.filter(_ > 95.0)
  .countDistinctByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))




HTH



Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 17 May 2016 at 20:02, Michael Armbrust  wrote:

In 2.0 you won't be able to do this.  The long term vision would be to make 
this possible, but a window will be required (like the 24 hours you suggest).


On Tue, May 17, 2016 at 1:36 AM, Todd  wrote:

Hi,
We have a requirement to do count(distinct) in a processing batch against all
the streaming data (e.g. the last 24 hours' data); that is, when we do
count(distinct) we actually want to compute the distinct count against the last
24 hours' data.
Does Structured Streaming support this scenario? Thanks!









--

Chris Fregly
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.com



Re: Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-17 Thread Mich Talebzadeh
Thanks Chris,

In a nutshell, I don't think one can do that.

So let us see. Here is my program that is looking for share prices > 95.0.
It does work. It is pretty simple:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable.ArrayBuffer
//
object CEP_AVG {
  def main(args: Array[String]) {
    // Create a local StreamingContext with two working threads and a batch interval of n seconds.
    val sparkConf = new SparkConf().
      setAppName("CEP_AVG").
      setMaster("local[2]").
      set("spark.cores.max", "2").
      set("spark.streaming.concurrentJobs", "2").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(sparkConf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers"   -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect"   -> "rhes564:2181",
      "group.id"            -> "CEP_AVG")
    val topics = Set("newtopic")

    val DStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    DStream.cache()
    val lines = DStream.map(_._2)
    // Take the third comma-separated field (the price) and convert it to Double
    val price = lines.map(_.split(',').view(2)).map(_.toDouble)

    val windowLength = 4
    val slidingInterval = 2
    val countByValueAndWindow = price.filter(_ > 95.0)
      .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    countByValueAndWindow.print()

    //
    // Now how can I get the distinct price values here?
    //
    //val countDistinctByValueAndWindow = price.filter(_ > 0.0)
    //  .reduceByKey((t1, t2) => t1)
    //  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    //countDistinctByValueAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}
OK, what can be used here below?

//val countDistinctByValueAndWindow = price.filter(_ > 0.0)
//  .reduceByKey((t1, t2) => t1)
//  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

Let me know your thoughts.

Thanks



Dr Mich Talebzadeh



LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 17 May 2016 at 23:47, Chris Fregly  wrote:

> you can use HyperLogLog with Spark Streaming to accomplish this.
>
> here is an example from my fluxcapacitor GitHub repo:
>
>
> https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx
>
> here's an accompanying SlideShare presentation from one of my recent
> meetups (slides 70-83):
>
>
> http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037
>
>
> 
> and a YouTube video for those that prefer video (starting at 32 mins into
> the video for your convenience):
>
> https://youtu.be/wM9Z0PLx3cw?t=1922
>
>
> On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Ok but how about something similar to
>>
>> val countByValueAndWindow = price.filter(_ >
>> 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>>
>>
>> Using a new count => countDistinctByValueAndWindow?
>>
>> val countDistinctByValueAndWindow = price.filter(_ >
>> 95.0).countDistinctByValueAndWindow(Seconds(windowLength),
>> Seconds(slidingInterval))
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 May 2016 at 20:02, Michael Armbrust  wrote:
>>
>>> In 2.0 you won't be able to do this.  The long term vision would be to
>>> make this possible, but a window will be required (like the 24 hours you
>>> suggest).
>>>
>>> On Tue, May 17, 2016 at 1:36 AM, Todd  wrote:
>>>
>>>> Hi,
>>>> We have a requirement to do count(distinct) in a processing batch
>>>> against all the streaming data (e.g. the last 24 hours' data); that is, when we do
>>>> count(distinct) we actually want to compute the distinct count against the last
>>>> 24 hours' data.
>>>> Does Structured Streaming support this scenario? Thanks!

Re: Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-17 Thread Chris Fregly
you can use HyperLogLog with Spark Streaming to accomplish this.

here is an example from my fluxcapacitor GitHub repo:

https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx

here's an accompanying SlideShare presentation from one of my recent
meetups (slides 70-83):

http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037


and a YouTube video for those that prefer video (starting at 32 mins into
the video for your convenience):

https://youtu.be/wM9Z0PLx3cw?t=1922
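
if you'd rather stay inside the DStream API, here's a quick sketch of the same
idea using Spark's built-in HyperLogLog++-based countApproxDistinct (names
assumed, untested):

// approximate distinct count per window via HyperLogLog++
// (prices is assumed to be a DStream[Double])
prices
  .window(Seconds(windowLength), Seconds(slidingInterval))
  .foreachRDD { rdd =>
    val approx = rdd.countApproxDistinct(0.01)  // ~1% relative standard deviation
    println(s"approx distinct values in window: $approx")
  }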


On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh  wrote:

> Ok but how about something similar to
>
> val countByValueAndWindow = price.filter(_ >
> 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>
>
> Using a new count => countDistinctByValueAndWindow?
>
> val countDistinctByValueAndWindow = price.filter(_ >
> 95.0).countDistinctByValueAndWindow(Seconds(windowLength),
> Seconds(slidingInterval))
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 May 2016 at 20:02, Michael Armbrust  wrote:
>
>> In 2.0 you won't be able to do this.  The long term vision would be to
>> make this possible, but a window will be required (like the 24 hours you
>> suggest).
>>
>> On Tue, May 17, 2016 at 1:36 AM, Todd  wrote:
>>
>>> Hi,
>>> We have a requirement to do count(distinct) in a processing batch
>>> against all the streaming data (e.g. the last 24 hours' data); that is, when we do
>>> count(distinct) we actually want to compute the distinct count against the last
>>> 24 hours' data.
>>> Does Structured Streaming support this scenario? Thanks!
>>>
>>
>>
>

-- 
Chris Fregly
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.com


Re: Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-17 Thread Mich Talebzadeh
OK, but how about something similar to:

val countByValueAndWindow = price.filter(_ > 95.0)
  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))


Using a new count => countDistinctByValueAndWindow?

val countDistinctByValueAndWindow = price.filter(_ > 95.0)
  .countDistinctByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))


HTH

Dr Mich Talebzadeh



LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 17 May 2016 at 20:02, Michael Armbrust  wrote:

> In 2.0 you won't be able to do this.  The long term vision would be to
> make this possible, but a window will be required (like the 24 hours you
> suggest).
>
> On Tue, May 17, 2016 at 1:36 AM, Todd  wrote:
>
>> Hi,
>> We have a requirement to do count(distinct) in a processing batch against
>> all the streaming data (e.g. the last 24 hours' data); that is, when we do
>> count(distinct) we actually want to compute the distinct count against the last
>> 24 hours' data.
>> Does Structured Streaming support this scenario? Thanks!
>>
>
>


Re: Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-17 Thread Michael Armbrust
In 2.0 you won't be able to do this.  The long term vision would be to make
this possible, but a window will be required (like the 24 hours you
suggest).
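
To sketch the windowed shape (illustrative only; it assumes a streaming
DataFrame called events with an event-time column ts and a price column, and
uses approxCountDistinct as a stand-in, since an exact streaming count(distinct)
is what isn't supported):

import org.apache.spark.sql.functions._

// approximate distinct prices per 24-hour event-time window (sketch, may not run on a stream in 2.0)
val distinctPerDay = events
  .groupBy(window(col("ts"), "24 hours"))
  .agg(approxCountDistinct(col("price")).as("approx_distinct_prices"))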

On Tue, May 17, 2016 at 1:36 AM, Todd  wrote:

> Hi,
> We have a requirement to do count(distinct) in a processing batch against
> all the streaming data (e.g. the last 24 hours' data); that is, when we do
> count(distinct) we actually want to compute the distinct count against the last
> 24 hours' data.
> Does Structured Streaming support this scenario? Thanks!
>


Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-17 Thread Todd
Hi,
We have a requirement to do count(distinct) in a processing batch against all
the streaming data (e.g. the last 24 hours' data); that is, when we do
count(distinct) we actually want to compute the distinct count against the last
24 hours' data.
Does Structured Streaming support this scenario? Thanks!