[jira] [Updated] (SPARK-18970) FileSource failure during file list refresh doesn't cause an application to fail, but stops further processing

2017-01-13 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18970:
-
Description: 
Spark streaming application uses S3 files as streaming sources. After running 
for several day processing stopped even though an application continued to run. 
Stack trace:
{code}
java.io.FileNotFoundException: No such file or directory 
's3n://X'
at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:818)
at 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:511)
at 
org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:465)
at 
org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:462)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
I believe 2 things should (or can) be fixed:
1. Application should fail in case of such an error.
2. Allow application to ignore such failure, since there is a chance that 
during next refresh the error will not resurface. (In my case I believe an 
error was cased by S3 cleaning the bucket exactly at the same moment when 
refresh was running) 

My code to create streaming processing looks as the following:
{code}
  val cq = sqlContext.readStream
.format("json")
.schema(struct)
.load(s"input")
.writeStream
.option("checkpointLocation", s"checkpoints")
.foreach(new ForeachWriter[Row] {...})
.trigger(ProcessingTime("10 seconds")).start()

  cq.awaitTermination() 
{code}

  was:
Spark streaming application uses S3 files as streaming sources. After running 
for several day processing stopped even though an application continued to run. 
Stack trace:
java.io.FileNotFoundException: No such file or directory 
's3n://X'
at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:818)
at 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:511)
at 
org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:465)
at 
org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:462)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at 
s

Re: Dataset Type safety

2017-01-10 Thread Michael Armbrust
>
> As I've specified *.as[Person]* which does schema inferance then
> *"option("inferSchema","true")" *is redundant and not needed!


The resolution of fields is done by name, not by position for case
classes.  This is what allows us to support more complex things like JSON
or nested structures.  If you you just want to map it by position you can
do .as[(String, Long)] to map it to a tuple instead.

And lastly does .as[Person] check that column value matches with data type
> i.e. "age Long" would fail if it gets a non numeric value! because the
> input file could be millions of row which could be very time consuming.


No, this is a static check based on the schema.  It does not scan the data
(though schema inference does).

On Tue, Jan 10, 2017 at 11:34 AM, A Shaikh  wrote:

> I have a simple people.csv and following SimpleApp
>
>
> people.csv
> --
> name,age
> abc,22
> xyz,32
>
> 
> Working Code
> 
> Object SimpleApp {}
>   case class Person(name: String, age: Long)
>   def main(args: Array[String]): Unit = {
> val spark = SparkFactory.getSparkSession("PIPE2Dataset")
> import spark.implicits._
>
> val peopleDS = spark.read.option("inferSchema","true").option("header",
> "true").option("delimiter", ",").csv("/people.csv").as[Person]
> }
> 
>
>
> 
> Fails for data with no header
> 
> Removing header record "name,age" AND switching header option off
> =>.option("header", "false") return error => *cannot resolve '`name`'
> given input columns: [_c0, _c1]*
> val peopleDS = spark.read.option("inferSchema","true").option("header",
> "false").option("delimiter", ",").csv("/people.csv").as[Person]
>
> Should'nt this just assing the header from Person class
>
>
> 
> invalid data
> 
> As I've specified *.as[Person]* which does schema inferance then 
> *"option("inferSchema","true")"
> *is redundant and not needed!
>
>
> And lastly does .as[Person] check that column value matches with data type
> i.e. "age Long" would fail if it gets a non numeric value! because the
> input file could be millions of row which could be very time consuming.
>


Re: StateStoreSaveExec / StateStoreRestoreExec

2017-01-03 Thread Michael Armbrust
You might also be interested in this:
https://issues.apache.org/jira/browse/SPARK-19031

On Tue, Jan 3, 2017 at 3:36 PM, Michael Armbrust 
wrote:

> I think we should add something similar to mapWithState in 2.2.  It would
> be great if you could add the description of your problem to this ticket:
> https://issues.apache.org/jira/browse/SPARK-19067
>
> On Mon, Jan 2, 2017 at 2:05 PM, Jeremy Smith 
> wrote:
>
>> I have a question about state tracking in Structured Streaming.
>>
>> First let me briefly explain my use case: Given a mutable data source
>> (i.e. an RDBMS) in which we assume we can retrieve a set of newly created
>> row versions (being a row that was created or updated between two given
>> `Offset`s, whatever those are), we can create a Structured Streaming
>> `Source` which retrieves the new row versions. Further assuming that every
>> logical row has some primary key, then as long as we can track the current
>> offset for each primary key, we can differentiate between new and updated
>> rows. Then, when a row is updated, we can record that the previous version
>> of that row expired at some particular time. That's essentially what I'm
>> trying to do. This would effectively give you an "event-sourcing" type of
>> historical/immutable log of changes out of a mutable data source.
>>
>> I noticed that in Spark 2.0.1 there was a concept of a StateStore, which
>> seemed like it would allow me to do exactly the tracking that I needed, so
>> I decided to try and use that built-in functionality rather than some
>> external key/value store for storing the current "version number" of each
>> primary key. There were a lot of hard-coded hoops I had to jump through,
>> but I eventually made it work by implementing some custom LogicalPlans and
>> SparkPlans around StateStore[Save/Restore]Exec.
>>
>> Now, in Spark 2.1.0 it seems to have gotten even further away from what I
>> was using it for - the keyExpressions of StateStoreSaveExec must include a
>> timestamp column, which means that those expressions are not really keys
>> (at least not for a logical row). So it appears I can't use it that way
>> anymore (I can't blame Spark for this, as I knew what I was getting into
>> when leveraging developer APIs). There are also several hard-coded checks
>> which now make it clear that StateStore functionality is only to be used
>> for streaming aggregates, which is not really what I'm doing.
>>
>> My question is - is there a good way to accomplish the above use case
>> within Structured Streaming? Or is this the wrong use case for the state
>> tracking functionality (which increasingly seems to be targeted toward
>> aggregates only)? Is there a plan for any kind of generalized
>> `mapWithState`-type functionality for Structured Streaming, or should I
>> just give up on that and use an external key/value store for my state
>> tracking?
>>
>> Thanks,
>> Jeremy
>>
>
>


Re: StateStoreSaveExec / StateStoreRestoreExec

2017-01-03 Thread Michael Armbrust
I think we should add something similar to mapWithState in 2.2.  It would
be great if you could add the description of your problem to this ticket:
https://issues.apache.org/jira/browse/SPARK-19067

On Mon, Jan 2, 2017 at 2:05 PM, Jeremy Smith 
wrote:

> I have a question about state tracking in Structured Streaming.
>
> First let me briefly explain my use case: Given a mutable data source
> (i.e. an RDBMS) in which we assume we can retrieve a set of newly created
> row versions (being a row that was created or updated between two given
> `Offset`s, whatever those are), we can create a Structured Streaming
> `Source` which retrieves the new row versions. Further assuming that every
> logical row has some primary key, then as long as we can track the current
> offset for each primary key, we can differentiate between new and updated
> rows. Then, when a row is updated, we can record that the previous version
> of that row expired at some particular time. That's essentially what I'm
> trying to do. This would effectively give you an "event-sourcing" type of
> historical/immutable log of changes out of a mutable data source.
>
> I noticed that in Spark 2.0.1 there was a concept of a StateStore, which
> seemed like it would allow me to do exactly the tracking that I needed, so
> I decided to try and use that built-in functionality rather than some
> external key/value store for storing the current "version number" of each
> primary key. There were a lot of hard-coded hoops I had to jump through,
> but I eventually made it work by implementing some custom LogicalPlans and
> SparkPlans around StateStore[Save/Restore]Exec.
>
> Now, in Spark 2.1.0 it seems to have gotten even further away from what I
> was using it for - the keyExpressions of StateStoreSaveExec must include a
> timestamp column, which means that those expressions are not really keys
> (at least not for a logical row). So it appears I can't use it that way
> anymore (I can't blame Spark for this, as I knew what I was getting into
> when leveraging developer APIs). There are also several hard-coded checks
> which now make it clear that StateStore functionality is only to be used
> for streaming aggregates, which is not really what I'm doing.
>
> My question is - is there a good way to accomplish the above use case
> within Structured Streaming? Or is this the wrong use case for the state
> tracking functionality (which increasingly seems to be targeted toward
> aggregates only)? Is there a plan for any kind of generalized
> `mapWithState`-type functionality for Structured Streaming, or should I
> just give up on that and use an external key/value store for my state
> tracking?
>
> Thanks,
> Jeremy
>


[jira] [Created] (SPARK-19067) mapWithState Style API

2017-01-03 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-19067:


 Summary: mapWithState Style API
 Key: SPARK-19067
 URL: https://issues.apache.org/jira/browse/SPARK-19067
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Reporter: Michael Armbrust
Priority: Critical


Right now the only way to do stateful operations with with Aggregator or UDAF.  
However, this does not give users control of emission or expiration of state 
making it hard to implement things like sessionization.  We should add a more 
general construct (probably similar to {{mapWithState}}) to structured 
streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19065) Bad error when using dropDuplicates in Streaming

2017-01-03 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-19065:


 Summary: Bad error when using dropDuplicates in Streaming
 Key: SPARK-19065
 URL: https://issues.apache.org/jira/browse/SPARK-19065
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Michael Armbrust


Right now if you use .dropDuplicates in a stream you get a confusing exception.

Here is an example:
{code}
org.apache.spark.sql.AnalysisException: resolved attribute(s) 
accountName#34351,eventSource#34331,resources#34339,eventType#34333,readOnly#34335,date#34350,errorCode#34327,errorMessage#34328,userAgent#34344,eventVersion#34334,eventTime#34332,recipientAccountId#34336,sharedEventID#34341,timing#34349,apiVersion#34325,additionalEventData#34324,requestParameters#34338,sourceIPAddress#34342,serviceEventDetails#34343,timestamp#34323,awsRegion#34326,eventName#34330,responseElements#34340,filename#34347,requestID#34337,vpcEndpointId#34346,line#34348,userIdentity#34345
 missing from 
