[Error] RDD to DataFrame in Spark Streaming

2018-01-31 Thread Divya Gehlot
Hi,
I am getting the error below when creating a DataFrame from a Twitter streaming RDD:

val sparkSession: SparkSession = SparkSession
  .builder
  .appName("twittertest2")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()
val sc = sparkSession.sparkContext
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))

case class Tweet(createdAt:Long, text:String)

import org.apache.spark.sql.types._
import sparkSession.implicits._
def row(line: List[String]): Row = Row(line(0).toLong, line(1).toString)

val schema =
  StructType(
    StructField("createdAT", LongType, false) ::
    StructField("Text", StringType, true) :: Nil)


twt.map(status =>
  Tweet(status.getCreatedAt().getTime() / 1000, status.getText())
).foreachRDD(rdd =>
  rdd.toDF()
)


Error :
Error:(106, 15) value toDF is not a member of
org.apache.spark.rdd.RDD[Tweet]
  rdd.toDF()
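
For reference, the "DataFrame and SQL Operations" section of the Spark Streaming
guide shows one pattern that usually puts toDF() in scope: define the case class
at the top level of the file, then obtain the SparkSession and import its
implicits inside foreachRDD. A rough sketch (an assumption that it applies here,
not tested against this exact code):

// Tweet must be a top-level case class so its encoder can be derived
twt.map(status =>
  Tweet(status.getCreatedAt().getTime() / 1000, status.getText())
).foreachRDD { rdd =>
  // Re-obtain the session for each batch and bring its implicits into scope
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  val df = rdd.toDF()
  df.show()
}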

So much confusion in Spark 2 regarding the Spark Session :(

Appreciate the help!

Thanks,
Divya


Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-01-31 Thread Nicolas Paris
Hey 

I am also interested in how to get those parameters.
For example, the demo code
spark-2.2.1-bin-hadoop2.7/examples/src/main/python/ml/estimator_transformer_param_example.py
returns empty parameters when printing "lr.extractParamMap()".

That's weird

Thanks

On 30 Jan 2018 at 23:10, Bryan Cutler wrote:
> Hi Michelle,
> 
> Your original usage of ParamGridBuilder was not quite right, `addGrid` expects
> (some parameter, array of values for that parameter).  If you want to do a grid
> search with different regularization values, you would do the following:
> 
> val paramMaps = new ParamGridBuilder().addGrid(logist.regParam, Array(0.1,
> 0.3)).build()
> 
> * don't forget to build the grid after adding values
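> 
> For completeness, a minimal sketch of wiring that grid into a CrossValidator
> (assuming the Spark 2.2-era ML APIs; the evaluator choice and fold count are
> only illustrative, and trainingDF is assumed to exist):
> 
> import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
> import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
> 
> val logist = new LogisticRegression()
> val ova = new OneVsRest().setClassifier(logist)
> 
> // addGrid takes (parameter, array of candidate values); call build() at the end
> val paramMaps = new ParamGridBuilder()
>   .addGrid(logist.regParam, Array(0.1, 0.3))
>   .build()
> 
> val cv = new CrossValidator()
>   .setEstimator(ova)
>   .setEvaluator(new MulticlassClassificationEvaluator())
>   .setEstimatorParamMaps(paramMaps)
>   .setNumFolds(3)
> 
> // val cvModel = cv.fit(trainingDF)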
> 
> On Tue, Jan 30, 2018 at 6:55 AM, michelleyang 
> wrote:
> 
> I tried to use One vs Rest in Spark ML with a pipeline and crossValidator for
> multinomial logistic regression.
> 
> It came out with empty coefficients. I figured out it was the setting of
> ParamGridBuilder. Can anyone help me understand how the parameter
> setting affects the crossValidator process?
> 
> The original code: // outputs empty coefficients
> 
> val logist=new LogisticRegression
> 
> val ova = new OneVsRest().setClassifier(logist)
> 
> val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> Array(logist.getRegParam))
> 
> New code: // outputs multi-class coefficients
> 
> val logist=new LogisticRegression
> 
> val ova = new OneVsRest().setClassifier(logist)
> 
> val classifier1 = new LogisticRegression().setRegParam(2.0)
> 
> val classifier2 = new LogisticRegression().setRegParam(3.0)
> 
> val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
> Array(classifier1, classifier2))
> 
> Please help. Thanks.
> 
> 
> 
> 
> 
> 




Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Divya Gehlot
Got it. Thanks for the clarification, TD!

On Thu, 1 Feb 2018 at 11:36 AM, Tathagata Das 
wrote:

> The code uses the format "socket", which is only for text sent over a
> simple socket; that is completely different from how the Twitter APIs work,
> so this won't work at all.
> Fundamentally, for Structured Streaming, we have focused only on those
> streaming sources that have the capability of record-level offset tracking
> (e.g. Kafka offsets) and replayability, in order to give strong exactly-once
> fault-tolerance guarantees. Hence we have focused on files, Kafka, and Kinesis
> (socket is just for testing, as is documented). The Twitter API as a source
> does not provide those, hence we have not focused on building one. In
> general, for such sources (ones that are not perfectly replayable), there
> are two possible solutions.
>
> 1. Build your own source: A quick Google search shows that others in the
> community have attempted to build structured-streaming sources for Twitter.
> It won't provide the same fault-tolerance guarantees as Kafka, etc. However,
> I don't recommend this now because the DataSource APIs to build streaming
> sources are not public yet, and are in flux.
>
> 2. Use Kafka/Kinesis as an intermediate system: Write something simple
> that uses the Twitter APIs directly to read tweets and write them into
> Kafka/Kinesis. And then just read from Kafka/Kinesis.
>
> Hope this helps.
>
> TD
>
> On Wed, Jan 31, 2018 at 7:18 PM, Divya Gehlot 
> wrote:
>
>> Hi,
>> I see. Does that mean Spark Structured Streaming doesn't work with
>> Twitter streams?
>> I can see that people have used Kafka or other streaming tools and then used
>> Spark Structured Streaming to process the data.
>>
>> So the code below won't work directly with a Twitter stream unless I set up
>> Kafka?
>>
>>> import org.apache.spark.sql.SparkSession
>>> val spark = SparkSession
>>>   .builder()
>>>   .appName("Spark SQL basic example")
>>>   .config("spark.some.config.option", "some-value")
>>>   .getOrCreate()
>>> // For implicit conversions like converting RDDs to DataFrames
>>> import spark.implicits._
>>>
>>> // Read text from socket
>>>
>>> val socketDF = spark
>>>
>>>   .readStream
>>>
>>>   .format("socket")
>>>
>>>   .option("host", "localhost")
>>>
>>>   .option("port", )
>>>
>>>   .load()
>>>
>>>
 socketDF.isStreaming// Returns True for DataFrames that have
 streaming sources
>>>
>>>
 socketDF.printSchema
>>>
>>>
>>>
>>
>>
>> Thanks,
>> Divya
>>
>> On 1 February 2018 at 10:30, Tathagata Das 
>> wrote:
>>
>>> Hello Divya,
>>>
>>> To add further clarification, the Apache Bahir does not have any
>>> Structured Streaming support for Twitter. It only has support for Twitter +
>>> DStreams.
>>>
>>> TD
>>>
>>>
>>>
>>> On Wed, Jan 31, 2018 at 2:44 AM, vermanurag <
>>> anurag.ve...@fnmathlogic.com> wrote:
>>>
 Twitter functionality is not part of Core Spark. We have successfully
 used
 the following packages from maven central in past

 org.apache.bahir:spark-streaming-twitter_2.11:2.2.0

 Earlier there used to be a twitter package under spark, but I find that
 it
 has not been updated beyond Spark 1.6
 org.apache.spark:spark-streaming-twitter_2.11:1.6.0

 Anurag
 www.fnmathlogic.com






>>>
>>
>


Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Tathagata Das
The code uses the format "socket", which is only for text sent over a simple
socket; that is completely different from how the Twitter APIs work, so this
won't work at all.
Fundamentally, for Structured Streaming, we have focused only on those
streaming sources that have the capability of record-level offset tracking
(e.g. Kafka offsets) and replayability, in order to give strong exactly-once
fault-tolerance guarantees. Hence we have focused on files, Kafka, and Kinesis
(socket is just for testing, as is documented). The Twitter API as a source
does not provide those, hence we have not focused on building one. In
general, for such sources (ones that are not perfectly replayable), there
are two possible solutions.

1. Build your own source: A quick Google search shows that others in the
community have attempted to build structured-streaming sources for Twitter.
It won't provide the same fault-tolerance guarantees as Kafka, etc. However,
I don't recommend this now because the DataSource APIs to build streaming
sources are not public yet, and are in flux.

2. Use Kafka/Kinesis as an intermediate system: Write something simple that
uses the Twitter APIs directly to read tweets and write them into
Kafka/Kinesis. And then just read from Kafka/Kinesis.
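
To make option 2 concrete, here is a minimal sketch of the consuming side,
assuming an external bridge has already written tweets into a Kafka topic named
"tweets" (topic name and broker address are made up), and that the
spark-sql-kafka-0-10 package is on the classpath:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("TweetsFromKafka").getOrCreate()

// Kafka records arrive as binary key/value; the value is assumed to be the tweet JSON
val tweets = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "tweets")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")

val query = tweets.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()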

Hope this helps.

TD

On Wed, Jan 31, 2018 at 7:18 PM, Divya Gehlot 
wrote:

> Hi,
> I see. Does that mean Spark Structured Streaming doesn't work with
> Twitter streams?
> I can see that people have used Kafka or other streaming tools and then used
> Spark Structured Streaming to process the data.
>
> So the code below won't work directly with a Twitter stream unless I set up Kafka?
>
>> import org.apache.spark.sql.SparkSession
>> val spark = SparkSession
>>   .builder()
>>   .appName("Spark SQL basic example")
>>   .config("spark.some.config.option", "some-value")
>>   .getOrCreate()
>> // For implicit conversions like converting RDDs to DataFrames
>> import spark.implicits._
>>
>> // Read text from socket
>>
>> val socketDF = spark
>>
>>   .readStream
>>
>>   .format("socket")
>>
>>   .option("host", "localhost")
>>
>>   .option("port", )
>>
>>   .load()
>>
>>
>>> socketDF.isStreaming// Returns True for DataFrames that have
>>> streaming sources
>>
>>
>>> socketDF.printSchema
>>
>>
>>
>
>
> Thanks,
> Divya
>
> On 1 February 2018 at 10:30, Tathagata Das 
> wrote:
>
>> Hello Divya,
>>
>> To add further clarification, the Apache Bahir does not have any
>> Structured Streaming support for Twitter. It only has support for Twitter +
>> DStreams.
>>
>> TD
>>
>>
>>
>> On Wed, Jan 31, 2018 at 2:44 AM, vermanurag > > wrote:
>>
>>> Twitter functionality is not part of Core Spark. We have successfully
>>> used
>>> the following packages from maven central in past
>>>
>>> org.apache.bahir:spark-streaming-twitter_2.11:2.2.0
>>>
>>> Earlier there used to be a twitter package under spark, but I find that
>>> it
>>> has not been updated beyond Spark 1.6
>>> org.apache.spark:spark-streaming-twitter_2.11:1.6.0
>>>
>>> Anurag
>>> www.fnmathlogic.com
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


FOSDEM mini-office hour?

2018-01-31 Thread Holden Karau
Hi Spark Friends,

If any folks are around for FOSDEM this year I was planning on doing a
coffee office hour on the last day after my talks. Maybe like 6pm?
I'm also going to see if any BEAM folks are around and interested :)

Cheers,

Holden :)

-- 
Twitter: https://twitter.com/holdenkarau


Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Divya Gehlot
Hi,
I see. Does that mean Spark Structured Streaming doesn't work with Twitter
streams?
I can see that people have used Kafka or other streaming tools and then used
Spark Structured Streaming to process the data.

So the code below won't work directly with a Twitter stream unless I set up Kafka?

> import org.apache.spark.sql.SparkSession
> val spark = SparkSession
>   .builder()
>   .appName("Spark SQL basic example")
>   .config("spark.some.config.option", "some-value")
>   .getOrCreate()
> // For implicit conversions like converting RDDs to DataFrames
> import spark.implicits._
>
> // Read text from socket
>
> val socketDF = spark
>
>   .readStream
>
>   .format("socket")
>
>   .option("host", "localhost")
>
>   .option("port", )
>
>   .load()
>
>
>> socketDF.isStreaming  // Returns true for DataFrames that have streaming sources
>
>
>> socketDF.printSchema
>
>
>


Thanks,
Divya

On 1 February 2018 at 10:30, Tathagata Das 
wrote:

> Hello Divya,
>
> To add further clarification, the Apache Bahir does not have any
> Structured Streaming support for Twitter. It only has support for Twitter +
> DStreams.
>
> TD
>
>
>
> On Wed, Jan 31, 2018 at 2:44 AM, vermanurag 
> wrote:
>
>> Twitter functionality is not part of Core Spark. We have successfully used
>> the following packages from maven central in past
>>
>> org.apache.bahir:spark-streaming-twitter_2.11:2.2.0
>>
>> Earlier there used to be a twitter package under spark, but I find that it
>> has not been updated beyond Spark 1.6
>> org.apache.spark:spark-streaming-twitter_2.11:1.6.0
>>
>> Anurag
>> www.fnmathlogic.com
>>
>>
>>
>>
>>
>>
>


Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Tathagata Das
Hello Divya,

To add further clarification, Apache Bahir does not have any Structured
Streaming support for Twitter. It only has support for Twitter + DStreams.

TD



On Wed, Jan 31, 2018 at 2:44 AM, vermanurag 
wrote:

> Twitter functionality is not part of Core Spark. We have successfully used
> the following packages from maven central in past
>
> org.apache.bahir:spark-streaming-twitter_2.11:2.2.0
>
> Earlier there used to be a twitter package under spark, but I find that it
> has not been updated beyond Spark 1.6
> org.apache.spark:spark-streaming-twitter_2.11:1.6.0
>
> Anurag
> www.fnmathlogic.com
>
>
>
>
>
>


Re: Max number of streams supported ?

2018-01-31 Thread Yogesh Mahajan
Thanks Michael and TD for the quick reply. It was helpful. I will let you know
the numbers (limit) based on my experiments.

On Wed, Jan 31, 2018 at 3:10 PM, Tathagata Das 
wrote:

> Just to clarify a subtle difference between DStreams and Structured
> Streaming. Multiple input streams in a DStreamGraph are likely to mean they
> are all being processed/computed in the same way, as there can be only one
> streaming query / context active in the StreamingContext. However, in the
> case of Structured Streaming, there can be any number of independent
> streaming queries (i.e. different computations), and each streaming query
> can have any number of separate input sources. So Michael's comment of "each
> stream will have a thread on the driver" is correct when there are many
> independent queries with different computations simultaneously running.
> However, if all your streams need to be processed in the same way, then it's
> one streaming query with many inputs, and it will require one thread.
>
> Hope this helps.
>
> TD
>
> On Wed, Jan 31, 2018 at 12:39 PM, Michael Armbrust  > wrote:
>
>> -dev +user
>>
>>
>>> Similarly for structured streaming, would there be any limit on the number
>>> of streaming sources I can have?
>>>
>>
>> There is no fundamental limit, but each stream will have a thread on the
>> driver that is doing coordination of execution.  We comfortably run 20+
>> streams on a single cluster in production, but I have not pushed the
>> limits.  You'd want to test with your specific application.
>>
>
>


Re: mapGroupsWithState in Python

2018-01-31 Thread ayan guha
Thanks a lot TD, exactly what I was looking for. And I have seen most of
your talks, really great stuff you guys are doing :)

On Thu, Feb 1, 2018 at 10:38 AM, Tathagata Das 
wrote:

> Hello Ayan,
>
> From what I understand, mapGroupsWithState (probably the more general
> flatMapGroupsWithState) is the best way forward (not available in python).
> However, you need to figure out your desired semantics of when you want to
> output the deduplicated data from the streaming query. For example, if
> there is the following sequence of events
>
> (id, last_update_timestamp, attribute)
> 1, 12:00, A  < do you want to output this immediately or wait for
> some time to see if there is new data?
> 1, 11:59, B  < ignored as duplicate
> 1, 12:01, C < do you want to output this?
> 1, 12:02, D
>
> If you want to output something every time there is a newer last_update_timestamp,
> then that's not really strict "deduplication". It's more like aggregation
> with keeping the latest. In that case, you can try using UDAFs as well.
> However, with UDAFs you won't get any state cleanup. So the
> flatMapGroupsWithState is the best solution as you can do whatever tracking
> you want, output whenever you want, and get state cleanup using timeouts.
>
> FYI: I have elaborated on flatMapGroupsWithState and timeouts in my talk -
> https://databricks.com/session/deep-dive-into-
> stateful-stream-processing-in-structured-streaming
>
>
>
>
>
>
>
> On Tue, Jan 30, 2018 at 5:14 AM, ayan guha  wrote:
>
>> Any help would be much appreciated :)
>>
>> On Mon, Jan 29, 2018 at 6:25 PM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> I want to write something in Structured streaming:
>>>
>>> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
>>> attribute
>>> 2. I am receiving the data through Kinesis
>>>
>>> I want to deduplicate records based on last_updated. In batch, it looks
>>> like:
>>>
>>> spark.sql("select * from (Select *, row_number() OVER(Partition by id
>>> order by last_updated desc) rank  from table1) tmp where rank =1")
>>>
>>> But now I would like to do it in Structured Stream. I need to maintain
>>> the state of id as per the highest last_updated, across the triggers, for a
>>> certain period (24 hours).
>>>
>>> Questions:
>>>
>>> 1. Should I use mapGroupsWithState or is there any other (SQL?)
>>> solution? Can anyone help me to write it?
>>> 2. Is mapGroupsWithState supported in Python?
>>>
>>>  Just to ensure we cover bases, I have already tried using
>>> dropDuplicates, but it is keeping the 1st record encountered for an Id, not
>>> updating the state:
>>>
>>> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
>>> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.
>>> header.id').alias('id'),
>>>   get_json_object(unpackedDF.jsonData,
>>> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>>>   unpackedDF.jsonData)
>>>
>>> dedupDF = 
>>> dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
>>> hours')
>>>
>>>
>>> So it is not working. Any help is appreciated.
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread Tathagata Das
Could you give the full stack trace of the exception?

Also, can you do `dataframe2.explain(true)` and show us the plan output?



On Wed, Jan 31, 2018 at 3:35 PM, M Singh 
wrote:

> Hi Folks:
>
> I have to add a column to a structured *streaming* dataframe, but when I
> do that (using select or withColumn) I get an exception.  I can add a
> column to a *non-streaming* structured dataframe. I could not
> find any documentation on how to do this in the following doc:
> [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]
>
> I am using spark 2.4.0-SNAPSHOT
>
> Please let me know what I could be missing.
>
> Thanks for your help.
>
> (I am also attaching the source code for the structured streaming,
> structured non-streaming classes and input file with this email)
>
> 
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call
> to dataType on unresolved object, tree: 'cts
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(
> unresolved.scala:105)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
> 
>
> Here is the input file (in the ./data directory) - note tokens are
> separated by '\t'
>
> 1 v1
> 2 v1
> 2 v2
> 3 v3
> 3 v1
>
> Here is the code with dataframe (*non-streaming*) which works:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
>
> object StructuredTest {
>   def main(args:Array[String]) : Unit = {
> val sparkBuilder = SparkSession
>   .builder.
>   appName("StreamingTest").master("local[4]")
>
> val spark = sparkBuilder.getOrCreate()
>
> val schema = StructType(
> Array(
>   StructField("id", StringType, false),
>   StructField("visit", StringType, false)
>   ))
> var dataframe = spark.read.option("sep","\t").schema(schema).csv(
> "./data/")
> var dataframe2 = dataframe.select(expr("*"), current_timestamp().as(
> "cts"))
> dataframe2.show(false)
> spark.stop()
>
>   }
> }
>
> Output of the above code is:
>
> +---+-+---+
> |id |visit|cts|
> +---+-+---+
> |1  |v1   |2018-01-31 15:07:00.758|
> |2  |v1   |2018-01-31 15:07:00.758|
> |2  |v2   |2018-01-31 15:07:00.758|
> |3  |v3   |2018-01-31 15:07:00.758|
> |3  |v1   |2018-01-31 15:07:00.758|
> +---+-+---+
>
>
> Here is the code with *structured streaming* which throws the exception:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args:Array[String]) : Unit = {
> val sparkBuilder = SparkSession
>   .builder.
>   config("spark.sql.streaming.checkpointLocation", "./checkpointes").
>   appName("StreamingTest").master("local[4]")
>
> val spark = sparkBuilder.getOrCreate()
>
> val schema = StructType(
> Array(
>   StructField("id", StringType, false),
>   StructField("visit", StringType, false)
>   ))
> var dataframeInput = spark.readStream.option("sep","\t"
> ).schema(schema).csv("./data/")
> var dataframe2 = dataframeInput.select("*")
> dataframe2 = dataframe2.withColumn("cts", current_timestamp())
> val query = dataframe2.writeStream.option("trucate","false").format("
> console").start
> query.awaitTermination()
>   }
> }
>
> Here is the exception:
>
> 18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id =
> 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 
> 2394a402-dd52-49b4-854e-cb46684bf4d8]
> terminated with error
> *org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call
> to dataType on unresolved object, tree: 'cts*
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(
> unresolved.scala:105)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
>
> I've also used snippets (shown in bold below) from (
> https://docs.databricks.com/spark/latest/structured-streaming/examples.html)
> but still get the same exception:
>
> Here is the code:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args:Array[String]) : Unit = {
> val sparkBuilder = SparkSession
>   .builder.
>   config("spark.sql.streaming.checkpointLocation", "./checkpointes").
>   appName("StreamingTest").master("local[4]")
>
> val 

Re: mapGroupsWithState in Python

2018-01-31 Thread Tathagata Das
Hello Ayan,

From what I understand, mapGroupsWithState (probably the more general
flatMapGroupsWithState) is the best way forward (not available in Python).
However, you need to figure out your desired semantics of when you want to
output the deduplicated data from the streaming query. For example, if
there is the following sequence of events

(id, last_update_timestamp, attribute)
1, 12:00, A  < do you want to output this immediately or wait for
some time to see if there is new data?
1, 11:59, B  < ignored as duplicate
1, 12:01, C < do you want to output this?
1, 12:02, D

If you want to output something every time there is a newer
last_update_timestamp, then that's not really strict "deduplication". It's
more like aggregation with keeping the latest. In that case, you can try
using UDAFs as well. However, with UDAFs you won't get any state cleanup. So
flatMapGroupsWithState is the best solution, as you can do whatever tracking
you want, output whenever you want, and get state cleanup using timeouts.
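
To make the shape of that concrete, here is a rough sketch of keep-the-latest
dedup per id with a 24-hour processing-time timeout. It is an illustration
only: eventsDS (a streaming Dataset[Event] parsed from the Kinesis records) is
assumed to exist, and the code has not been tested.

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(id: String, last_updated: Timestamp, attribute: String)

// Keep only the latest event per id; emit a row only when a strictly newer
// timestamp arrives. State for an id is dropped ~24 hours after its last update.
def keepLatest(
    id: String,
    events: Iterator[Event],
    state: GroupState[Event]): Iterator[Event] = {
  if (state.hasTimedOut) {
    state.remove()
    Iterator.empty
  } else {
    val candidates = state.getOption.toSeq ++ events
    val newest = candidates.maxBy(_.last_updated.getTime)
    val isNewer = state.getOption.forall(_.last_updated.getTime < newest.last_updated.getTime)
    state.update(newest)
    state.setTimeoutDuration("24 hours")
    if (isNewer) Iterator(newest) else Iterator.empty
  }
}

// val deduped = eventsDS
//   .groupByKey(_.id)
//   .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.ProcessingTimeTimeout())(keepLatest _)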

FYI: I have elaborated on flatMapGroupsWithState and timeouts in my talk -
https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming







On Tue, Jan 30, 2018 at 5:14 AM, ayan guha  wrote:

> Any help would be much appreciated :)
>
> On Mon, Jan 29, 2018 at 6:25 PM, ayan guha  wrote:
>
>> Hi
>>
>> I want to write something in Structured streaming:
>>
>> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
>> attribute
>> 2. I am receiving the data through Kinesis
>>
>> I want to deduplicate records based on last_updated. In batch, it looks
>> like:
>>
>> spark.sql("select * from (Select *, row_number() OVER(Partition by id
>> order by last_updated desc) rank  from table1) tmp where rank =1")
>>
>> But now I would like to do it in Structured Stream. I need to maintain
>> the state of id as per the highest last_updated, across the triggers, for a
>> certain period (24 hours).
>>
>> Questions:
>>
>> 1. Should I use mapGroupsWithState or is there any other (SQL?)
>> solution? Can anyone help me to write it?
>> 2. Is mapGroupsWithState supported in Python?
>>
>>  Just to ensure we cover bases, I have already tried using
>> dropDuplicates, but it is keeping the 1st record encountered for an Id, not
>> updating the state:
>>
>> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
>> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.
>> header.id').alias('id'),
>>   get_json_object(unpackedDF.jsonData,
>> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>>   unpackedDF.jsonData)
>>
>> dedupDF = 
>> dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
>> hours')
>>
>>
>> So it is not working. Any help is appreciated.
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread M Singh
Hi Folks:

I have to add a column to a structured streaming dataframe, but when I do that
(using select or withColumn) I get an exception.  I can add a column to a
non-streaming structured dataframe. I could not find any documentation on how
to do this in the following doc:
[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]

I am using Spark 2.4.0-SNAPSHOT.

Please let me know what I could be missing.

Thanks for your help.

(I am also attaching the source code for the structured streaming and
structured non-streaming classes and the input file with this email.)
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)

Here is the input file (in the ./data directory) - note tokens are separated by '\t':

1 v1
2 v1
2 v2
3 v3
3 v1
Here is the code with dataframe (non-streaming) which works:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object StructuredTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframe = spark.read.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts"))
    dataframe2.show(false)
    spark.stop()
  }
}

Output of the above code is:

+---+-----+-----------------------+
|id |visit|cts                    |
+---+-----+-----------------------+
|1  |v1   |2018-01-31 15:07:00.758|
|2  |v1   |2018-01-31 15:07:00.758|
|2  |v2   |2018-01-31 15:07:00.758|
|3  |v3   |2018-01-31 15:07:00.758|
|3  |v1   |2018-01-31 15:07:00.758|
+---+-----+-----------------------+

Here is the code with structured streaming which throws the exception:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp())
    val query = dataframe2.writeStream.option("trucate", "false").format("console").start
    query.awaitTermination()
  }
}

Here is the exception:

18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 2394a402-dd52-49b4-854e-cb46684bf4d8] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
I've also used snippets (shown in bold below) from
(https://docs.databricks.com/spark/latest/structured-streaming/examples.html)
but still get the same exception:
Here is the code:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select(
      current_timestamp().cast("timestamp").alias("timestamp"),
      expr("*"))
    val query = dataframe2.writeStream.option("trucate", "false").format("console").start
    query.awaitTermination()
  }
}

Re: Prefer Structured Streaming over Spark Streaming (DStreams)?

2018-01-31 Thread Michael Armbrust
At this point I recommend that new applications be built using structured
streaming. The engine was GA-ed as of Spark 2.2, and I know of several very
large (trillions of records) production jobs that are running in Structured
Streaming.  All of our production pipelines at Databricks are written using
structured streaming as well.

Regarding the comparison with RDDs: The situation here is different than
when thinking about batch DataFrames vs. RDDs.  DataFrames are "just" a
higher-level abstraction on RDDs.  Structured streaming is a completely new
implementation that does not use DStreams at all, but instead directly runs
jobs using RDDs.  The advantages over DStreams include:
 - The ability to start and stop individual queries (rather than needing to
start/stop a separate StreamingContext)
 - The ability to upgrade your stream and still start from an existing
checkpoint
 - Support for working with Spark SQL data sources (json, parquet, etc)
 - Higher level APIs (DataFrames and SQL) and lambda functions (Datasets)
 - Support for event time aggregation

At this point, with the addition of mapGroupsWithState and
flatMapGroupsWithState, I think we should be at feature parity with
DStreams (and the state store does incremental checkpoints that are more
efficient than the DStream store).  However if there are applications you
are having a hard time porting over, please let us know!
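
For anyone new to the API, here is a minimal sketch of what those higher-level
pieces look like together: an event-time aggregation with a watermark over a
made-up JSON input directory, run as an individually managed query.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("EventTimeExample").master("local[4]").getOrCreate()
import spark.implicits._

// Input files are assumed to contain: eventTime (timestamp), word (string)
val schema = StructType(Seq(
  StructField("eventTime", TimestampType),
  StructField("word", StringType)))

val words = spark.readStream.schema(schema).json("/tmp/words-input")

// Event-time aggregation with a watermark
val counts = words
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"), $"word")
  .count()

// Queries are started and stopped individually; no StreamingContext is needed
val query = counts.writeStream.outputMode("update").format("console").start()
// query.stop()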

On Wed, Jan 31, 2018 at 5:42 AM, vijay.bvp  wrote:

> here is my two cents, experts please correct me if wrong
>
> its important to understand why one over other and for what kind of use
> case. There might be sometime in future where low level API's are
> abstracted
> and become legacy but for now in Spark RDD API is the core and low level
> API, all higher APIs translate to RDD ultimately,  and RDD's are immutable.
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
> these are things that are not supported and this list needs to be validated
> with the use case you have.
>
> From my experience Structured Streaming is still new and DStreams API is a
> matured API.
> some things that are missing or need to explore more.
>
> watermarking/windowing based on no of records in a particular window
>
> assuming you have watermark and windowing on event time of the data,  the
> resultant dataframe is grouped data set, only thing you can do is run
> aggregate functions. you can't simply use that output as another dataframe
> and manipulate. There is a custom aggregator but I feel its limited.
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#arbitrary-stateful-operations
> There is option to do stateful operations, using GroupState where the
> function gets iterator of events for that window. This is the closest
> access
> to StateStore a developer could get.
> This arbitrary state that programmer could keep across invocations has its
> limitations as such how much state we could keep?, is that state stored in
> driver memory? What happens if the spark job fails is this checkpointed or
> restored?
>
> thanks
> Vijay
>
>
>
>
>


Re: Max number of streams supported ?

2018-01-31 Thread Tathagata Das
Just to clarify a subtle difference between DStreams and Structured
Streaming. Multiple input streams in a DStreamGraph are likely to mean they
are all being processed/computed in the same way, as there can be only one
streaming query / context active in the StreamingContext. However, in the
case of Structured Streaming, there can be any number of independent
streaming queries (i.e. different computations), and each streaming query
can have any number of separate input sources. So Michael's comment of "each
stream will have a thread on the driver" is correct when there are many
independent queries with different computations simultaneously running.
However, if all your streams need to be processed in the same way, then it's
one streaming query with many inputs, and it will require one thread.
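
As a concrete illustration of that distinction, here is a sketch with a
hypothetical Kafka setup (topic names and broker address are made up; the
spark-sql-kafka-0-10 package is assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ManyStreams").getOrCreate()

// One streaming DataFrame per Kafka topic
def topicStream(topic: String) = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .load()

// Independent queries: each entry in spark.streams.active gets its own
// coordination thread on the driver
val q1 = topicStream("events").writeStream.format("console").start()
val q2 = topicStream("clicks").writeStream.format("console").start()

// Same computation over both topics: one query, one coordination thread
val q3 = topicStream("events").union(topicStream("clicks"))
  .writeStream.format("console").start()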

Hope this helps.

TD

On Wed, Jan 31, 2018 at 12:39 PM, Michael Armbrust 
wrote:

> -dev +user
>
>
>> Similarly for structured streaming, would there be any limit on the number
>> of streaming sources I can have?
>>
>
> There is no fundamental limit, but each stream will have a thread on the
> driver that is doing coordination of execution.  We comfortably run 20+
> streams on a single cluster in production, but I have not pushed the
> limits.  You'd want to test with your specific application.
>


Re: Max number of streams supported ?

2018-01-31 Thread Michael Armbrust
-dev +user


> Similarly for structured streaming, would there be any limit on the number
> of streaming sources I can have?
>

There is no fundamental limit, but each stream will have a thread on the
driver that is doing coordination of execution.  We comfortably run 20+
streams on a single cluster in production, but I have not pushed the
limits.  You'd want to test with your specific application.


Re: Apache Spark - Spark Structured Streaming - Watermark usage

2018-01-31 Thread Vishnu Viswanath
Hi Mans,

The watermark in Spark is used to decide when to clear the state, so if the
event is delayed beyond the point where Spark has already cleared the state,
it will be ignored.
I recently wrote a blog post on this:
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable to aggregation only. If you have only a map
function and don't want to process lagging records, you could filter based on
the EventTime field, but I guess you will have to compare it with the
processing time, since there is no API for the user to access the watermark.
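
A sketch of that filter, assuming `events` is a streaming DataFrame with an
eventTime timestamp column (the 10-minute allowed lag is made up):

// Drop records whose event time lags the current processing time by more
// than the allowed delay
val onTime = events.where("eventTime > current_timestamp() - INTERVAL 10 MINUTES")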

-Vishnu

On Fri, Jan 26, 2018 at 1:14 PM, M Singh 
wrote:

> Hi:
>
> I am trying to filter out records which are lagging behind (based on event
> time) by a certain amount of time.
>
> Is the watermark api applicable to this scenario (ie, filtering lagging
> records) or it is only applicable with aggregation ?  I could not get a
> clear understanding from the documentation which only refers to it's usage
> with aggregation.
>
> Thanks
>
> Mans
>


Data of ArrayType field getting truncated when saving to parquet

2018-01-31 Thread HARSH TAKKAR
Hi

I have a dataframe with an array-type field of large size. When I try to save
the data to a Parquet file and read it again, the array field comes out as an
empty array.

Please help


Harsh


Singular Value Decomposition (SVD) in Spark Java

2018-01-31 Thread Donni Khan
Hi,

I would like to use *Singular Value Decomposition* (SVD) to extract the
important concepts from a collection of text documents. I applied the whole
preprocessing pipeline (Tokenizer, IDFModel, Matrix, ...).

Then I applied SVD:

SingularValueDecomposition svd = rowMatrix.computeSVD(5, true, 1.0E-9d);
Matrix V = svd.V();


I actually want to map the results of the SVD back to the text (the related
features).

Does anyone know how I can get the original features from the Java Spark
Matrix?
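
A sketch (in Scala for brevity) of one way to do that, assuming the
term-document matrix was built with CountVectorizer so that
cvModel.vocabulary(i) is the term behind row i of V; with HashingTF the
mapping back to terms is not directly recoverable. `svd` and `cvModel` are
assumed to already exist.

import org.apache.spark.mllib.linalg.Matrix

val V: Matrix = svd.V
val vocabulary: Array[String] = cvModel.vocabulary  // from the fitted CountVectorizerModel

// Top terms for concept 0: sort terms by absolute loading in column 0 of V
val topTermsConcept0 = (0 until V.numRows)
  .map(i => (vocabulary(i), V(i, 0)))
  .sortBy { case (_, weight) => -math.abs(weight) }
  .take(10)

topTermsConcept0.foreach(println)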


Thank you.
Doni


Re: Prefer Structured Streaming over Spark Streaming (DStreams)?

2018-01-31 Thread vijay.bvp
Here are my two cents; experts please correct me if wrong.

It's important to understand why you would pick one over the other, and for
what kind of use case. There might be some time in the future where the
low-level APIs are abstracted away and become legacy, but for now the RDD API
is Spark's core, low-level API; all higher-level APIs ultimately translate to
RDDs, and RDDs are immutable.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
These are the things that are not supported, and this list needs to be
validated against the use case you have.

From my experience, Structured Streaming is still new and the DStreams API is
a mature API. Some things are missing or need more exploration:

Watermarking/windowing based on the number of records in a particular window.

Assuming you have a watermark and windowing on the event time of the data,
the resulting dataframe is a grouped data set; the only thing you can do is
run aggregate functions. You can't simply use that output as another dataframe
and manipulate it. There is a custom aggregator, but I feel it's limited.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#arbitrary-stateful-operations
There is an option to do stateful operations using GroupState, where the
function gets an iterator of events for that window. This is the closest
access to the StateStore a developer can get.
This arbitrary state that the programmer can keep across invocations has its
limitations, such as: how much state can we keep? Is that state stored in
driver memory? What happens if the Spark job fails: is this checkpointed and
restored?

thanks
Vijay






Re: Spark Streaming Cluster queries

2018-01-31 Thread vijay.bvp
Assuming you are talking about Spark Streaming

1) How to analyze what part of code executes on Spark Driver and what part 
of code executes on the executors?

RDDs can be understood as a set of data transformations or a set of jobs. Your
understanding deepens as you do more programming with Spark. Here are some
good resources, some a bit outdated but still good on the fundamentals:

A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)

Advanced Apache Spark Training - Sameer Farooqui (Databricks)

Debugging streaming applications (rdd-streaming/debugging-streaming-applications.html)

Look at the Task Details page in the spark UI for Streaming jobs

2) As Spark driver gets results from spark executors, should Spark executors 
have any access to redis storage? ( My guess is yes, because executors may 
need data from redis for further calculations)

Avoid the pattern of getting results back to the driver and then doing
something with them or sending them to some store. The driver quickly becomes
a bottleneck and you will lose the benefits of parallel programming. Having
said that, there is nothing preventing you from doing it; for instance, if you
want to do a complex calculation whose output is a single value, you could do
the calculation on the cluster workers, collect it in the driver, and then
send it to some store. Still, if possible avoid using the driver for these;
the driver is for scheduling of parallel RDD jobs.

This means each of your worker nodes needs to have access to the Redis store.
Also, connections can't be serialized from the driver to the worker nodes.
Please look at the documentation for the correct design pattern for storing
data in external stores and creating connections on the workers; a sketch of
that pattern follows.
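
A minimal sketch of that pattern (the ConnectionPool object is a hypothetical
stand-in for a real pooled client such as Jedis):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical per-JVM pool; in practice this would wrap a real Redis client
object ConnectionPool {
  def getConnection(): java.net.Socket = new java.net.Socket("localhost", 6379)
  def returnConnection(c: java.net.Socket): Unit = c.close()
}

object ForeachRDDPattern {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("sink-pattern"), Seconds(15))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Connections are created inside foreachPartition, i.e. on the executors;
    // they are never serialized from the driver
    lines.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val conn = ConnectionPool.getConnection()
        partition.foreach { record =>
          // write the record to the external store using this connection
          conn.getOutputStream.write((record + "\n").getBytes)
        }
        ConnectionPool.returnConnection(conn)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}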

See "Design Patterns for using foreachRDD" in the Spark Streaming programming guide.

3) Should the Spark executors have access to the checkpoint storage (S3)?
Yes. I believe the storage should ideally be mapped to the cluster, avoiding a
login each time.

4) Can anyone share their checkpoint recovery strategy for S3?
Not specific to S3, but in general: checkpointing is a time-consuming process
and can incur a performance penalty if the checkpoint storage is slow.
An alternative is to use local storage, which would be faster than S3, but
that means fault tolerance is not 100% guaranteed.

thanks
Vijay






Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread vermanurag
Twitter functionality is not part of core Spark. We have successfully used
the following package from Maven Central in the past:

org.apache.bahir:spark-streaming-twitter_2.11:2.2.0

Earlier there used to be a Twitter package under Spark, but I find that it
has not been updated beyond Spark 1.6:
org.apache.spark:spark-streaming-twitter_2.11:1.6.0

Anurag
www.fnmathlogic.com  







Prefer Structured Streaming over Spark Streaming (DStreams)?

2018-01-31 Thread Biplob Biswas
Hi,

I read an article which recommended using DataFrames instead of RDD
primitives. Now I have read about the differences between DStreams and
Structured Streaming, and Structured Streaming adds a lot of improvements
like checkpointing, windowing, sessioning, fault tolerance, etc.

What I am interested to know is: if I have to start a new project, is there
any reason to prefer Structured Streaming over DStreams?

One argument is that the engine is abstracted with structured streaming and
one can change the micro-batching engine to something like the continuous
processing engine.

Apart from that, is there any special reason? Would there be a point in time
when DStreams and RDDs would become obsolete?


Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-01-31 Thread vijay.bvp
Summarizing:

1) The static data set, read from Parquet files in HDFS as a DataFrame, has an
initial parallelism of 90 (based on the number of input files).

2) The static data set DataFrame is converted to an RDD, and the RDD has a
parallelism of 18; this was not expected.
dataframe.rdd is lazily evaluated, so there must be some operation you were
doing that triggered the change from 90 to 18. This would be some operation
that breaks the stage / requires shuffling, such as groupBy, reduceBy,
repartition, or coalesce.
If you are using coalesce, the second parameter (shuffle) is false by default,
which means upstream parallelism is not preserved; see the sketch below.
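
A quick sketch of the difference (the parquet path is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("coalesce-demo").master("local[*]").getOrCreate()
val staticDF = spark.read.parquet("/path/to/parquet")   // ~90 input files in the original case

println(staticDF.rdd.getNumPartitions)                              // input parallelism, e.g. 90
println(staticDF.rdd.coalesce(18).getNumPartitions)                 // 18; upstream tasks also collapse to 18
println(staticDF.rdd.coalesce(18, shuffle = true).getNumPartitions) // 18; upstream keeps 90, then shuffles down
println(staticDF.rdd.repartition(90).getNumPartitions)              // full shuffle back to 90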

3) You have a DStream from a Kafka source with 9 partitions, and this is
joined with the above static data set? When joining, have you tried setting
numPartitions, an optional parameter to provide the number of partitions
required?

4) Your batch interval is 15 seconds, but you are caching the static data set
for 30 minutes. What exactly do you mean by caching for 30 minutes?

Note that when you cache data, depending on memory pressure there is a chance
that partitioning is not preserved.

It would be useful to provide Spark UI screenshots for one complete batch,
the DAG, and other details.

thanks
Vijay






Re: Type Casting Error in Spark Data Frame

2018-01-31 Thread vijay.bvp
formatted
=
Assuming the MessageHelper.sqlMapping schema is correctly mapped to the input
JSON (it would help if the schema and a sample JSON were shared),

here is the explode function with DataFrames; similar functionality is
available with SQL:

import sparkSession.implicits._
import org.apache.spark.sql.functions._

val routeEventDF = dataDF
  .select($"messageId", explode($"messageData").alias("MessageData"))
  .select($"messageId", $"MessageData.unixTime", $"MessageData.packetID",
    $"MessageData.messageID")
routeEventDF.show


thanks 
Vijay






Re: Type Casting Error in Spark Data Frame

2018-01-31 Thread vijay.bvp
Assuming MessageHelper.sqlMapping schema is correctly mapped with input json
(it would help if the schema and sample json is shared); here is the explode
function with dataframes, similar functionality is available with SQL:

import sparkSession.implicits._
import org.apache.spark.sql.functions._

val routeEventDF = dataDF
  .select($"messageId", explode($"messageData").alias("MessageData"))
  .select($"messageId", $"MessageData.unixTime", $"MessageData.packetID",
    $"MessageData.messageID")
routeEventDF.show

thanks
Vijay