requestID#34119,eventSource#34113,serviceEventDetails#34125,eventVersion#34116,userIdentity#34127,requestParameters#34120,accountName#34133,apiVersion#34107,eventTime#34114,additionalEventData#34106,line#34130,readOnly#34117,sourceIPAddress#34124,eventID#34329,errorCode#34109,resources#34121,timing#34131,userAgent#34126,eventType#34115,recipientAccountId#34118,errorMessage#34110,vpcEndpointId#34128,sharedEventID#34123,filename#34129,awsRegion#34108,responseElements#34122,date#34132,timestamp#34105,eventName#34112
 in operator !Project [timestamp#34323, additionalEventData#34324, 
apiVersion#34325, awsRegion#34326, errorCode#34327, errorMessage#34328, 
eventID#34329, eventName#34330, eventSource#34331, eventTime#34332, 
eventType#34333, eventVersion#34334, readOnly#34335, recipientAccountId#34336, 
requestID#34337, requestParameters#34338, resources#34339, 
responseElements#34340, sharedEventID#34341, sourceIPAddress#34342, 
serviceEventDetails#34343, userAgent#34344, userIdentity#34345, 
vpcEndpointId#34346, ... 5 more fields];;

!Project [timestamp#34323, additionalEventData#34324, apiVersion#34325, 
awsRegion#34326, errorCode#34327, errorMessage#34328, eventID#34329, 
eventName#34330, eventSource#34331, eventTime#34332, eventType#34333, 
eventVersion#34334, readOnly#34335, recipientAccountId#34336, requestID#34337, 
requestParameters#34338, resources#34339, responseElements#34340, 
sharedEventID#34341, sourceIPAddress#34342, serviceEventDetails#34343, 
userAgent#34344, userIdentity#34345, vpcEndpointId#34346, ... 5 more fields]
+- Aggregate [eventID#34329], [first(timestamp#34323, false) AS 
timestamp#34105, first(additionalEventData#34324, false) AS 
additionalEventData#34106, first(apiVersion#34325, false) AS apiVersion#34107, 
first(awsRegion#34326, false) AS awsRegion#34108, first(errorCode#34327, false) 
AS errorCode#34109, first(errorMessage#34328, false) AS errorMessage#34110, 
eventID#34329, first(eventName#34330, false) AS eventName#34112, 
first(eventSource#34331, false) AS eventSource#34113, first(eventTime#34332, 
false) AS eventTime#34114, first(eventType#34333, false) AS eventType#34115, 
first(eventVersion#34334, false) AS eventVersion#34116, first(readOnly#34335, 
false) AS readOnly#34117, first(recipientAccountId#34336, false) AS 
recipientAccountId#34118, first(requestID#34337, false) AS requestID#34119, 
first(requestParameters#34338, false) AS requestParameters#34120, 
first(resources#34339, false) AS resources#34121, first(responseElements#34340, 
false) AS responseElements#34122, first(sharedEventID#34341, false) AS 
sharedEventID#34123, first(sourceIPAddress#34342, false) AS 
sourceIPAddress#34124, first(serviceEventDetails#34343, false) AS 
serviceEventDetails#34125, first(userAgent#34344, false) AS userAgent#34126, 
first(userIdentity#34345, false) AS userIdentity#34127, 
first(vpcEndpointId#34346, false) AS vpcEndpointId#34128, ... 5 more fields]
   +- Project [timestamp#34323, additionalEventData#34324, apiVersion#34325, 
awsRegion#34326, errorCode#34327, errorMessage#34328, eventID#34329, 
eventName#34330, eventSource#34331, eventTime#34332, eventType#34333, 
eventVersion#34334, readOnly#34335, recipientAccountId#34336, requestID#34337, 
requestParameters#34338, resources#34339, responseElements#34340, 
sharedEventID#34341, sourceIPAddress#34342, serviceEventDetails#34343, 
userAgent#34344, userIdentity#34345, vpcEndpointId#34346, ... 5 more fields]
  +- 
Relation[timestamp#34323,additionalEventData#34324,apiVersion#34325,awsRegion#34326,errorCode#34327,errorMessage#34328,eventID#34329,eventName#34330,eventSource#34331,eventTime#34332,eventType#34333,eventVersion#34334,readOnly#34335,recipientAccountId#34336,requestID#34337,requestParameters#34338,resources#34339,responseElements#34340,sharedEventID#34341,sourceIPAddress#34342,serviceEventDetails#34343,userAgent#34344

[jira] [Created] (SPARK-19031) JDBC Streaming Source

2016-12-29 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-19031:


 Summary: JDBC Streaming Source
 Key: SPARK-19031
 URL: https://issues.apache.org/jira/browse/SPARK-19031
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Reporter: Michael Armbrust


Many RDBMs provide the ability to capture changes to a table (change data 
capture).  We should make this available as a streaming source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2016-12-29 Thread Michael Armbrust
We don't support this yet, but I've opened this JIRA as it sounds generally
useful: https://issues.apache.org/jira/browse/SPARK-19031

In the mean time you could try implementing your own Source, but that is
pretty low level and is not yet a stable API.

On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" 
wrote:

> Hi all,
>
> Thanks a lot for your contributions to bring us new technologies.
>
> I don't want to waste your time, so before I write to you, I googled,
> checked stackoverflow and mailing list archive with keywords "streaming"
> and "jdbc". But I was not able to get any solution to my use case. I hope I
> can get some clarification from you.
>
> The use case is quite straightforward, I need to harvest a relational
> database via jdbc, do something with data, and store result into Kafka. I
> am stuck at the first step, and the difficulty is as follows:
>
> 1. The database is too large to ingest with one thread.
> 2. The database is dynamic and time series data comes in constantly.
>
> Then an ideal workflow is that multiple workers process partitions of data
> incrementally according to a time window. For example, the processing
> starts from the earliest data with each batch containing data for one hour.
> If data ingestion speed is faster than data production speed, then
> eventually the entire database will be harvested and those workers will
> start to "tail" the database for new data streams and the processing
> becomes real time.
>
> With Spark SQL I can ingest data from a JDBC source with partitions
> divided by time windows, but how can I dynamically increment the time
> windows during execution? Assume that there are two workers ingesting data
> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
> for 2017-01-03. But I am not able to find out how to increment those values
> during execution.
>
> Then I looked into Structured Streaming. It looks much more promising
> because window operations based on event time are considered during
> streaming, which could be the solution to my use case. However, from
> documentation and code example I did not find anything related to streaming
> data from a growing database. Is there anything I can read to achieve my
> goal?
>
> Any suggestion is highly appreciated. Thank you very much and have a nice
> day.
>
> Best regards,
> Yang
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: What is mainly different from a UDT and a spark internal type that ExpressionEncoder recognized?

2016-12-27 Thread Michael Armbrust
An encoder uses reflection

to generate expressions that can extract data out of an object (by calling
methods on the object) and encode its contents directly into the tungsten
binary row format (and vice versa).  We codegenerate bytecode that
evaluates these expression in the same way that we code generate code for
normal expression evaluation in query processing.  However, this reflection
only works for simple ATDs
.  Another key thing to
realize is that we do this reflection / code generation at runtime, so we
aren't constrained by binary compatibility across versions of spark.

UDTs let you write custom code that translates an object into into a
generic row, which we can then translate into Spark's internal format
(using a RowEncoder). Unlike expressions and tungsten binary encoding, the
Row type that you return here is a stable public API that hasn't changed
since Spark 1.3.

So to summarize, if encoders don't work for your specific types you can use
UDTs, but they probably won't be as efficient. I'd love to unify these code
paths more, but its actually a fair amount of work to come up with a good
stable public API that doesn't sacrifice performance.

On Tue, Dec 27, 2016 at 6:32 AM, dragonly  wrote:

> I'm recently reading the source code of the SparkSQL project, and found
> some
> interesting databricks blogs about the tungsten project. I've roughly read
> through the encoder and unsafe representation part of the tungsten
> project(haven't read the algorithm part such as cache friendly hashmap
> algorithms).
> Now there's a big puzzle in front of me about the codegen of SparkSQL and
> how does the codegen utilize the tungsten encoding between JMV objects and
> unsafe bits.
> So can anyone tell me that's the main difference in situations where I
> write
> a UDT like ExamplePointUDT in SparkSQL or just create an ArrayType which
> can
> be handled by the tungsten encoder? I'll really appreciate it if you can go
> through some concrete code examples. thanks a lot!
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/What-is-mainly-
> different-from-a-UDT-and-a-spark-internal-type-that-
> ExpressionEncoder-recognized-tp20370.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


[jira] [Updated] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2016-12-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17344:
-
Target Version/s:   (was: 2.1.1)

> Kafka 0.8 support for Structured Streaming
> --
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured 
> Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2016-12-19 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762756#comment-15762756
 ] 

Michael Armbrust commented on SPARK-17344:
--

[KAFKA-4462] aims to give us backwards compatibility for clients which will be 
great.  The fact that there is a long term plan here makes me less allergic to 
the idea of copy/pasting the 0.10.x {{Source}} and porting it to 0.8.0 as an 
interim solution for those who can upgrade yet.

> Kafka 0.8 support for Structured Streaming
> --
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured 
> Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2016-12-19 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17344:
-
Target Version/s: 2.1.1

> Kafka 0.8 support for Structured Streaming
> --
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured 
> Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18908) It's hard for the user to see the failure if StreamExecution fails to create the logical plan

2016-12-19 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18908:
-
Target Version/s: 2.1.1

> It's hard for the user to see the failure if StreamExecution fails to create 
> the logical plan
> -
>
> Key: SPARK-18908
> URL: https://issues.apache.org/jira/browse/SPARK-18908
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Critical
>
> If the logical plan fails to create, e.g., some Source options are invalid, 
> the user cannot use the code to detect the failure. The only place receiving 
> this error is Thread's UncaughtExceptionHandler.
> This bug is because logicalPlan is lazy, and when we try to create 
> StreamingQueryException to wrap the exception thrown by creating logicalPlan, 
> it calls logicalPlan agains.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18932) Partial aggregation for collect_set / collect_list

2016-12-19 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18932:


 Summary: Partial aggregation for collect_set / collect_list
 Key: SPARK-18932
 URL: https://issues.apache.org/jira/browse/SPARK-18932
 Project: Spark
  Issue Type: Improvement
  Components: SQL, Structured Streaming
Reporter: Michael Armbrust


The lack of partial aggregation here is blocking us from using these in 
streaming.  It still won't be fast, but it would be nice to at least be able to 
use them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: How to get recent value in spark dataframe

2016-12-16 Thread Michael Armbrust
Oh and to get the null for missing years, you'd need to do an outer join
with a table containing all of the years you are interested in.

On Fri, Dec 16, 2016 at 3:24 PM, Michael Armbrust 
wrote:

> Are you looking for argmax? Here is an example
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3170497669323442/2840265927289860/latest.html>
> .
>
> On Wed, Dec 14, 2016 at 8:49 PM, Milin korath 
> wrote:
>
>> Hi
>>
>> I have a spark data frame with following structure
>>
>>  id  flag price date
>>   a   0100  2015
>>   a   050   2015
>>   a   1200  2014
>>   a   1300  2013
>>   a   0400  2012
>>
>> I need to create a data frame with recent value of flag 1 and updated in
>> the flag 0 rows.
>>
>>   id  flag price date new_column
>>   a   0100  2015200
>>   a   050   2015200
>>   a   1200  2014null
>>   a   1300  2013null
>>   a   0400  2012null
>>
>> We have 2 rows having flag=0. Consider the first row(flag=0),I will have
>> 2 values(200 and 300) and I am taking the recent one 200(2014). And the
>> last row I don't have any recent value for flag 1 so it is updated with
>> null.
>>
>> Looking for a solution using scala. Any help would be appreciated.Thanks
>>
>> Thanks
>> Milin
>>
>
>


Re: How to get recent value in spark dataframe

2016-12-16 Thread Michael Armbrust
Are you looking for argmax? Here is an example

.

On Wed, Dec 14, 2016 at 8:49 PM, Milin korath 
wrote:

> Hi
>
> I have a spark data frame with following structure
>
>  id  flag price date
>   a   0100  2015
>   a   050   2015
>   a   1200  2014
>   a   1300  2013
>   a   0400  2012
>
> I need to create a data frame with recent value of flag 1 and updated in
> the flag 0 rows.
>
>   id  flag price date new_column
>   a   0100  2015200
>   a   050   2015200
>   a   1200  2014null
>   a   1300  2013null
>   a   0400  2012null
>
> We have 2 rows having flag=0. Consider the first row(flag=0),I will have 2
> values(200 and 300) and I am taking the recent one 200(2014). And the last
> row I don't have any recent value for flag 1 so it is updated with null.
>
> Looking for a solution using scala. Any help would be appreciated.Thanks
>
> Thanks
> Milin
>


[jira] [Commented] (SPARK-5632) not able to resolve dot('.') in field name

2016-12-15 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15752929#comment-15752929
 ] 

Michael Armbrust commented on SPARK-5632:
-

Hmm, I agree that error is confusing.  It does work if you use 
[backticks|https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/2398463439880245/2840265927289860/latest.html]
 (at least with 2.1).

I think this falls into the general class of issues where we don't have 
consistent handling of strings that reference columns.  I'm going to link this 
ticket to [SPARK-18084] (which i've also targeted for investigation in the 2.2 
release).

> not able to resolve dot('.') in field name
> --
>
> Key: SPARK-5632
> URL: https://issues.apache.org/jira/browse/SPARK-5632
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.2.0, 1.3.0
> Environment: Spark cluster: EC2 m1.small + Spark 1.2.0
> Cassandra cluster: EC2 m3.xlarge + Cassandra 2.1.2
>Reporter: Lishu Liu
>Assignee: Wenchen Fan
>Priority: Blocker
> Fix For: 1.4.0
>
>
> My cassandra table task_trace has a field sm.result which contains dot in the 
> name. So SQL tried to look up sm instead of full name 'sm.result'. 
> Here is my code: 
> {code}
> scala> import org.apache.spark.sql.cassandra.CassandraSQLContext
> scala> val cc = new CassandraSQLContext(sc)
> scala> val task_trace = cc.jsonFile("/task_trace.json")
> scala> task_trace.registerTempTable("task_trace")
> scala> cc.setKeyspace("cerberus_data_v4")
> scala> val res = cc.sql("SELECT received_datetime, task_body.cerberus_id, 
> task_body.sm.result FROM task_trace WHERE task_id = 
> 'fff7304e-9984-4b45-b10c-0423a96745ce'")
> res: org.apache.spark.sql.SchemaRDD = 
> SchemaRDD[57] at RDD at SchemaRDD.scala:108
> == Query Plan ==
> == Physical Plan ==
> java.lang.RuntimeException: No such struct field sm in cerberus_batch_id, 
> cerberus_id, couponId, coupon_code, created, description, domain, expires, 
> message_id, neverShowAfter, neverShowBefore, offerTitle, screenshots, 
> sm.result, sm.task, startDate, task_id, url, uuid, validationDateTime, 
> validity
> {code}
> The full schema look like this:
> {code}
> scala> task_trace.printSchema()
> root
>  \|-- received_datetime: long (nullable = true)
>  \|-- task_body: struct (nullable = true)
>  \|\|-- cerberus_batch_id: string (nullable = true)
>  \|\|-- cerberus_id: string (nullable = true)
>  \|\|-- couponId: integer (nullable = true)
>  \|\|-- coupon_code: string (nullable = true)
>  \|\|-- created: string (nullable = true)
>  \|\|-- description: string (nullable = true)
>  \|\|-- domain: string (nullable = true)
>  \|\|-- expires: string (nullable = true)
>  \|\|-- message_id: string (nullable = true)
>  \|\|-- neverShowAfter: string (nullable = true)
>  \|\|-- neverShowBefore: string (nullable = true)
>  \|\|-- offerTitle: string (nullable = true)
>  \|\|-- screenshots: array (nullable = true)
>  \|\|\|-- element: string (containsNull = false)
>  \|\|-- sm.result: struct (nullable = true)
>  \|\|\|-- cerberus_batch_id: string (nullable = true)
>  \|\|\|-- cerberus_id: string (nullable = true)
>  \|\|\|-- code: string (nullable = true)
>  \|\|\|-- couponId: integer (nullable = true)
>  \|\|\|-- created: string (nullable = true)
>  \|\|\|-- description: string (nullable = true)
>  \|\|\|-- domain: string (nullable = true)
>  \|\|\|-- expires: string (nullable = true)
>  \|\|\|-- message_id: string (nullable = true)
>  \|\|\|-- neverShowAfter: string (nullable = true)
>  \|\|\|-- neverShowBefore: string (nullable = true)
>  \|\|\|-- offerTitle: string (nullable = true)
>  \|\|\|-- result: struct (nullable = true)
>  \|\|\|\|-- post: struct (nullable = true)
>  \|\|\|\|\|-- alchemy_out_of_stock: struct (nullable = true)
>  \|\|\|\|\|\|-- ci: double (nullable = true)
>  \|\|\|\|\|\|-- value: boolean (nullable = true)
>  \|\|\|\|\|-- meta: struct (nullable = true)
>  \|\|\|\|\|\|-- None_tx_value: array (nullable = true)
>  \|\|\|\|\|\|\|-- element: string (containsNull = 
> false)
>  \|\|\|\|\|\|-- ex

[jira] [Updated] (SPARK-18084) write.partitionBy() does not recognize nested columns that select() can access

2016-12-15 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18084:
-
Target Version/s: 2.2.0

> write.partitionBy() does not recognize nested columns that select() can access
> --
>
> Key: SPARK-18084
> URL: https://issues.apache.org/jira/browse/SPARK-18084
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Nicholas Chammas
>Priority: Minor
>
> Here's a simple repro in the PySpark shell:
> {code}
> from pyspark.sql import Row
> rdd = spark.sparkContext.parallelize([Row(a=Row(b=5))])
> df = spark.createDataFrame(rdd)
> df.printSchema()
> df.select('a.b').show()  # works
> df.write.partitionBy('a.b').text('/tmp/test')  # doesn't work
> {code}
> Here's what I see when I run this:
> {code}
> >>> from pyspark.sql import Row
> >>> rdd = spark.sparkContext.parallelize([Row(a=Row(b=5))])
> >>> df = spark.createDataFrame(rdd)
> >>> df.printSchema()
> root
>  |-- a: struct (nullable = true)
>  ||-- b: long (nullable = true)
> >>> df.show()
> +---+
> |  a|
> +---+
> |[5]|
> +---+
> >>> df.select('a.b').show()
> +---+
> |  b|
> +---+
> |  5|
> +---+
> >>> df.write.partitionBy('a.b').text('/tmp/test')
> Traceback (most recent call last):
>   File 
> "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/utils.py", 
> line 63, in deco
> return f(*a, **kw)
>   File 
> "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py",
>  line 319, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o233.text.
> : org.apache.spark.sql.AnalysisException: Partition column a.b not found in 
> schema 
> StructType(StructField(a,StructType(StructField(b,LongType,true)),true));
>   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$10.apply(PartitioningUtils.scala:368)
>   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$10.apply(PartitioningUtils.scala:368)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:367)
>   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:366)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$.partitionColumnsSchema(PartitioningUtils.scala:366)
>   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumn(PartitioningUtils.scala:349)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:458)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
>   at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:534)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:280)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(Ca

Re: Dataset encoders for further types?

2016-12-15 Thread Michael Armbrust
I would have sworn there was a ticket, but I can't find it.  So here you
go: https://issues.apache.org/jira/browse/SPARK-18891

A work around until that is fixed would be for you to manually specify the kryo
encoder

.

On Thu, Dec 15, 2016 at 8:18 AM, Jakub Dubovsky <
spark.dubovsky.ja...@gmail.com> wrote:

> Hey,
>
> I want to ask whether there is any roadmap/plan for adding Encoders for
> further types in next releases of Spark. Here is a list
>  of
> currently supported types. We would like to use Datasets with our
> internally defined case classes containing scala.collection.immutable.List(s).
> This does not work now because these lists are converted to ArrayType
> (Seq). This then fails a constructor lookup because of seq-is-not-a-list
> error...
>
> This means that for now we are stuck with using RDDs.
>
> Thanks for any insights!
>
> Jakub Dubovsky
>
>


[jira] [Created] (SPARK-18891) Support for specific collection types

2016-12-15 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18891:


 Summary: Support for specific collection types
 Key: SPARK-18891
 URL: https://issues.apache.org/jira/browse/SPARK-18891
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 1.6.3, 2.1.0
Reporter: Michael Armbrust
Priority: Critical


Encoders treat all collections the same (i.e. {{Seq}} vs {{List}}) which force 
users to only define classes with the most generic type.

An [example 
error|https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/2398463439880241/2840265927289860/latest.html]:
{code}
case class SpecificCollection(aList: List[Int])
Seq(SpecificCollection(1 :: Nil)).toDS().collect()
{code}

{code}
java.lang.RuntimeException: Error while decoding: 
java.util.concurrent.ExecutionException: java.lang.Exception: failed to 
compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', 
Line 98, Column 120: No applicable constructor/method found for actual 
parameters "scala.collection.Seq"; candidates are: 
"line29e7e4b1e36445baa3505b2e102aa86b29.$read$$iw$$iw$$iw$$iw$SpecificCollection(scala.collection.immutable.List)"
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5632) not able to resolve dot('.') in field name

2016-12-15 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15752771#comment-15752771
 ] 

Michael Armbrust commented on SPARK-5632:
-

If you expand the commit you'll see its included in many tags.  The "fix 
version" here is 1.4, which means it was released with 1.4 and all subsequent 
versions.

> not able to resolve dot('.') in field name
> --
>
> Key: SPARK-5632
> URL: https://issues.apache.org/jira/browse/SPARK-5632
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.2.0, 1.3.0
> Environment: Spark cluster: EC2 m1.small + Spark 1.2.0
> Cassandra cluster: EC2 m3.xlarge + Cassandra 2.1.2
>Reporter: Lishu Liu
>Assignee: Wenchen Fan
>Priority: Blocker
> Fix For: 1.4.0
>
>
> My cassandra table task_trace has a field sm.result which contains dot in the 
> name. So SQL tried to look up sm instead of full name 'sm.result'. 
> Here is my code: 
> {code}
> scala> import org.apache.spark.sql.cassandra.CassandraSQLContext
> scala> val cc = new CassandraSQLContext(sc)
> scala> val task_trace = cc.jsonFile("/task_trace.json")
> scala> task_trace.registerTempTable("task_trace")
> scala> cc.setKeyspace("cerberus_data_v4")
> scala> val res = cc.sql("SELECT received_datetime, task_body.cerberus_id, 
> task_body.sm.result FROM task_trace WHERE task_id = 
> 'fff7304e-9984-4b45-b10c-0423a96745ce'")
> res: org.apache.spark.sql.SchemaRDD = 
> SchemaRDD[57] at RDD at SchemaRDD.scala:108
> == Query Plan ==
> == Physical Plan ==
> java.lang.RuntimeException: No such struct field sm in cerberus_batch_id, 
> cerberus_id, couponId, coupon_code, created, description, domain, expires, 
> message_id, neverShowAfter, neverShowBefore, offerTitle, screenshots, 
> sm.result, sm.task, startDate, task_id, url, uuid, validationDateTime, 
> validity
> {code}
> The full schema look like this:
> {code}
> scala> task_trace.printSchema()
> root
>  \|-- received_datetime: long (nullable = true)
>  \|-- task_body: struct (nullable = true)
>  \|\|-- cerberus_batch_id: string (nullable = true)
>  \|\|-- cerberus_id: string (nullable = true)
>  \|\|-- couponId: integer (nullable = true)
>  \|\|-- coupon_code: string (nullable = true)
>  \|\|-- created: string (nullable = true)
>  \|\|-- description: string (nullable = true)
>  \|\|-- domain: string (nullable = true)
>  \|\|-- expires: string (nullable = true)
>  \|\|-- message_id: string (nullable = true)
>  \|\|-- neverShowAfter: string (nullable = true)
>  \|\|-- neverShowBefore: string (nullable = true)
>  \|\|-- offerTitle: string (nullable = true)
>  \|\|-- screenshots: array (nullable = true)
>  \|\|\|-- element: string (containsNull = false)
>  \|\|-- sm.result: struct (nullable = true)
>  \|\|\|-- cerberus_batch_id: string (nullable = true)
>  \|\|\|-- cerberus_id: string (nullable = true)
>  \|\|\|-- code: string (nullable = true)
>  \|\|\|-- couponId: integer (nullable = true)
>  \|\|\|-- created: string (nullable = true)
>  \|\|\|-- description: string (nullable = true)
>  \|\|\|-- domain: string (nullable = true)
>  \|\|\|-- expires: string (nullable = true)
>  \|\|\|-- message_id: string (nullable = true)
>  \|\|\|-- neverShowAfter: string (nullable = true)
>  \|\|\|-- neverShowBefore: string (nullable = true)
>  \|\|\|-- offerTitle: string (nullable = true)
>  \|\|\|-- result: struct (nullable = true)
>  \|\|\|\|-- post: struct (nullable = true)
>  \|\|\|\|\|-- alchemy_out_of_stock: struct (nullable = true)
>  \|\|\|\|\|\|-- ci: double (nullable = true)
>  \|\|\|\|\|\|-- value: boolean (nullable = true)
>  \|\|\|\|\|-- meta: struct (nullable = true)
>  \|\|\|\|\|\|-- None_tx_value: array (nullable = true)
>  \|\|\|\|\|\|\|-- element: string (containsNull = 
> false)
>  \|\|\|\|\|\|-- exceptions: array (nullable = true)
>  \|\|\|\|\|\|\|-- element: string (containsNull = 
> false)
>  \|\|\|\|\|\|-- no_input_value: array (nullable = true)
>  \|\|\|\|\|\|\|-- element: string (containsNull = 
> false)
>  \|\|\|\|\|

[jira] [Resolved] (SPARK-12777) Dataset fields can't be Scala tuples

2016-12-15 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-12777.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

This works in 2.1:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/408017793305293/2840265927289860/latest.html

> Dataset fields can't be Scala tuples
> 
>
> Key: SPARK-12777
> URL: https://issues.apache.org/jira/browse/SPARK-12777
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0, 1.6.1, 2.0.0
>Reporter: Chris Jansen
> Fix For: 2.1.0
>
>
> Datasets can't seem to handle scala tuples as fields of case classes in 
> datasets.
> {code}
> Seq((1,2), (3,4)).toDS().show() //works
> {code}
> When including a tuple as a field, the code fails:
> {code}
> case class Test(v: (Int, Int))
> Seq(Test((1,2)), Test((3,4)).toDS().show //fails
> {code}
> {code}
>   UnresolvedException: : Invalid call to dataType on unresolved object, tree: 
> 'name  (unresolved.scala:59)
>  
> org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:59)
>  
> org.apache.spark.sql.catalyst.expressions.GetStructField.org$apache$spark$sql$catalyst$expressions$GetStructField$$field$lzycompute(complexTypeExtractors.scala:107)
>  
> org.apache.spark.sql.catalyst.expressions.GetStructField.org$apache$spark$sql$catalyst$expressions$GetStructField$$field(complexTypeExtractors.scala:107)
>  
> org.apache.spark.sql.catalyst.expressions.GetStructField$$anonfun$toString$1.apply(complexTypeExtractors.scala:111)
>  
> org.apache.spark.sql.catalyst.expressions.GetStructField$$anonfun$toString$1.apply(complexTypeExtractors.scala:111)
>  
> org.apache.spark.sql.catalyst.expressions.GetStructField.toString(complexTypeExtractors.scala:111)
>  
> org.apache.spark.sql.catalyst.expressions.Expression.toString(Expression.scala:217)
>  
> org.apache.spark.sql.catalyst.expressions.Expression.toString(Expression.scala:217)
>  
> org.apache.spark.sql.catalyst.expressions.If.toString(conditionalExpressions.scala:76)
>  
> org.apache.spark.sql.catalyst.expressions.Expression.toString(Expression.scala:217)
>  
> org.apache.spark.sql.catalyst.expressions.Alias.toString(namedExpressions.scala:155)
>  
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$argString$1.apply(TreeNode.scala:385)
>  
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$argString$1.apply(TreeNode.scala:381)
>  org.apache.spark.sql.catalyst.trees.TreeNode.argString(TreeNode.scala:388)
>  org.apache.spark.sql.catalyst.trees.TreeNode.simpleString(TreeNode.scala:391)
>  
> org.apache.spark.sql.catalyst.plans.QueryPlan.simpleString(QueryPlan.scala:172)
>  
> org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:441)
>  org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:396)
>  
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$5.apply(RuleExecutor.scala:118)
>  
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$5.apply(RuleExecutor.scala:119)
>  org.apache.spark.Logging$class.logDebug(Logging.scala:62)
>  
> org.apache.spark.sql.catalyst.rules.RuleExecutor.logDebug(RuleExecutor.scala:44)
>  
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:115)
>  
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
>  
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
>  
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:253)
>  org.apache.spark.sql.Dataset.(Dataset.scala:78)
>  org.apache.spark.sql.Dataset.(Dataset.scala:89)
>  org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:507)
>  
> org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:80)
> {code}
> When providing a type alias, the code fails in a different way:
> {code}
> type TwoInt = (Int, Int)
> case class Test(v: TwoInt)
> Seq(Test((1,2)), Test((3,4)).toDS().show //fails
> {code}
> {code}
>   NoSuchElementException: : head of empty list  (ScalaReflection.scala:504)
>  
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:504)
>  
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(Scal

Re: When will multiple aggregations be supported in Structured Streaming?

2016-12-15 Thread Michael Armbrust
What is your use case?

On Thu, Dec 15, 2016 at 10:43 AM, ljwagerfield 
wrote:

> The current version of Spark (2.0.2) only supports one aggregation per
> structured stream (and will throw an exception if multiple aggregations are
> applied).
>
> Roughly when will Spark support multiple aggregations?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/When-will-multiple-aggregations-be-
> supported-in-Structured-Streaming-tp28219.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [DataFrames] map function - 2.0

2016-12-15 Thread Michael Armbrust
Experimental in Spark really just means that we are not promising binary
compatibly for those functions in the 2.x release line.  For Datasets in
particular, we want a few releases to make sure the APIs don't have any
major gaps before removing the experimental tag.

On Thu, Dec 15, 2016 at 1:17 PM, Ninad Shringarpure 
wrote:

> Hi Team,
>
> When going through Dataset class for Spark 2.0 it comes across that both
> overloaded map functions with encoder and without are marked as
> experimental.
>
> Is there a reason and issues that developers whould be aware of when using
> this for production applications. Also is there a "non-experimental" way of
> using map function on Dataframe in Spark 2.0
>
> Thanks,
> Ninad
>


Re: Cached Tables SQL Performance Worse than Uncached

2016-12-15 Thread Michael Armbrust
Its hard to comment on performance without seeing query plans.  I'd suggest
posting the result of an explain.

On Thu, Dec 15, 2016 at 2:14 PM, Warren Kim 
wrote:

> Playing with TPC-H and comparing performance between cached (serialized
> in-memory tables) and uncached (DF from parquet) results in various
> SQL queries performing much worse, duration-wise.
>
>
> I see some physical plans have an extra layer of shuffle/sort/merge under
> cached scenario.
>
>
> I could do some filtering by key to optimize, but I'm just curious as to
> why out-of-the-box planning is more complex and slower when tables are
> cached to mem.
>
>
> Thanks!
>


Re: Expand the Spark SQL programming guide?

2016-12-15 Thread Michael Armbrust
Pull requests would be welcome for any major missing features in the guide:
https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md

On Thu, Dec 15, 2016 at 11:48 AM, Jim Hughes  wrote:

> Hi Anton,
>
> I'd like to see this as well.  I've been working on implementing
> geospatial user-defined types and functions.  Having examples of
> aggregations and window functions would be awesome!
>
> I did test out implementing a distributed convex hull as a
> UserDefinedAggregateFunction, and that seemed to work sensibly.
>
> Cheers,
>
> Jim
>
>
> On 12/15/2016 03:28 AM, Anton Okolnychyi wrote:
>
> Hi,
>
> I am wondering whether it makes sense to expand the Spark SQL programming
> guide with examples of aggregations (including user-defined via the
> Aggregator API) and window functions.  For instance, there might be a
> separate subsection under "Getting Started" for each functionality.
>
> SPARK-16046 seems to be related but there is no activity for more than 4
> months.
>
> Best regards,
> Anton
>
>
>


[jira] [Commented] (SPARK-17890) scala.ScalaReflectionException

2016-12-14 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749860#comment-15749860
 ] 

Michael Armbrust commented on SPARK-17890:
--

If I had to guess, I would guess that [this 
line|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L48]
 should be using [this 
mirror|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L45]

> scala.ScalaReflectionException
> --
>
> Key: SPARK-17890
> URL: https://issues.apache.org/jira/browse/SPARK-17890
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1
> Environment: x86_64 GNU/Linux
> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
>Reporter: Khalid Reid
>Priority: Minor
>  Labels: newbie
>
> Hello,
> I am seeing an error message in spark-shell when I map a DataFrame to a 
> Seq\[Foo\].  However, things work fine when I use flatMap.  
> {noformat}
> scala> case class Foo(value:String)
> defined class Foo
> scala> val df = sc.parallelize(List(1,2,3)).toDF
> df: org.apache.spark.sql.DataFrame = [value: int]
> scala> df.map{x => Seq.empty[Foo]}
> scala.ScalaReflectionException: object $line14.$read not found.
>   at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
>   at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
>   at $typecreator1$1.apply(:29)
>   at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>   at 
> org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
>   at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>   at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
>   at 
> org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
>   ... 48 elided
> scala> df.flatMap{_ => Seq.empty[Foo]} //flatMap works
> res2: org.apache.spark.sql.Dataset[Foo] = [value: string]
> {noformat}
> I am seeing the same error reported 
> [here|https://issues.apache.org/jira/browse/SPARK-8465?jql=text%20~%20%22scala.ScalaReflectionException%22]
>  when I use spark-submit.
> I am new to Spark but I don't expect this to throw an exception.
> Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [Spark-SQL] collect_list() support for nested collection

2016-12-13 Thread Michael Armbrust
Yes

https://databricks-prod-cloudfront.cloud.databricks.com/public/
4027ec902e239c93eaaa8714f173bcfc/1023043053387187/4464261896877850/
2840265927289860/latest.html

On Tue, Dec 13, 2016 at 10:43 AM, Ninad Shringarpure 
wrote:

>
> Hi Team,
>
> Does Spark 2.0 support non-primitive types in collect_list for inserting
> nested collections?
> Would appreciate any references or samples.
>
> Thanks,
> Ninad
>
>
>


[jira] [Updated] (SPARK-17689) _temporary files breaks the Spark SQL streaming job.

2016-12-08 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17689:
-
Target Version/s: 2.2.0
 Description: 
Steps to reproduce:

1) Start a streaming job which reads from HDFS location hdfs://xyz/*
2) Write content to hdfs://xyz/a
.
.
repeat a few times.

And then job breaks as follows.


org.apache.spark.SparkException: Job aborted due to stage failure: Task 49 in 
stage 304.0 failed 1 times, most recent failure: Lost task 49.0 in stage 304.0 
(TID 14794, localhost): java.io.FileNotFoundException: File does not exist: 
hdfs://localhost:9000/input/t5/_temporary
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at 
org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$4.apply(fileSourceInterfaces.scala:464)
at 
org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$4.apply(fileSourceInterfaces.scala:462)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1919)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1919)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


  was:

Steps to reproduce:

1) Start a streaming job which reads from HDFS location hdfs://xyz/*
2) Write content to hdfs://xyz/a
.
.
repeat a few times.

And then job breaks as follows.


org.apache.spark.SparkException: Job aborted due to stage failure: Task 49 in 
stage 304.0 failed 1 times, most recent failure: Lost task 49.0 in stage 304.0 
(TID 14794, localhost): java.io.FileNotFoundException: File does not exist: 
hdfs://localhost:9000/input/t5/_temporary
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at 
org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$4.apply(fileSourceInterfaces.scala:464)
at 
org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$4.apply(fileSourceInterfaces.scala:462)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at

[jira] [Updated] (SPARK-18272) Test topic addition for subscribePattern on Kafka DStream and Structured Stream

2016-12-08 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18272:
-
Issue Type: Test  (was: Bug)

> Test topic addition for subscribePattern on Kafka DStream and Structured 
> Stream
> ---
>
> Key: SPARK-18272
> URL: https://issues.apache.org/jira/browse/SPARK-18272
> Project: Spark
>  Issue Type: Test
>  Components: DStreams, Structured Streaming
>Reporter: Cody Koeninger
>
> We've had reports of the following sequence
> - create subscribePattern stream that doesn't match any existing topics at 
> the time stream starts
> - add a topic that matches pattern
> - expect that messages from that topic show up, but they don't
> We don't seem to actually have tests that cover this case, so we should add 
> them



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18790) Keep a general offset history of stream batches

2016-12-08 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18790:
-
Target Version/s: 2.1.0

> Keep a general offset history of stream batches
> ---
>
> Key: SPARK-18790
> URL: https://issues.apache.org/jira/browse/SPARK-18790
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Tyson Condie
>
> Instead of only keeping the minimum number of offsets around, we should keep 
> enough information to allow us to roll back n batches and reexecute the 
> stream starting from a given point. In particular, we should create a config 
> in SQLConf, spark.sql.streaming.retainedBatches that defaults to 100 and 
> ensure that we keep enough log files in the following places to roll back the 
> specified number of batches:
> the offsets that are present in each batch
> versions of the state store
> the files lists stored for the FileStreamSource
> the metadata log stored by the FileStreamSink



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18796) StreamingQueryManager should not hold a lock when starting a query

2016-12-08 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18796:
-
Target Version/s: 2.1.0

> StreamingQueryManager should not hold a lock when starting a query
> --
>
> Key: SPARK-18796
> URL: https://issues.apache.org/jira/browse/SPARK-18796
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Shixiong Zhu
>
> Otherwise, the user cannot start any queries when a query is starting. If a 
> query takes a long time to start, the user experience will be pretty bad.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: When will Structured Streaming support stream-to-stream joins?

2016-12-08 Thread Michael Armbrust
I would guess Spark 2.3, but maybe sooner maybe later depending on demand.
I created https://issues.apache.org/jira/browse/SPARK-18791 so people can
describe their requirements / stay informed.

On Thu, Dec 8, 2016 at 11:16 AM, ljwagerfield 
wrote:

> Hi there,
>
> Structured Streaming currently only supports stream-to-batch joins.
>
> Is there an ETA for stream-to-stream joins?
>
> Kindest regards (and keep up the awesome work!),
> Lawrence
>
> (p.s. I've traversed the JIRA roadmaps but couldn't see anything)
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/When-will-Structured-Streaming-
> support-stream-to-stream-joins-tp28185.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[jira] [Created] (SPARK-18791) Stream-Stream Joins

2016-12-08 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18791:


 Summary: Stream-Stream Joins
 Key: SPARK-18791
 URL: https://issues.apache.org/jira/browse/SPARK-18791
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Reporter: Michael Armbrust


Just a placeholder for now.  Please comment with your requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: few basic questions on structured streaming

2016-12-08 Thread Michael Armbrust
>
> 1. what happens if an event arrives few days late? Looks like we have an
> unbound table with sorted time intervals as keys but I assume spark doesn't
> keep several days worth of data in memory but rather it would checkpoint
> parts of the unbound table to a storage at a specified interval such that
> if an event comes few days late it would update the part of the table that
> is in memory plus the parts of the table that are in storage which contains
> the interval (Again this is just my assumption, I don't know what it really
> does). is this correct so far?
>

The state we need to keep will be unbounded, unless you specify a
watermark.  This watermark tells us how long to wait for late data to
arrive and thus allows us to bound the amount of state that we keep in
memory.  Since we purge state for aggregations that are below the
watermark, we must also drop data that arrives even later than your
specified watermark (if any).  Note that the watermark is calculated based
on observed data, not on the actual time of processing.  So we should be
robust to cases where the stream is down for extended periods of time.


> 2.  Say I am running a Spark Structured streaming Job for 90 days with a
> window interval of 10 mins and a slide interval of 5 mins. Does the output
> of this Job always return the entire history in a table? other words the
> does the output on 90th day contains a table of 10 minute time intervals
> from day 1 to day 90? If so, wouldn't that be too big to return as an
> output?
>

This depends on the output mode.  In complete mode, we output the entire
result every time (thus, complete mode probably doesn't make sense for this
use case).  In update mode
, we will output
continually updated estimates of the final answer as the stream progresses
(useful if you are for example updating a database).  In append mode
(supported in 2.1) we only output finalized aggregations that have fallen
beneath the watermark.

Relatedly, SPARK-16738
 talks
about making the distributed state store queryable.  With this feature, you
could run your query in complete mode (given enough machines).  Even though
the results are large, you can still interact with the complete results of
the aggregation as a distributed DataFrame.


> 3. For Structured Streaming is it required to have a distributed storage
> such as HDFS? my guess would be yes (based on what I said in #1) but I
> would like to confirm.
>

Currently this is the only place that we can write the offset log (records
what data is in each batch) and the state checkpoints.  I think its likely
that we'll add support for other storage systems here in the future.


> 4. I briefly heard about watermarking. Are there any pointers where I can
> know them more in detail? Specifically how watermarks could help in
> structured streaming and so on.
>

Here's the best docs available: https://github.com/apache/spark/pull/15702

We are working on something for the programming guide / a blog post in the
next few weeks.


[jira] [Commented] (SPARK-17890) scala.ScalaReflectionException

2016-12-07 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15729905#comment-15729905
 ] 

Michael Armbrust commented on SPARK-17890:
--

Can you reproduce this with 2.1?  If so, I think we are just using the wrong 
classloader to instantiate the mirror.

> scala.ScalaReflectionException
> --
>
> Key: SPARK-17890
> URL: https://issues.apache.org/jira/browse/SPARK-17890
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1
> Environment: x86_64 GNU/Linux
> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
>Reporter: Khalid Reid
>Priority: Minor
>  Labels: newbie
>
> Hello,
> I am seeing an error message in spark-shell when I map a DataFrame to a 
> Seq\[Foo\].  However, things work fine when I use flatMap.  
> {noformat}
> scala> case class Foo(value:String)
> defined class Foo
> scala> val df = sc.parallelize(List(1,2,3)).toDF
> df: org.apache.spark.sql.DataFrame = [value: int]
> scala> df.map{x => Seq.empty[Foo]}
> scala.ScalaReflectionException: object $line14.$read not found.
>   at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
>   at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
>   at $typecreator1$1.apply(:29)
>   at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>   at 
> org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
>   at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>   at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
>   at 
> org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
>   ... 48 elided
> scala> df.flatMap{_ => Seq.empty[Foo]} //flatMap works
> res2: org.apache.spark.sql.Dataset[Foo] = [value: string]
> {noformat}
> I am seeing the same error reported 
> [here|https://issues.apache.org/jira/browse/SPARK-8465?jql=text%20~%20%22scala.ScalaReflectionException%22]
>  when I use spark-submit.
> I am new to Spark but I don't expect this to throw an exception.
> Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-16902) Custom ExpressionEncoder for primitive array is not effective

2016-12-07 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-16902.
--
Resolution: Not A Problem

The encoder that is used is picked by scala's implicit resolution.  Just 
because you construct an encoder doesn't mean that we can find it (which is why 
its not having any affect).

I also think the nullability here is correct.  Even though the elements cannot 
be null (since they are primitives), you can put a null into the value field of 
DataPoint.

> Custom ExpressionEncoder for primitive array is not effective
> -
>
> Key: SPARK-16902
> URL: https://issues.apache.org/jira/browse/SPARK-16902
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Kazuaki Ishizaki
>
> Even when a programmer explicitly specifies custom {{ExpressionEncoder}} for 
> primitive array, it does not seem to be used.
> The following test cases cause assertion errors. Since the custom 
> {{ExpressionEncoder}} is used, the {{nullable}} should be {{false}} for 
> {{value: array}}.
> {code:java}
> class Test extends QueryTest with SharedSQLContext {
>   import testImplicits._
>   test("test") {
> val schema = new StructType()
>   .add("array", ArrayType(DoubleType, containsNull = false), false)
> val cls = classOf[Array[Double]]
> val inputObject = BoundReference(0, 
> ScalaReflection.dataTypeFor[Array[Double]], false)
> val serializer = ScalaReflection.serializerFor[Array[Double]](
>   AssertNotNull(inputObject, Seq("non null array")))
> val deserializer = ScalaReflection.deserializerFor[Array[Double]]
> val encoder = new ExpressionEncoder[Array[Double]](
>   schema,
>   true,
>   serializer.flatten,
>   deserializer,
>   ClassTag[Array[Double]](cls)
> )
> val ds1 = sparkContext.parallelize(Seq(Array(1.1, 1.2), Array(2.1, 2.2)), 
> 1).toDS
> ds1.count
> val ds10 = ds1.map(e => e)(encoder)
> ds10.show
> ds10.printSchema
> assert(ds10.schema.fields(0).nullable == false)
> val ds2 = sparkContext.parallelize(
>   Seq(DataPoint(Array(1.1, 1.2), 1.0), DataPoint(Array(2.1, 2.2), 2.0)), 
> 1).toDS
> ds2.count
> val ds20 = ds2.map(p => p.x)(encoder)
> ds20.show
> ds20.printSchema
> assert(ds20.schema.fields(0).nullable == false)
>   }
> }
> {code}
> {code}
> +--+
> | value|
> +--+
> |[1.1, 1.2]|
> |[2.1, 2.2]|
> +--+
> root
>  |-- value: array (nullable = true)
>  ||-- element: double (containsNull = false)
> true did not equal false
> ScalaTestFailureLocation: org.apache.spark.sql.MySuite$$anonfun$1 at 
> (MySuite.scala:489)
> org.scalatest.exceptions.TestFailedException: true did not equal false
>   at 
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
>   at 
> org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
>   at 
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
>   at 
> org.apache.spark.sql.MySuite$$anonfun$1.apply$mcV$sp(MySuite.scala:489)
>   at org.apache.spark.sql.MySuite$$anonfun$1.apply(MySuite.scala:39)
>   at org.apache.spark.sql.MySuite$$anonfun$1.apply(MySuite.scala:39)
>   at 
> org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:57)
>   at 
> org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
>   at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>   at 
> org.scalatest.SuperEngine

[jira] [Updated] (SPARK-18754) Rename recentProgresses to recentProgress

2016-12-06 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18754:
-
Target Version/s: 2.1.0

> Rename recentProgresses to recentProgress
> -
>
> Key: SPARK-18754
> URL: https://issues.apache.org/jira/browse/SPARK-18754
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>    Reporter: Michael Armbrust
>Assignee: Michael Armbrust
>
> An informal poll of a bunch of users found this name to be more clear.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18754) Rename recentProgresses to recentProgress

2016-12-06 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18754:


 Summary: Rename recentProgresses to recentProgress
 Key: SPARK-18754
 URL: https://issues.apache.org/jira/browse/SPARK-18754
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Reporter: Michael Armbrust
Assignee: Michael Armbrust


An informal poll of a bunch of users found this name to be more clear.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-18749) CLONE - checkpointLocation being set in memory streams fail after restart. Should fail fast

2016-12-06 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust closed SPARK-18749.

Resolution: Invalid

> CLONE - checkpointLocation being set in memory streams fail after restart. 
> Should fail fast
> ---
>
> Key: SPARK-18749
> URL: https://issues.apache.org/jira/browse/SPARK-18749
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Burak Yavuz
>
> The checkpointLocation option in memory streams in StructuredStreaming is not 
> used during recovery. However, it can use this location if it is being set. 
> However, during recovery, if this location was set, we get an exception 
> saying that we will not use this location for recovery, please delete it. 
> It's better to just fail before you start the stream in the first place



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18749) CLONE - checkpointLocation being set in memory streams fail after restart. Should fail fast

2016-12-06 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18749:


 Summary: CLONE - checkpointLocation being set in memory streams 
fail after restart. Should fail fast
 Key: SPARK-18749
 URL: https://issues.apache.org/jira/browse/SPARK-18749
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.0.0, 2.0.1
Reporter: Burak Yavuz


The checkpointLocation option in memory streams in StructuredStreaming is not 
used during recovery. However, it can use this location if it is being set. 
However, during recovery, if this location was set, we get an exception saying 
that we will not use this location for recovery, please delete it. 

It's better to just fail before you start the stream in the first place



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-17921) checkpointLocation being set in memory streams fail after restart. Should fail fast

2016-12-06 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust closed SPARK-17921.

Resolution: Won't Fix

> checkpointLocation being set in memory streams fail after restart. Should 
> fail fast
> ---
>
> Key: SPARK-17921
> URL: https://issues.apache.org/jira/browse/SPARK-17921
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Burak Yavuz
>
> The checkpointLocation option in memory streams in StructuredStreaming is not 
> used during recovery. However, it can use this location if it is being set. 
> However, during recovery, if this location was set, we get an exception 
> saying that we will not use this location for recovery, please delete it. 
> It's better to just fail before you start the stream in the first place



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: get corrupted rows using columnNameOfCorruptRecord

2016-12-06 Thread Michael Armbrust
.where("xxx IS NOT NULL") will give you the rows that couldn't be parsed.

On Tue, Dec 6, 2016 at 6:31 AM, Yehuda Finkelstein <
yeh...@veracity-group.com> wrote:

> Hi all
>
>
>
> I’m trying to parse json using existing schema and got rows with NULL’s
>
> //get schema
>
> val df_schema = spark.sqlContext.sql("select c1,c2,…cn t1  limit 1")
>
> //read json file
>
> val f = sc.textFile("/tmp/x")
>
> //load json into data frame using schema
>
> var df = spark.sqlContext.read.option("columnNameOfCorruptRecord","
> xxx").option("mode","PERMISSIVE").schema(df_schema.schema).json(f)
>
>
>
> in documentation it say that you can query the corrupted rows by this
> columns à columnNameOfCorruptRecord
>
> o“columnNameOfCorruptRecord (default is the value specified in
> spark.sql.columnNameOfCorruptRecord): allows renaming the new field
> having malformed string created by PERMISSIVE mode. This overrides
> spark.sql.columnNameOfCorruptRecord.”
>
>
>
> The question is how to fetch those corrupted rows ?
>
>
>
>
>
> Thanks
>
> Yehuda
>
>
>
>
>


Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Michael Armbrust
>
> 1. In my case, I'd need to first explode my data by ~12x to assign each
> record to multiple 12-month rolling output windows. I'm not sure Spark SQL
> would be able to optimize this away, combining it with the output writing
> to do it incrementally.
>

You are right, but I wouldn't worry about the RAM use.  If implemented
properly (or if you just use the builtin window

function), it should all be pipelined.


> 2. Wouldn't each partition -- window in my case -- be shuffled to a single
> machine and then written together as one output shard? For a large amount
> of data per window, that seems less than ideal.
>

Oh sorry, I thought you wanted one file per value.  If you drop the
repartition then it won't shuffle, but will just write in parallel on each
machine.


Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Michael Armbrust
If you repartition($"column") and then do .write.partitionBy("column") you
should end up with a single file for each value of the partition column.

On Mon, Dec 5, 2016 at 10:59 AM, Everett Anderson 
wrote:

> Hi,
>
> I have a DataFrame of records with dates, and I'd like to write all
> 12-month (with overlap) windows to separate outputs.
>
> Currently, I have a loop equivalent to:
>
> for ((windowStart, windowEnd) <- windows) {
> val windowData = allData.filter(
> getFilterCriteria(windowStart, windowEnd))
> windowData.write.format(...).save(...)
> }
>
> This works fine, but has the drawback that since Spark doesn't parallelize
> the writes, there is a fairly cost based on the number of windows.
>
> Is there a way around this?
>
> In MapReduce, I'd probably multiply the data in a Mapper with a window ID
> and then maybe use something like MultipleOutputs
> .
> But I'm a bit worried of trying to do this in Spark because of the data
> explosion and RAM use. What's the best approach?
>
> Thanks!
>
> - Everett
>
>


Re: ability to provide custom serializers

2016-12-05 Thread Michael Armbrust
Lets start with a new ticket, link them and we can merge if the solution
ends up working out for both cases.

On Sun, Dec 4, 2016 at 5:39 PM, Erik LaBianca 
wrote:

> Thanks Michael!
>
> On Dec 2, 2016, at 7:29 PM, Michael Armbrust 
> wrote:
>
> I would love to see something like this.  The closest related ticket is
> probably https://issues.apache.org/jira/browse/SPARK-7768 (though maybe
> there are enough people using UDTs in their current form that we should
> just make a new ticket)
>
>
> I’m not very familiar with UDT’s. Is this something I should research or
> just leave it be and create a new ticket? I did notice the presence of a
> registry in the source code but it seemed like it was targeted at a
> different use case.
>
> A few thoughts:
>  - even if you can do implicit search, we probably also want a registry
> for Java users.
>
>
> That’s fine. I’m not 100% sure I can get the right implicit in scope as
> things stand anyway, so let’s table that idea for now and do the registry.
>
>  - what is the output of the serializer going to be? one challenge here is
> that encoders write directly into the tungsten format, which is not a
> stable public API. Maybe this is more obvious if I understood MappedColumnType
> better?
>
>
> My assumption was that the output would be existing scalar data types. So
> string, long, double, etc. What I’d like to do is just “layer” the new ones
> on top already existing ones, kinda like the case case encoder does.
>
> Either way, I'm happy to give further advice if you come up with a more
> concrete proposal and put it on JIRA.
>
>
> Great, let me know and I’ll create a ticket, or we can re-use SPARK-7768
> and we can move the discussion there.
>
> Thanks!
>
> —erik
>
>


Re: ability to provide custom serializers

2016-12-02 Thread Michael Armbrust
I would love to see something like this.  The closest related ticket is
probably https://issues.apache.org/jira/browse/SPARK-7768 (though maybe
there are enough people using UDTs in their current form that we should
just make a new ticket)

A few thoughts:
 - even if you can do implicit search, we probably also want a registry for
Java users.
 - what is the output of the serializer going to be? one challenge here is
that encoders write directly into the tungsten format, which is not a
stable public API. Maybe this is more obvious if I understood MappedColumnType
better?

Either way, I'm happy to give further advice if you come up with a more
concrete proposal and put it on JIRA.

On Fri, Dec 2, 2016 at 4:03 PM, Erik LaBianca 
wrote:

> Hi All,
>
> Apologies in advance for any confusing terminology, I’m still pretty new
> to Spark.
>
> I’ve got a bunch of Scala case class “domain objects” from an existing
> application. Many of them contain simple, but unsupported-by-spark types in
> them, such as case class Foo(timestamp: java.time.Instant). I’d like to be
> able to use these case classes directly in a DataSet, but can’t, since
> there’s no encoder available for java.time.Instant. I’d like to resolve
> that.
>
> I asked around on the gitter channel, and was pointed to the
> ScalaReflections class, which handles creating Encoder[T] for a variety of
> things, including case classes and their members. Barring a better
> solution, what I’d like is to be able to add some additional case
> statements to the serializerFor and deserializeFor methods, dispatching to
> something along the lines of the Slick MappedColumnType[1]. In an ideal
> scenario, I could provide these mappings via implicit search, but I’d be
> happy to settle for a registry of some sort too.
>
> Does this idea make sense, in general? I’m interested in taking a stab at
> the implementation, but Jakob recommended I surface it here first to see if
> there were any plans around this sort of functionality already.
>
> Thanks!
>
> —erik
>
> 1. http://slick.lightbend.com/doc/3.0.0/userdefined.html#
> using-custom-scalar-types-in-queries
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Flink event session window in Spark

2016-12-02 Thread Michael Armbrust
Here is the JIRA for adding this feature:
https://issues.apache.org/jira/browse/SPARK-10816

On Fri, Dec 2, 2016 at 11:20 AM, Fritz Budiyanto 
wrote:

> Hi All,
>
> I need help on how to implement Flink event session window in Spark. Is
> this possible?
>
> For instance, I wanted to create a session window with a timeout of 10
> minutes (see Flink snippet below)
> Continues event will make the session window alive. If there are no
> activity for 10 minutes, the session window shall close and forward the
> data to a sink function.
>
> // event-time session windows
> input
>
> .keyBy()
>
> .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
>
> .();
>
>
>
>
> Any idea ?
>
> Thanks,
> Fritz
>


[jira] [Updated] (SPARK-18234) Update mode in structured streaming

2016-12-01 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18234:
-
Target Version/s: 2.2.0

> Update mode in structured streaming
> ---
>
> Key: SPARK-18234
> URL: https://issues.apache.org/jira/browse/SPARK-18234
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>    Reporter: Michael Armbrust
>Priority: Critical
>
> We have this internal, but we should nail down the semantics and expose it to 
> users.  The idea of update mode is that any tuple that changes will be 
> emitted.  Open questions:
>  - do we need to reason about the {{keys}} for a given stream?  For things 
> like the {{foreach}} sink its up to the user.  However, for more end to end 
> use cases such as a JDBC sink, we need to know which row downstream is being 
> updated.
>  - okay to not support files?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18682) Batch Source for Kafka

2016-12-01 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18682:


 Summary: Batch Source for Kafka
 Key: SPARK-18682
 URL: https://issues.apache.org/jira/browse/SPARK-18682
 Project: Spark
  Issue Type: New Feature
  Components: SQL, Structured Streaming
Reporter: Michael Armbrust


Today, you can start a stream that reads from kafka.  However, given kafka's 
configurable retention period, it seems like sometimes you might just want to 
read all of the data that is available now.  As such we should add a version 
that works with {{spark.read}} as well.

The options should be the same as the streaming kafka source, with the 
following differences:
 - {{startingOffsets}} should default to earliest, and should not allow 
{{latest}} (which would always be empty).
 - {{endingOffsets}} should also be allowed and should default to {{latest}}. 
the same assign json format as {{startingOffsets}} should also be accepted.

It would be really good, if things like {{.limit\(n\)}} were enough to prevent 
all the data from being read (this might just work).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [structured streaming] How to remove outdated data when use Window Operations

2016-12-01 Thread Michael Armbrust
Yes

!

On Thu, Dec 1, 2016 at 12:57 PM, ayan guha  wrote:

> Thanks TD. Will it be available in pyspark too?
> On 1 Dec 2016 19:55, "Tathagata Das"  wrote:
>
>> In the meantime, if you are interested, you can read the design doc in
>> the corresponding JIRA - https://issues.apache.org/ji
>> ra/browse/SPARK-18124
>>
>> On Thu, Dec 1, 2016 at 12:53 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> That feature is coming in 2.1.0. We have added watermarking, that will
>>> track the event time of the data and accordingly close old windows, output
>>> its corresponding aggregate and then drop its corresponding state. But in
>>> that case, you will have to use append mode, and aggregated data of a
>>> particular window will be evicted only when the windows is closed. You will
>>> be able to control the threshold on how long to wait for late, out-of-order
>>> data before closing a window.
>>>
>>> We will be updated the docs soon to explain this.
>>>
>>> On Tue, Nov 29, 2016 at 8:30 PM, Xinyu Zhang  wrote:
>>>
 Hi

 I want to use window operations. However, if i don't remove any data,
 the "complete" table will become larger and larger as time goes on. So I
 want to remove some outdated data in the complete table that I would never
 use.
 Is there any method to meet my requirement?

 Thanks!





>>>
>>>
>>


[jira] [Reopened] (SPARK-18122) Fallback to Kryo for unknown classes in ExpressionEncoder

2016-11-30 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reopened SPARK-18122:
--

I'm going to reopen this.  I think the benefits outweigh the compatibility 
concerns.

> Fallback to Kryo for unknown classes in ExpressionEncoder
> -
>
> Key: SPARK-18122
> URL: https://issues.apache.org/jira/browse/SPARK-18122
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.1
>    Reporter: Michael Armbrust
>Priority: Critical
>
> In Spark 2.0 we fail to generate an encoder if any of the fields of the class 
> are not of a supported type.  One example is {{Option\[Set\[Int\]\]}}, but 
> there are many more.  We should give the user the option to fall back on 
> opaque kryo serialization in these cases for subtrees of the encoder, rather 
> than failing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17939) Spark-SQL Nullability: Optimizations vs. Enforcement Clarification

2016-11-30 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17939:
-
Target Version/s: 2.1.0

> Spark-SQL Nullability: Optimizations vs. Enforcement Clarification
> --
>
> Key: SPARK-17939
> URL: https://issues.apache.org/jira/browse/SPARK-17939
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Aleksander Eskilson
>Priority: Critical
>
> The notion of Nullability of of StructFields in DataFrames and Datasets 
> creates some confusion. As has been pointed out previously [1], Nullability 
> is a hint to the Catalyst optimizer, and is not meant to be a type-level 
> enforcement. Allowing null fields can also help the reader successfully parse 
> certain types of more loosely-typed data, like JSON and CSV, where null 
> values are common, rather than just failing. 
> There's already been some movement to clarify the meaning of Nullable in the 
> API, but also some requests for a (perhaps completely separate) type-level 
> implementation of Nullable that can act as an enforcement contract.
> This bug is logged here to discuss and clarify this issue.
> [1] - 
> [https://issues.apache.org/jira/browse/SPARK-11319|https://issues.apache.org/jira/browse/SPARK-11319?focusedCommentId=15014535&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15014535]
> [2] - https://github.com/apache/spark/pull/11785



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17939) Spark-SQL Nullability: Optimizations vs. Enforcement Clarification

2016-11-30 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-17939:
-
Target Version/s: 2.2.0  (was: 2.1.0)

> Spark-SQL Nullability: Optimizations vs. Enforcement Clarification
> --
>
> Key: SPARK-17939
> URL: https://issues.apache.org/jira/browse/SPARK-17939
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Aleksander Eskilson
>Priority: Critical
>
> The notion of Nullability of of StructFields in DataFrames and Datasets 
> creates some confusion. As has been pointed out previously [1], Nullability 
> is a hint to the Catalyst optimizer, and is not meant to be a type-level 
> enforcement. Allowing null fields can also help the reader successfully parse 
> certain types of more loosely-typed data, like JSON and CSV, where null 
> values are common, rather than just failing. 
> There's already been some movement to clarify the meaning of Nullable in the 
> API, but also some requests for a (perhaps completely separate) type-level 
> implementation of Nullable that can act as an enforcement contract.
> This bug is logged here to discuss and clarify this issue.
> [1] - 
> [https://issues.apache.org/jira/browse/SPARK-11319|https://issues.apache.org/jira/browse/SPARK-11319?focusedCommentId=15014535&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15014535]
> [2] - https://github.com/apache/spark/pull/11785



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [VOTE] Apache Spark 2.1.0 (RC1)

2016-11-30 Thread Michael Armbrust
Unfortunately the FileFormat APIs are not stable yet, so if you are using
spark-avro, we are going to need to update it for this release.

On Wed, Nov 30, 2016 at 2:56 PM, Koert Kuipers  wrote:

> running our inhouse unit-tests (that work with spark 2.0.2) against spark
> 2.1.0-rc1 i see the following issues.
>
> any test that use avro (spark-avro 3.1.0) have this error:
> java.lang.AbstractMethodError
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$
> SingleDirectoryWriteTask.(FileFormatWriter.scala:232)
> at org.apache.spark.sql.execution.datasources.
> FileFormatWriter$.org$apache$spark$sql$execution$
> datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182)
> at org.apache.spark.sql.execution.datasources.
> FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(
> FileFormatWriter.scala:129)
> at org.apache.spark.sql.execution.datasources.
> FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(
> FileFormatWriter.scala:128)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:282)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
> so looks like some api got changed or broken. i dont know if this is an
> issue or if this is OK.
>
> also a bunch of unit test related to reading and writing csv files fail.
> the issue seems to be newlines inside quoted values. this worked before and
> now it doesnt work anymore. i dont know if this was an accidentally
> supported feature and its ok to be broken? i am not even sure it is a good
> idea to support newlines inside quoted values. anyhow they still get
> written out the same way as before, but now when reading it back in things
> break down.
>
>
> On Mon, Nov 28, 2016 at 8:25 PM, Reynold Xin  wrote:
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 2.1.0. The vote is open until Thursday, December 1, 2016 at 18:00 UTC and
>> passes if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 2.1.0
>> [ ] -1 Do not release this package because ...
>>
>>
>> To learn more about Apache Spark, please see http://spark.apache.org/
>>
>> The tag to be voted on is v2.1.0-rc1 (80aabc0bd33dc5661a90133156247
>> e7a8c1bf7f5)
>>
>> The release files, including signatures, digests, etc. can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-bin/
>>
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1216/
>>
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc1-docs/
>>
>>
>> ===
>> How can I help test this release?
>> ===
>> If you are a Spark user, you can help us test this release by taking an
>> existing Spark workload and running on this release candidate, then
>> reporting any regressions.
>>
>> ===
>> What should happen to JIRA tickets still targeting 2.1.0?
>> ===
>> Committers should look at those and triage. Extremely important bug
>> fixes, documentation, and API tweaks that impact compatibility should be
>> worked on immediately. Everything else please retarget to 2.1.1 or 2.2.0.
>>
>>
>>
>


[jira] [Created] (SPARK-18657) Persist UUID across query restart

2016-11-30 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18657:


 Summary: Persist UUID across query restart
 Key: SPARK-18657
 URL: https://issues.apache.org/jira/browse/SPARK-18657
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Reporter: Michael Armbrust
Priority: Critical


We probably also want to add an instance Id or something that changes when the 
query restarts



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18588) KafkaSourceStressForDontFailOnDataLossSuite is flaky

2016-11-30 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18588:
-
Target Version/s: 2.1.0

> KafkaSourceStressForDontFailOnDataLossSuite is flaky
> 
>
> Key: SPARK-18588
> URL: https://issues.apache.org/jira/browse/SPARK-18588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Reporter: Herman van Hovell
>Assignee: Shixiong Zhu
>
> https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite&test_name=stress+test+for+failOnDataLoss%3Dfalse



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval

2016-11-30 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-16545.
--
Resolution: Later

> Structured Streaming : foreachSink creates the Physical Plan multiple times 
> per TriggerInterval 
> 
>
> Key: SPARK-16545
> URL: https://issues.apache.org/jira/browse/SPARK-16545
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0
>Reporter: Mario Briggs
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18655) Ignore Structured Streaming 2.0.2 logs in history server

2016-11-30 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18655:
-
Fix Version/s: (was: 2.1.0)

> Ignore Structured Streaming 2.0.2 logs in history server
> 
>
> Key: SPARK-18655
> URL: https://issues.apache.org/jira/browse/SPARK-18655
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Blocker
>
> SPARK-18516 changes the event log format of Structured Streaming. We should 
> make sure our changes not break the history server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18655) Ignore Structured Streaming 2.0.2 logs in history server

2016-11-30 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18655:
-
Target Version/s: 2.1.0

> Ignore Structured Streaming 2.0.2 logs in history server
> 
>
> Key: SPARK-18655
> URL: https://issues.apache.org/jira/browse/SPARK-18655
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Blocker
>
> SPARK-18516 changes the event log format of Structured Streaming. We should 
> make sure our changes not break the history server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18516) Separate instantaneous state from progress performance statistics

2016-11-29 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-18516.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

Issue resolved by pull request 15954
[https://github.com/apache/spark/pull/15954]

> Separate instantaneous state from progress performance statistics
> -
>
> Key: SPARK-18516
> URL: https://issues.apache.org/jira/browse/SPARK-18516
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>    Reporter: Michael Armbrust
>Assignee: Michael Armbrust
>Priority: Blocker
> Fix For: 2.1.0
>
>
> There are two types of information that you want to be able to extract from a 
> running query: instantaneous _status_ and metrics about the performance as 
> make _progress_ in query processing.
> Today, these are conflated in a single {{StreamingQueryStatus}} object.  The 
> downside to this approach is that a user now needs to reason about what state 
> the query is in anytime they retrieve a status object.  Fields like 
> {{statusMessage}} don't appear in updates that come from listener bus.  
> Simlarly, {{inputRate}}/{{processingRate}} statistics are usually {{0}} when 
> you retrieve a status object from the query itself.
> I propose we make the follow changes:
>  - Make {{status}} only report instantaneous things, such as if data is 
> available or a human readable message about what phase we are currently in.
>  - Have a separate {{progress}} message that we report for each trigger with 
> the other performance information that lives in status today.  You should be 
> able to easily retrieve a configurable number of the most recent progress 
> messages instead of just the most recent.
> While we are making these changes, I propose that we also change {{id}} to be 
> a globally unique identifier, rather than a JVM unique one.  Without this its 
> hard to correlate performance across restarts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-29 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reopened SPARK-18475:
--

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-29 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15706692#comment-15706692
 ] 

Michael Armbrust commented on SPARK-18475:
--

I think that this suggestion was closed prematurely.  While I don't think that 
we want to always perform this optimization, I think that for a large subset of 
the {{DataFrame}} operations that we support this is valid.  Furthermore, Burak 
has already show empirically that it significantly increases throughput, and I 
don't think that should be dismissed.  Spark users are not always the same 
people who are configuring Kafka, and I don't see a reason to tie their hands.

To unpack some of the specific concerns:
 - *Violation of Kafka's Ordering* - The proposal doesn't change the order of 
data presented by an iterator.  It just subdivides further than the existing 
batching mechanism and parallelizes.  For an operation like {{mapPartitions}}, 
running two correctly ordered partitions in parallel is indistinguishable from 
running them serially at batch boundaries. That is, unless your computation is 
non-deterministic as a result of communication with an external store.  Here, 
it should be noted that non-deterministic computation violates our recovery 
semantics, and should be avoided anyway.  That said, there certainly are cases 
where people may choose to give up correctness during recovery and that is why 
I agree this optimization should be optional.  Perhaps even off by default.
 - *Partitions are the answer* - Sufficient partitions are helpful, but this 
optimization would allow you to increase throughput through the use of replicas 
as well.  And again, Spark users are not always Kafka administrators.

Now, there is an operation, {{mapWithState}}, where this optimization could 
change the result.  I do think we will want to support this operation 
eventually (maybe in 2.2). I haven't really figured out the specifics, but I 
would imagine we can use existing mechanisms in the query planner, such as 
{{requiredChildOrdering}} or {{requiredChildDistribution}} to make sure that we 
only turn this on when it can't change the answer.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18498) Clean up HDFSMetadataLog API for better testing

2016-11-29 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-18498.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

Issue resolved by pull request 15924
[https://github.com/apache/spark/pull/15924]

> Clean up HDFSMetadataLog API for better testing
> ---
>
> Key: SPARK-18498
> URL: https://issues.apache.org/jira/browse/SPARK-18498
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 2.0.1
>Reporter: Tyson Condie
>Priority: Minor
>  Labels: test
> Fix For: 2.1.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> HDFSMetadataLog current conflates metadata log serialization and file writes. 
> The goal is to separate these two steps to enable more thorough testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-28 Thread Michael Armbrust
You could open up a JIRA to add a version of from_json that supports schema
inference, but unfortunately that would not be super easy to implement.  In
particular, it would introduce a weird case where only this specific
function would block for a long time while we infer the schema (instead of
waiting for an action).  This blocking would be kind of odd for a call like
df.select(...).  If there is enough interest, though, we should still do it.

To give a little more detail, your version of the code is actually doing
two passes over the data: one to infer the schema and a second for whatever
processing you are asking it to do.  We have to know the schema at each
step of DataFrame construction, so we'd have to do this even before you
called an action.

Personally, I usually take a small sample of data and use schema inference
on that.  I then hardcode that schema into my program.  This makes your
spark jobs much faster and removes the possibility of the schema changing
underneath the covers.

Here's some code I use to build the static schema code automatically
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1128172975083446/2840265927289860/latest.html>
.

Would that work for you? If not, why not?

On Wed, Nov 23, 2016 at 2:48 AM, kant kodali  wrote:

> Hi Michael,
>
> Looks like all from_json functions will require me to pass schema and that
> can be little tricky for us but the code below doesn't require me to pass
> schema at all.
>
> import org.apache.spark.sql._
> val rdd = df2.rdd.map { case Row(j: String) => j }
> spark.read.json(rdd).show()
>
>
> On Tue, Nov 22, 2016 at 2:42 PM, Michael Armbrust 
> wrote:
>
>> The first release candidate should be coming out this week. You can
>> subscribe to the dev list if you want to follow the release schedule.
>>
>> On Mon, Nov 21, 2016 at 9:34 PM, kant kodali  wrote:
>>
>>> Hi Michael,
>>>
>>> I only see spark 2.0.2 which is what I am using currently. Any idea on
>>> when 2.1 will be released?
>>>
>>> Thanks,
>>> kant
>>>
>>> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> In Spark 2.1 we've added a from_json
>>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L2902>
>>>> function that I think will do what you want.
>>>>
>>>> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali 
>>>> wrote:
>>>>
>>>>> This seem to work
>>>>>
>>>>> import org.apache.spark.sql._
>>>>> val rdd = df2.rdd.map { case Row(j: String) => j }
>>>>> spark.read.json(rdd).show()
>>>>>
>>>>> However I wonder if this any inefficiency here ? since I have to apply
>>>>> this function for billion rows.
>>>>>
>>>>>
>>>>
>>>
>>
>


[jira] [Commented] (SPARK-18541) Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management in pyspark SQL API

2016-11-28 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703146#comment-15703146
 ] 

Michael Armbrust commented on SPARK-18541:
--

I don't think you can use {{as}} in python, as I believe it is a reserved word. 
 I would support having an optional second argument to {{alias}}, similar to 
what we do in [Scala'a 
as|https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/sql/Column.html#as(java.lang.String,%20org.apache.spark.sql.types.Metadata)].

> Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management 
> in pyspark SQL API
> 
>
> Key: SPARK-18541
> URL: https://issues.apache.org/jira/browse/SPARK-18541
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 2.0.2
> Environment: all
>Reporter: Shea Parkes
>Priority: Minor
>  Labels: newbie
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> In the Scala SQL API, you can pass in new metadata when you alias a field.  
> That functionality is not available in the Python API.   Right now, you have 
> to painfully utilize {{SparkSession.createDataFrame}} to manipulate the 
> metadata for even a single column.  I would propose to add the following 
> method to {{pyspark.sql.Column}}:
> {code}
> def aliasWithMetadata(self, name, metadata):
> """
> Make a new Column that has the provided alias and metadata.
> Metadata will be processed with json.dumps()
> """
> _context = pyspark.SparkContext._active_spark_context
> _metadata_str = json.dumps(metadata)
> _metadata_jvm = 
> _context._jvm.org.apache.spark.sql.types.Metadata.fromJson(_metadata_str)
> _new_java_column = getattr(self._jc, 'as')(name, _metadata_jvm)
> return Column(_new_java_column)
> {code}
> I can likely complete this request myself if there is any interest for it.  
> Just have to dust off my knowledge of doctest and the location of the python 
> tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18498) Clean up HDFSMetadataLog API for better testing

2016-11-28 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18498:
-
Target Version/s: 2.1.0  (was: 2.2.0)

> Clean up HDFSMetadataLog API for better testing
> ---
>
> Key: SPARK-18498
> URL: https://issues.apache.org/jira/browse/SPARK-18498
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 2.0.1
>Reporter: Tyson Condie
>Priority: Minor
>  Labels: test
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> HDFSMetadataLog current conflates metadata log serialization and file writes. 
> The goal is to separate these two steps to enable more thorough testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Any equivalent method lateral and explore

2016-11-22 Thread Michael Armbrust
Both collect_list and explode are available in the function library

.

The following is an example of using it:
df.select($"*", explode($"myArray") as 'arrayItem)

On Tue, Nov 22, 2016 at 2:42 PM, Mahender Sarangam <
mahender.bigd...@outlook.com> wrote:

> Hi,
>
> We are converting our hive logic which is using lateral view and explode
> functions. Is there any builtin function in scala for performing lateral
> view explore.
>
>
> Below is our  query in Hive. temparray is temp table with c0 and c1 columns
>
> SELECT id, CONCAT_WS(',', collect_list(LineID)) as LineiD
> FROM (SELECT cast(LineID as STRING) as LineiD, cast(id as STRING) as id
> FROM temparray LATERAL VIEW explode(`_c1`) adTable AS id) T
> GROUP BY id;
>
>
> Can any one provide pointer for string functions available in scala. We
> would like perform operations like Collect_List, get starting index of
> matching string.
>
>
>
> Nanu
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-22 Thread Michael Armbrust
The first release candidate should be coming out this week. You can
subscribe to the dev list if you want to follow the release schedule.

On Mon, Nov 21, 2016 at 9:34 PM, kant kodali  wrote:

> Hi Michael,
>
> I only see spark 2.0.2 which is what I am using currently. Any idea on
> when 2.1 will be released?
>
> Thanks,
> kant
>
> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust 
> wrote:
>
>> In Spark 2.1 we've added a from_json
>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L2902>
>> function that I think will do what you want.
>>
>> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali  wrote:
>>
>>> This seem to work
>>>
>>> import org.apache.spark.sql._
>>> val rdd = df2.rdd.map { case Row(j: String) => j }
>>> spark.read.json(rdd).show()
>>>
>>> However I wonder if this any inefficiency here ? since I have to apply
>>> this function for billion rows.
>>>
>>>
>>
>


Re: How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread Michael Armbrust
We are looking to add a native JDBC sink in Spark 2.2.  Until then you can
write your own connector using df.writeStream.foreach.

On Tue, Nov 22, 2016 at 12:55 PM, shyla deshpande 
wrote:

> Hi,
>
> Structured streaming works great with Kafka source but I need to persist
> the data after processing in some database like Cassandra or at least
> Postgres.
>
> Any suggestions, help please.
>
> Thanks
>


Re: How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread Michael Armbrust
Forgot the link:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

On Tue, Nov 22, 2016 at 2:40 PM, Michael Armbrust 
wrote:

> We are looking to add a native JDBC sink in Spark 2.2.  Until then you can
> write your own connector using df.writeStream.foreach.
>
> On Tue, Nov 22, 2016 at 12:55 PM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> Hi,
>>
>> Structured streaming works great with Kafka source but I need to persist
>> the data after processing in some database like Cassandra or at least
>> Postgres.
>>
>> Any suggestions, help please.
>>
>> Thanks
>>
>
>


Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-21 Thread Michael Armbrust
In Spark 2.1 we've added a from_json

function that I think will do what you want.

On Fri, Nov 18, 2016 at 2:29 AM, kant kodali  wrote:

> This seem to work
>
> import org.apache.spark.sql._
> val rdd = df2.rdd.map { case Row(j: String) => j }
> spark.read.json(rdd).show()
>
> However I wonder if this any inefficiency here ? since I have to apply
> this function for billion rows.
>
>


Re: Stateful aggregations with Structured Streaming

2016-11-21 Thread Michael Armbrust
We are planning on adding mapWithState or something similar in a future
release.  In the mean time, standard Dataframe aggregations should work
(count, sum, etc).  If you are looking to do something custom, I'd suggest
looking at Aggregators

.

On Sat, Nov 19, 2016 at 5:46 AM, Yuval.Itzchakov  wrote:

> I've been using `DStream.mapWithState` and was looking forward to trying
> out
> Structured Streaming. The thing I can't under is, does Structured Streaming
> in it's current state support stateful aggregations?
>
> Looking at the StateStore design document
> (https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wf
> Vp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7),
> and then doing a bit of digging around in the Spark codebase, I've seen
> `mapPartitionsWithStateStore` as the only viable way of doing something
> with
> a store, but the API requires an `UnsafeRow` for key and value which makes
> we question if this is a real public API one should be using?
>
> Does anyone know what the state of things are currently in regards to an
> equivalent to `mapWithState` in Structured Streaming?
>
> Thanks,
> Yuval.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Stateful-aggregations-with-
> Structured-Streaming-tp28108.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-21 Thread Michael Armbrust
You could also do this with Datasets, which will probably be a little more
efficient (since you are telling us you only care about one column)

ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)

On Thu, Nov 17, 2016 at 1:05 PM, shyla deshpande 
wrote:

> Hello everyone,
>  The following code works ...
>
> def main(args : Array[String]) {
>
>   val spark = SparkSession.builder.
> master("local")
> .appName("spark session example")
> .getOrCreate()
>
>   import spark.implicits._
>
>   val ds1 = spark.readStream.format("kafka").
> option("kafka.bootstrap.servers","localhost:9092").
> option("subscribe","student").load()
>
>   val ds2 = ds1.map(row=> 
> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
>
>   val query = ds2.writeStream
> .outputMode("append")
> .format("console")
> .start()
>
>   query.awaitTermination()
>
> }
>
>
> On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> val spark = SparkSession.builder.
>>   master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val dframe1 = spark.readStream.format("kafka").
>>   option("kafka.bootstrap.servers","localhost:9092").
>>   option("subscribe","student").load()
>>
>> *How do I deserialize the value column from dataframe1 *
>>
>> *which is Array[Byte] to Student object using Student.parseFrom..???*
>>
>> *Please help.*
>>
>> *Thanks.*
>>
>>
>>
>> // Stream of votes from Kafka as bytesval votesAsBytes = 
>> KafkaUtils.createDirectStream[String, Array[Byte]](
>>   ssc, LocationStrategies.PreferConsistent,
>>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), 
>> kafkaParams))
>> // Parse them into Vote case class.val votes: DStream[Vote] = 
>> votesAsBytes.map {
>>   (cr: ConsumerRecord[String, Array[Byte]]) =>
>> Vote.parseFrom(cr.value())}
>>
>>
>


Re: Create a Column expression from a String

2016-11-21 Thread Michael Armbrust
You are looking for org.apache.spark.sql.functions.expr()

On Sat, Nov 19, 2016 at 6:12 PM, Stuart White 
wrote:

> I'd like to allow for runtime-configured Column expressions in my
> Spark SQL application.  For example, if my application needs a 5-digit
> zip code, but the file I'm processing contains a 9-digit zip code, I'd
> like to be able to configure my application with the expression
> "substring('zipCode, 0, 5)" to use for the zip code.
>
> So, I think I'm looking for something like this:
>
> def parseColumnExpression(colExpr: String) : Column
>
> I see that SparkSession's sql() method exists to take a string and
> parse it into a DataFrame.  But that's not quite what I want.
>
> Does a mechanism exist that would allow me to take a string
> representation of a column expression and parse it into an actual
> column expression (something that could be use in a .select() call,
> for example)?
>
> Thanks!
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[jira] [Created] (SPARK-18530) Kafka timestamp should be TimestampType

2016-11-21 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18530:


 Summary: Kafka timestamp should be TimestampType
 Key: SPARK-18530
 URL: https://issues.apache.org/jira/browse/SPARK-18530
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Reporter: Michael Armbrust
Priority: Blocker


Otherwise every time you try to use it you have to do a manual conversion.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18339) Don't push down current_timestamp for filters in StructuredStreaming

2016-11-21 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18339:
-
Priority: Critical  (was: Major)

> Don't push down current_timestamp for filters in StructuredStreaming
> 
>
> Key: SPARK-18339
> URL: https://issues.apache.org/jira/browse/SPARK-18339
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.1
>Reporter: Burak Yavuz
>Assignee: Tyson Condie
>Priority: Critical
>
> For the following workflow:
> 1. I have a column called time which is at minute level precision in a 
> Streaming DataFrame
> 2. I want to perform groupBy time, count
> 3. Then I want my MemorySink to only have the last 30 minutes of counts and I 
> perform this by
> {code}
> .where('time >= current_timestamp().cast("long") - 30 * 60)
> {code}
> what happens is that the `filter` gets pushed down before the aggregation, 
> and the filter happens on the source data for the aggregation instead of the 
> result of the aggregation (where I actually want to filter).
> I guess the main issue here is that `current_timestamp` is non-deterministic 
> in the streaming context and shouldn't be pushed down the filter.
> Does this require us to store the `current_timestamp` for each trigger of the 
> streaming job, that is something to discuss.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18513) Record and recover watermark

2016-11-21 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18513:
-
Priority: Blocker  (was: Major)

> Record and recover watermark
> 
>
> Key: SPARK-18513
> URL: https://issues.apache.org/jira/browse/SPARK-18513
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Liwei Lin
>Priority: Blocker
>
> We should record the watermark into the persistent log and recover it to 
> ensure determinism.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18513) Record and recover watermark

2016-11-21 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18513:
-
Target Version/s: 2.1.0

> Record and recover watermark
> 
>
> Key: SPARK-18513
> URL: https://issues.apache.org/jira/browse/SPARK-18513
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Liwei Lin
>
> We should record the watermark into the persistent log and recover it to 
> ensure determinism.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18529) Timeouts shouldn't be AssertionErrors

2016-11-21 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18529:


 Summary: Timeouts shouldn't be AssertionErrors
 Key: SPARK-18529
 URL: https://issues.apache.org/jira/browse/SPARK-18529
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Reporter: Michael Armbrust


A timeout should inherit from {{RuntimeException}} as its not a fatal error.
{code}
java.lang.AssertionError: assertion failed: Failed to get records for 
spark-kafka-source-26d6f51c-0781-45a4-ab8e-bc6bd6603917-86874470-executor 
service-log-0 49350201 after polling for 1000
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:65)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.next(KafkaSourceRDD.scala:146)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.next(KafkaSourceRDD.scala:142)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18516) Separate instantaneous state from progress performance statistics

2016-11-20 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18516:
-
Description: 
There are two types of information that you want to be able to extract from a 
running query: instantaneous _status_ and metrics about the performance as make 
_progress_ in query processing.

Today, these are conflated in a single {{StreamingQueryStatus}} object.  The 
downside to this approach is that a user now needs to reason about what state 
the query is in anytime they retrieve a status object.  Fields like 
{{statusMessage}} don't appear in updates that come from listener bus.  And 
inputRate/processingRate statistics are usually {{0}} when you retrieve a 
status object from the query itself.

I propose we make the follow changes:
 - Make {{status}} only report instantaneous things, such as if data is 
available or a human readable message about what phase we are currently in.
 - Have a separate {{progress}} message that we report for each trigger with 
the other performance information that lives in status today.  You should be 
able to easily retrieve a configurable number of the most recent progress 
messages instead of just the most recent.

While we are making these changes, I propose that we also change {{id}} to be a 
globally unique identifier, rather than a JVM unique one.  Without this its 
hard to correlate performance across restarts.

  was:
There are two types of information that you want to be able to extract from a 
running query: instantaneous _status_ and metrics about the performance as make 
_progress_ in query processing.

Today, these are conflated in a single {{StreamingQueryStatus}} object.  The 
downside to this approach is that a user now needs to reason about what state 
the query is in anytime they retrieve a status object.  Fields like 
{{statusMessage}} don't appear in messages that come from listener bus.  And 
inputRate/processingRate statistics are usually {{0}} when you retrieve a 
status object from the query itself.

I propose we make the follow changes:
 - Make {{status}} only report instantaneous things, such as if data is 
available or a human readable message about what phase we are currently in.
 - Have a separate {{progress}} message that we report for each trigger with 
the other performance information that lives in status today.  You should be 
able to easily retrieve a configurable number of the most recent progress 
messages instead of just the most recent.

While we are making these changes, I propose that we also change {{id}} to be a 
globally unique identifier, rather than a JVM unique one.  Without this its 
hard to correlate performance across restarts.


> Separate instantaneous state from progress performance statistics
> -
>
> Key: SPARK-18516
> URL: https://issues.apache.org/jira/browse/SPARK-18516
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>    Reporter: Michael Armbrust
>Assignee: Michael Armbrust
>Priority: Blocker
>
> There are two types of information that you want to be able to extract from a 
> running query: instantaneous _status_ and metrics about the performance as 
> make _progress_ in query processing.
> Today, these are conflated in a single {{StreamingQueryStatus}} object.  The 
> downside to this approach is that a user now needs to reason about what state 
> the query is in anytime they retrieve a status object.  Fields like 
> {{statusMessage}} don't appear in updates that come from listener bus.  And 
> inputRate/processingRate statistics are usually {{0}} when you retrieve a 
> status object from the query itself.
> I propose we make the follow changes:
>  - Make {{status}} only report instantaneous things, such as if data is 
> available or a human readable message about what phase we are currently in.
>  - Have a separate {{progress}} message that we report for each trigger with 
> the other performance information that lives in status today.  You should be 
> able to easily retrieve a configurable number of the most recent progress 
> messages instead of just the most recent.
> While we are making these changes, I propose that we also change {{id}} to be 
> a globally unique identifier, rather than a JVM unique one.  Without this its 
> hard to correlate performance across restarts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18516) Separate instantaneous state from progress performance statistics

2016-11-20 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18516:
-
Description: 
There are two types of information that you want to be able to extract from a 
running query: instantaneous _status_ and metrics about the performance as make 
_progress_ in query processing.

Today, these are conflated in a single {{StreamingQueryStatus}} object.  The 
downside to this approach is that a user now needs to reason about what state 
the query is in anytime they retrieve a status object.  Fields like 
{{statusMessage}} don't appear in updates that come from listener bus.  
Simlarly, {{inputRate}}/{{processingRate}} statistics are usually {{0}} when 
you retrieve a status object from the query itself.

I propose we make the follow changes:
 - Make {{status}} only report instantaneous things, such as if data is 
available or a human readable message about what phase we are currently in.
 - Have a separate {{progress}} message that we report for each trigger with 
the other performance information that lives in status today.  You should be 
able to easily retrieve a configurable number of the most recent progress 
messages instead of just the most recent.

While we are making these changes, I propose that we also change {{id}} to be a 
globally unique identifier, rather than a JVM unique one.  Without this its 
hard to correlate performance across restarts.

  was:
There are two types of information that you want to be able to extract from a 
running query: instantaneous _status_ and metrics about the performance as make 
_progress_ in query processing.

Today, these are conflated in a single {{StreamingQueryStatus}} object.  The 
downside to this approach is that a user now needs to reason about what state 
the query is in anytime they retrieve a status object.  Fields like 
{{statusMessage}} don't appear in updates that come from listener bus.  And 
inputRate/processingRate statistics are usually {{0}} when you retrieve a 
status object from the query itself.

I propose we make the follow changes:
 - Make {{status}} only report instantaneous things, such as if data is 
available or a human readable message about what phase we are currently in.
 - Have a separate {{progress}} message that we report for each trigger with 
the other performance information that lives in status today.  You should be 
able to easily retrieve a configurable number of the most recent progress 
messages instead of just the most recent.

While we are making these changes, I propose that we also change {{id}} to be a 
globally unique identifier, rather than a JVM unique one.  Without this its 
hard to correlate performance across restarts.


> Separate instantaneous state from progress performance statistics
> -
>
> Key: SPARK-18516
> URL: https://issues.apache.org/jira/browse/SPARK-18516
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>    Reporter: Michael Armbrust
>Assignee: Michael Armbrust
>Priority: Blocker
>
> There are two types of information that you want to be able to extract from a 
> running query: instantaneous _status_ and metrics about the performance as 
> make _progress_ in query processing.
> Today, these are conflated in a single {{StreamingQueryStatus}} object.  The 
> downside to this approach is that a user now needs to reason about what state 
> the query is in anytime they retrieve a status object.  Fields like 
> {{statusMessage}} don't appear in updates that come from listener bus.  
> Simlarly, {{inputRate}}/{{processingRate}} statistics are usually {{0}} when 
> you retrieve a status object from the query itself.
> I propose we make the follow changes:
>  - Make {{status}} only report instantaneous things, such as if data is 
> available or a human readable message about what phase we are currently in.
>  - Have a separate {{progress}} message that we report for each trigger with 
> the other performance information that lives in status today.  You should be 
> able to easily retrieve a configurable number of the most recent progress 
> messages instead of just the most recent.
> While we are making these changes, I propose that we also change {{id}} to be 
> a globally unique identifier, rather than a JVM unique one.  Without this its 
> hard to correlate performance across restarts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18516) Separate instantaneous state from progress performance statistics

2016-11-20 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18516:


 Summary: Separate instantaneous state from progress performance 
statistics
 Key: SPARK-18516
 URL: https://issues.apache.org/jira/browse/SPARK-18516
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Reporter: Michael Armbrust
Assignee: Michael Armbrust
Priority: Blocker


There are two types of information that you want to be able to extract from a 
running query: instantaneous _status_ and metrics about the performance as make 
_progress_ in query processing.

Today, these are conflated in a single {{StreamingQueryStatus}} object.  The 
downside to this approach is that a user now needs to reason about what state 
the query is in anytime they retrieve a status object.  Fields like 
{{statusMessage}} don't appear in messages that come from listener bus.  And 
inputRate/processingRate statistics are usually {{0}} when you retrieve a 
status object from the query itself.

I propose we make the follow changes:
 - Make {{status}} only report instantaneous things, such as if data is 
available or a human readable message about what phase we are currently in.
 - Have a separate {{progress}} message that we report for each trigger with 
the other performance information that lives in status today.  You should be 
able to easily retrieve a configurable number of the most recent progress 
messages instead of just the most recent.

While we are making these changes, I propose that we also change {{id}} to be a 
globally unique identifier, rather than a JVM unique one.  Without this its 
hard to correlate performance across restarts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Analyzing and reusing cached Datasets

2016-11-19 Thread Michael Armbrust
You are hitting a weird optimization in withColumn.  Specifically, to avoid
building up huge trees with chained calls to this method, we collapse
projections eagerly (instead of waiting for the optimizer).

Typically we look for cached data in between analysis and optimization, so
that optimizations won't change out ability to recognized cached query
plans.  However, in this case the eager optimization is thwarting that.

On Sat, Nov 19, 2016 at 12:19 PM, Jacek Laskowski  wrote:

> Hi,
>
> There might be a bug in how analyzing Datasets or looking up cached
> Datasets works. I'm on master.
>
> ➜  spark git:(master) ✗ ./bin/spark-submit --version
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
>   /_/
>
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
> Branch master
> Compiled by user jacek on 2016-11-19T08:39:43Z
> Revision 2a40de408b5eb47edba92f9fe92a42ed1e78bf98
> Url https://github.com/apache/spark.git
> Type --help for more information.
>
> After reviewing CacheManager and how caching works for Datasets I
> thought the following query would use the cached Dataset but it does
> not.
>
> // Cache Dataset -- it is lazy
> scala> val df = spark.range(1).cache
> df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>
> // Trigger caching
> scala> df.show
> +---+
> | id|
> +---+
> |  0|
> +---+
>
> // Visit http://localhost:4040/storage to see the Dataset cached. And it
> is.
>
> // Use the cached Dataset in another query
> // Notice InMemoryRelation in use for cached queries
> // It works as expected.
> scala> df.withColumn("newId", 'id).explain(extended = true)
> == Parsed Logical Plan ==
> 'Project [*, 'id AS newId#16]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Analyzed Logical Plan ==
> id: bigint, newId: bigint
> Project [id#0L, id#0L AS newId#16L]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Optimized Logical Plan ==
> Project [id#0L, id#0L AS newId#16L]
> +- InMemoryRelation [id#0L], true, 1, StorageLevel(disk, memory,
> deserialized, 1 replicas)
>   +- *Range (0, 1, step=1, splits=Some(8))
>
> == Physical Plan ==
> *Project [id#0L, id#0L AS newId#16L]
> +- InMemoryTableScan [id#0L]
>   +- InMemoryRelation [id#0L], true, 1, StorageLevel(disk,
> memory, deserialized, 1 replicas)
> +- *Range (0, 1, step=1, splits=Some(8))
>
> I hoped that the following query would use the cached one but it does
> not. Should it? I thought that QueryExecution.withCachedData [1] would
> do the trick.
>
> [1] https://github.com/apache/spark/blob/master/sql/core/
> src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L70
>
> // The following snippet uses spark.range(1) which is the same as the
> one cached above
> // Why does the physical plan not use InMemoryTableScan and
> InMemoryRelation?
> scala> spark.range(1).withColumn("new", 'id).explain(extended = true)
> == Parsed Logical Plan ==
> 'Project [*, 'id AS new#29]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Analyzed Logical Plan ==
> id: bigint, new: bigint
> Project [id#26L, id#26L AS new#29L]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Optimized Logical Plan ==
> Project [id#26L, id#26L AS new#29L]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Physical Plan ==
> *Project [id#26L, id#26L AS new#29L]
> +- *Range (0, 1, step=1, splits=Some(8))
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Multiple streaming aggregations in structured streaming

2016-11-18 Thread Michael Armbrust
Doing this generally is pretty hard.  We will likely support algebraic
aggregate eventually, but this is not currently slotted for 2.2.  Instead I
think we will add something like mapWithState that lets users compute
arbitrary stateful things.  What is your use case?


On Wed, Nov 16, 2016 at 6:58 PM, wszxyh  wrote:

> Hi
>
> Multiple streaming aggregations are not yet supported. When will it be
> supported? Is it in the plan?
>
> Thanks
>
>
>
>


[jira] [Updated] (SPARK-18339) Don't push down current_timestamp for filters in StructuredStreaming

2016-11-18 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18339:
-
Assignee: Tyson Condie

> Don't push down current_timestamp for filters in StructuredStreaming
> 
>
> Key: SPARK-18339
> URL: https://issues.apache.org/jira/browse/SPARK-18339
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.1
>Reporter: Burak Yavuz
>Assignee: Tyson Condie
>
> For the following workflow:
> 1. I have a column called time which is at minute level precision in a 
> Streaming DataFrame
> 2. I want to perform groupBy time, count
> 3. Then I want my MemorySink to only have the last 30 minutes of counts and I 
> perform this by
> {code}
> .where('time >= current_timestamp().cast("long") - 30 * 60)
> {code}
> what happens is that the `filter` gets pushed down before the aggregation, 
> and the filter happens on the source data for the aggregation instead of the 
> result of the aggregation (where I actually want to filter).
> I guess the main issue here is that `current_timestamp` is non-deterministic 
> in the streaming context and shouldn't be pushed down the filter.
> Does this require us to store the `current_timestamp` for each trigger of the 
> streaming job, that is something to discuss.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18339) Don't push down current_timestamp for filters in StructuredStreaming

2016-11-18 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18339:
-
Target Version/s: 2.1.0  (was: 2.2.0)

> Don't push down current_timestamp for filters in StructuredStreaming
> 
>
> Key: SPARK-18339
> URL: https://issues.apache.org/jira/browse/SPARK-18339
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.1
>Reporter: Burak Yavuz
>
> For the following workflow:
> 1. I have a column called time which is at minute level precision in a 
> Streaming DataFrame
> 2. I want to perform groupBy time, count
> 3. Then I want my MemorySink to only have the last 30 minutes of counts and I 
> perform this by
> {code}
> .where('time >= current_timestamp().cast("long") - 30 * 60)
> {code}
> what happens is that the `filter` gets pushed down before the aggregation, 
> and the filter happens on the source data for the aggregation instead of the 
> result of the aggregation (where I actually want to filter).
> I guess the main issue here is that `current_timestamp` is non-deterministic 
> in the streaming context and shouldn't be pushed down the filter.
> Does this require us to store the `current_timestamp` for each trigger of the 
> streaming job, that is something to discuss.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18497) ForeachSink fails with "assertion failed: No plan for EventTimeWatermark"

2016-11-17 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18497:
-
Target Version/s: 2.1.0

> ForeachSink fails with "assertion failed: No plan for EventTimeWatermark"
> -
>
> Key: SPARK-18497
> URL: https://issues.apache.org/jira/browse/SPARK-18497
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Aaron Davidson
>
> I have a pretty standard stream. I call ".writeStream.foreach(...).start()" 
> and get
> {code}
> java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark 
> timestamp#39688: timestamp, interval 1 days
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:232)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:232)
>   at 
> org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:107)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:232)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2751)
>   at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2290)
>   at 
> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:70)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sq

[jira] [Updated] (SPARK-18497) ForeachSink fails with "assertion failed: No plan for EventTimeWatermark"

2016-11-17 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18497:
-
Priority: Critical  (was: Major)

> ForeachSink fails with "assertion failed: No plan for EventTimeWatermark"
> -
>
> Key: SPARK-18497
> URL: https://issues.apache.org/jira/browse/SPARK-18497
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Aaron Davidson
>Priority: Critical
>
> I have a pretty standard stream. I call ".writeStream.foreach(...).start()" 
> and get
> {code}
> java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark 
> timestamp#39688: timestamp, interval 1 days
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
>   at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:232)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:232)
>   at 
> org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:107)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:232)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2751)
>   at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2290)
>   at 
> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:70)
>   at 
> org.apache.spark.sql.execution.str

[jira] [Resolved] (SPARK-18461) Improve docs on StreamingQueryListener and StreamingQuery.status

2016-11-16 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-18461.
--
Resolution: Fixed

Issue resolved by pull request 15897
[https://github.com/apache/spark/pull/15897]

> Improve docs on StreamingQueryListener and StreamingQuery.status
> 
>
> Key: SPARK-18461
> URL: https://issues.apache.org/jira/browse/SPARK-18461
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, Structured Streaming
>Reporter: Tathagata Das
>Assignee: Tathagata Das
> Fix For: 2.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17977) DataFrameReader and DataStreamReader should have an ancestor class

2016-11-16 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671287#comment-15671287
 ] 

Michael Armbrust commented on SPARK-17977:
--

No, they were actually the same class for a while.  I would be fine with 
creating a trait that has the common methods.

> DataFrameReader and DataStreamReader should have an ancestor class
> --
>
> Key: SPARK-17977
> URL: https://issues.apache.org/jira/browse/SPARK-17977
> Project: Spark
>  Issue Type: Wish
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Amit Assudani
>
> There should be an ancestor class of DataFrameReader and DataStreamReader to 
> configure common options / format and use common methods. Most of the methods 
> are exact same having exact same arguments. This will help create utilities / 
> generic code being used for stream / batch applications. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-16 Thread Michael Armbrust
On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon  wrote:

> Maybe it sounds like you are looking for from_json/to_json functions after
> en/decoding properly.
>

Which are new built-in functions that will be released with Spark 2.1.


Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-16 Thread Michael Armbrust
On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon  wrote:

> Maybe it sounds like you are looking for from_json/to_json functions after
> en/decoding properly.
>

Which are new built-in functions that will be released with Spark 2.1.


[jira] [Resolved] (SPARK-18440) Fix FileStreamSink with aggregation + watermark + append mode

2016-11-15 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-18440.
--
Resolution: Fixed

Issue resolved by pull request 15885
[https://github.com/apache/spark/pull/15885]

> Fix FileStreamSink with aggregation + watermark + append mode
> -
>
> Key: SPARK-18440
> URL: https://issues.apache.org/jira/browse/SPARK-18440
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Reporter: Tathagata Das
> Fix For: 2.1.0
>
>
> SPARK-18012 refactored the file write path in FileStreamSink using 
> FileFormatWriter which always uses the default non-streaming QueryExecution 
> to perform the writes. This is wrong for FileStreamSink, because the 
> streaming QueryExecution (i.e. IncrementalExecution) should be used for 
> correctly incrementalizing aggregation. The addition of watermarks in 
> SPARK-18124, file stream sink should logically supports aggregation + 
> watermark + append mode. But actually it fails with 
> {code}
> 16:23:07.389 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: 
> Query query-0 terminated with error
> java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark 
> timestamp#7: timestamp, interval 10 seconds
> +- LocalRelation [timestamp#7]
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
>   at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)

Re: getting encoder implicits to be more accurate

2016-11-14 Thread Michael Armbrust
I would definitly like to open up APIs for people to write their own
encoders.  The challenge thus far has been that Encoders use internal APIs
that have not been stable for translating the data into the tungsten
format.  We also make use of the analyzer to figure out the mapping from
columns to fields (also not a stable API)  This is the only "magic" that is
happening.

If someone wants to propose a stable / fast API here it would be great to
start the discussion.  Its an often requested feature.

On Mon, Nov 14, 2016 at 1:32 PM, Sam Goodwin 
wrote:

> I wouldn't recommend using a Tuple as you end up with column names like
> "_1" and "_2", but it will still work :)
>
> ExpressionEncoder can do the same thing but it doesn't support custom
> types, and as far as I can tell, does not support custom implementations.
> I.e. is it possible for me to write my own Encoder logic and completely
> bypass ExpressionEncoder? The trait definition has no useful methods so it
> doesn't seem straight-forward. If the Encoder trait was opened up so
> people could provide their own implementations then I don't see this as an
> issue anymore. It would allow for external Encoder libraries like mine
> while not neglecting Java (non-scala) developers. Is there "magic" happening
> behind the scenes stopping us from doing this?
>
> On Mon, 14 Nov 2016 at 12:31 Koert Kuipers  wrote:
>
>> just taking it for a quick spin it looks great, with correct support for
>> nested rows and using option for nullability.
>>
>> scala> val format = implicitly[RowFormat[(String, Seq[(String,
>> Option[Int])])]]
>> format: com.github.upio.spark.sql.RowFormat[(String, Seq[(String,
>> Option[Int])])] = com.github.upio.spark.sql.
>> FamilyFormats$$anon$3@2c0961e2
>>
>> scala> format.schema
>> res12: org.apache.spark.sql.types.StructType = 
>> StructType(StructField(_1,StringType,false),
>> StructField(_2,ArrayType(StructType(StructField(_1,StringType,false),
>> StructField(_2,IntegerType,true)),true),false))
>>
>> scala> val x = format.read(Row("a", Seq(Row("a", 5
>> x: (String, Seq[(String, Option[Int])]) = (a,List((a,Some(5
>>
>> scala> format.write(x)
>> res13: org.apache.spark.sql.Row = [a,List([a,5])]
>>
>>
>>
>> On Mon, Nov 14, 2016 at 3:10 PM, Koert Kuipers  wrote:
>>
>> agreed on your point that this can be done without macros
>>
>> On Wed, Nov 2, 2016 at 12:15 AM, Sam Goodwin 
>> wrote:
>>
>> You don't need compiler time macros for this, you can do it quite easily
>> using shapeless. I've been playing with a project which borrows ideas from
>> spray-json and spray-json-shapeless to implement Row marshalling for
>> arbitrary case classes. It's checked and generated at compile time,
>> supports arbitrary/nested case classes, and allows custom types. It is also
>> entirely pluggable meaning you can bypass the default implementations and
>> provide your own, just like any type class.
>>
>> https://github.com/upio/spark-sql-formats
>>
>>
>> *From:* Michael Armbrust 
>> *Date:* October 26, 2016 at 12:50:23 PM PDT
>> *To:* Koert Kuipers 
>> *Cc:* Ryan Blue , "dev@spark.apache.org" <
>> dev@spark.apache.org>
>> *Subject:* *Re: getting encoder implicits to be more accurate*
>>
>> Sorry, I realize that set is only one example here, but I don't think
>> that making the type of the implicit more narrow to include only ProductN
>> or something eliminates the issue.  Even with that change, we will fail to
>> generate an encoder with the same error if you, for example, have a field
>> of your case class that is an unsupported type.
>>
>>
>>
>> Short of changing this to compile-time macros, I think we are stuck with
>> this class of errors at runtime.  The simplest solution seems to be to
>> expand the set of thing we can handle as much as possible and allow users
>> to turn on a kryo fallback for expression encoders.  I'd be hesitant to
>> make this the default though, as behavior would change with each release
>> that adds support for more types.  I would be very supportive of making
>> this fallback a built-in option though.
>>
>>
>>
>> On Wed, Oct 26, 2016 at 11:47 AM, Koert Kuipers 
>> wrote:
>>
>> yup, it doesnt really solve the underlying issue.
>>
>> we fixed it internally by having our own typeclass that produces encoders
>> and that does check the contents of the products, but we did this by simply
>> supporting Tuple1 - Tu

[jira] [Updated] (SPARK-18407) Inferred partition columns cause assertion error

2016-11-10 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18407:
-
Description: 
[This 
assertion|https://github.com/apache/spark/blob/16eaad9daed0b633e6a714b5704509aa7107d6e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L408]
 fails when you run a stream against json data that is stored in partitioned 
folders, if you manually specify the schema and that schema omits the 
partitioned columns.

My hunch is that we are inferring those columns even though the schema is being 
passed in manually and adding them to the end.

While we are fixing this bug, it would be nice to make the assertion better.  
Truncating is not terribly useful as, at least in my case, it truncated the 
most interesting part.  I changed it to this while debugging:

{code}
  s"""
 |Batch does not have expected schema
 |Expected: ${output.mkString(",")}
 |Actual: ${newPlan.output.mkString(",")}
 |
 |== Original ==
 |$logicalPlan
 |
 |== Batch ==
 |$newPlan
   """.stripMargin
{code}

I also tried specifying the partition columns in the schema and now it appears 
that they are filled with corrupted data.

  was:
[This 
assertion|https://github.com/apache/spark/blob/16eaad9daed0b633e6a714b5704509aa7107d6e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L408]
 fails when you run a stream against json data that is stored in partitioned 
folders, if you manually specify the schema and that schema omits the 
partitioned columns.

My hunch is that we are inferring those columns even though the schema is being 
passed in manually and adding them to the end.

While we are fixing this bug, it would be nice to make the assertion better.  
Truncating is not terribly useful as, at least in my case, it truncated the 
most interesting part.  I changed it to this while debugging:

{code}
  s"""
 |Batch does not have expected schema
 |Expected: ${output.mkString(",")}
 |Actual: ${newPlan.output.mkString(",")}
 |
 |== Original ==
 |$logicalPlan
 |
 |== Batch ==
 |$newPlan
   """.stripMargin
{code}


> Inferred partition columns cause assertion error
> 
>
> Key: SPARK-18407
> URL: https://issues.apache.org/jira/browse/SPARK-18407
> Project: Spark
>      Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.2
>Reporter: Michael Armbrust
>Priority: Critical
>
> [This 
> assertion|https://github.com/apache/spark/blob/16eaad9daed0b633e6a714b5704509aa7107d6e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L408]
>  fails when you run a stream against json data that is stored in partitioned 
> folders, if you manually specify the schema and that schema omits the 
> partitioned columns.
> My hunch is that we are inferring those columns even though the schema is 
> being passed in manually and adding them to the end.
> While we are fixing this bug, it would be nice to make the assertion better.  
> Truncating is not terribly useful as, at least in my case, it truncated the 
> most interesting part.  I changed it to this while debugging:
> {code}
>   s"""
>  |Batch does not have expected schema
>  |Expected: ${output.mkString(",")}
>  |Actual: ${newPlan.output.mkString(",")}
>  |
>  |== Original ==
>  |$logicalPlan
>  |
>  |== Batch ==
>  |$newPlan
>""".stripMargin
> {code}
> I also tried specifying the partition columns in the schema and now it 
> appears that they are filled with corrupted data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18407) Inferred partition columns cause assertion error

2016-11-10 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18407:


 Summary: Inferred partition columns cause assertion error
 Key: SPARK-18407
 URL: https://issues.apache.org/jira/browse/SPARK-18407
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.0.2
Reporter: Michael Armbrust
Priority: Critical


[This 
assertion|https://github.com/apache/spark/blob/16eaad9daed0b633e6a714b5704509aa7107d6e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L408]
 fails when you run a stream against json data that is stored in partitioned 
folders, if you manually specify the schema and that schema omits the 
partitioned columns.

My hunch is that we are inferring those columns even though the schema is being 
passed in manually and adding them to the end.

While we are fixing this bug, it would be nice to make the assertion better.  
Truncating is not terribly useful as, at least in my case, it truncated the 
most interesting part.  I changed it to this while debugging:

{code}
  s"""
 |Batch does not have expected schema
 |Expected: ${output.mkString(",")}
 |Actual: ${newPlan.output.mkString(",")}
 |
 |== Original ==
 |$logicalPlan
 |
 |== Batch ==
 |$newPlan
   """.stripMargin
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: type-safe join in the new DataSet API?

2016-11-10 Thread Michael Armbrust
You can groupByKey and then cogroup.

On Thu, Nov 10, 2016 at 10:44 AM, Yang  wrote:

> the new DataSet API is supposed to provide type safety and type checks at
> compile time https://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#join-operations
>
> It does this indeed for a lot of places, but I found it still doesn't have
> a type safe join:
>
> val ds1 = hc.sql("select col1, col2 from mytable")
>
> val ds2 = hc.sql("select col3 , col4 from mytable2")
>
> val ds3 = ds1.joinWith(ds2, ds1.col("col1") === ds2.col("col2"))
>
> here spark has no way to make sure (at compile time) that the two columns
> being joined together
> , "col1" and "col2" are of matching types. This is contrast to rdd join,
> where it would be detected at compile time.
>
> am I missing something?
>
> thanks
>
>


[jira] [Commented] (SPARK-17691) Add aggregate function to collect list with maximum number of elements

2016-11-09 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652723#comment-15652723
 ] 

Michael Armbrust commented on SPARK-17691:
--

I think that should be able to use mutable buffers with the aggregator 
interface (and if there is bad performance there we should fix it).  Depending 
on what you are trying to do, I'd imagine groupByKey and mapGroups would also 
be fast.  You could also collect the top N per group using window functions.

Basically, this function sounds pretty specific (correct me if I'm wrong and 
this a common thing that other system support).  So I think it makes more sense 
to find fast/general mechanisms that let you build something specific like 
this, rather than adding yet another aggregate function.

> Add aggregate function to collect list with maximum number of elements
> --
>
> Key: SPARK-17691
> URL: https://issues.apache.org/jira/browse/SPARK-17691
> Project: Spark
>  Issue Type: New Feature
>Reporter: Assaf Mendelson
>Priority: Minor
>
> One of the aggregate functions we have today is the collect_list function. 
> This is a useful tool to do a "catch all" aggregation which doesn't really 
> fit anywhere else.
> The problem with collect_list is that it is unbounded. I would like to see a 
> means to do a collect_list where we limit the maximum number of elements.
> I would see that the input for this would be the maximum number of elements 
> to use and the method of choosing (pick whatever, pick the top N, pick the 
> bottom B)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Aggregations on every column on dataframe causing StackOverflowError

2016-11-09 Thread Michael Armbrust
It would be great if you could try with the 2.0.2 RC.  Thanks for creating
an issue.

On Wed, Nov 9, 2016 at 1:22 PM, Raviteja Lokineni <
raviteja.lokin...@gmail.com> wrote:

> Well I've tried with 1.5.2, 1.6.2 and 2.0.1
>
> FYI, I have created https://issues.apache.org/jira/browse/SPARK-18388
>
> On Wed, Nov 9, 2016 at 3:08 PM, Michael Armbrust 
> wrote:
>
>> Which version of Spark?  Does seem like a bug.
>>
>> On Wed, Nov 9, 2016 at 10:06 AM, Raviteja Lokineni <
>> raviteja.lokin...@gmail.com> wrote:
>>
>>> Does this stacktrace look like a bug guys? Definitely seems like one to
>>> me.
>>>
>>> Caused by: java.lang.StackOverflowError
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:195)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>>
>>>
>>> On Wed, Nov 9, 2016 at 10:48 AM, Raviteja Lokineni <
>>> raviteja.lokin...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am not sure if this is a bug or not. Basically I am generating weekly
>>>> aggregates of every column of data.
>>>>
>>>> Adding source code here (also attached):
>>>>
>>>> from pyspark.sql.window import Window
>>>> from pyspark.sql.functions import *
>>>>
>>>> timeSeries = sqlContext.read.option("header", 
>>>> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
>>>>
>>>> # Hive timestamp is interpreted as UNIX timestamp in seconds*
>>>> days = lambda i: i * 86400
>>>>
>>>> w = (Window()
>>>>  .partitionBy("id")
>>>>  .orderBy(col("dt").cast("timestamp").cast("long"))
>>>>  .rangeBetween(-days(6), 0))
>>>>
>>>> cols = ["id", "dt"]
>>>> skipCols = ["id", "dt"]
>>>>
>>>> for col in timeSeries.columns:
>>>> if col in skipCols:
>>>> continue
>>>> cols.append(mean(col).over(w).alias("mean_7_"+col))
>>>> cols.append(count(col).over(w).alias("count_7_"+col))
>>>> cols.append(sum(col).over(w).alias("sum_7_"+col))
>>>> cols.append(min(col).over(w).alias("min_7_"+col))
>>>> cols.append(max(col).over(w).alias("max_7_"+col))
>>>>
>>>> df = timeSeries.select(cols)
>>>> df.orderBy('id', 'dt').write\
>>>> 
>>>> .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")\
>>>> .save("file:///tmp/spark-bug-out.csv")
>>>>
>>>>
>>>> Thanks,
>>>> --
>>>> *Raviteja Lokineni* | Business Intelligence Developer
>>>> TD Ameritrade
>>>>
>>>> E: raviteja.lokin...@gmail.com
>>>>
>>>> [image: View Raviteja Lokineni's profile on LinkedIn]
>>>> <http://in.linkedin.com/in/ravitejalokineni>
>>>>
>>>>
>>>
>>>
>>> --
>>> *Raviteja Lokineni* | Business Intelligence Developer
>>> TD Ameritrade
>>>
>>> E: raviteja.lokin...@gmail.com
>>>
>>> [image: View Raviteja Lokineni's profile on LinkedIn]
>>> <http://in.linkedin.com/in/ravitejalokineni>
>>>
>>>
>>
>
>
> --
> *Raviteja Lokineni* | Business Intelligence Developer
> TD Ameritrade
>
> E: raviteja.lokin...@gmail.com
>
> [image: View Raviteja Lokineni's profile on LinkedIn]
> <http://in.linkedin.com/in/ravitejalokineni>
>
>


<    1   2   3   4   5   6   7   8   9   10   >